2017-10-31 7 views
1

Flink의 Dataset API에서 다음과 같은 간단한 쿼리를 구현하려고합니다.Apache Flink : DataSet API의 NullPointerException 외부 조인

select 
    t1_value1 
from 
    table1 
where 
    t1_suppkey not in ( 
     select 
      t2_suppkey 
     from 
      table2 
    ) 

그래서 내 생각은 (... table1.leftOuterJoin (표 2))가 왼쪽 외부 조인을 수행하고 내가 t1_suppkey 및 t2_suppkey에 대한 값을 얻을 수있는 모든 행을 삭제하는 것이 었습니다.

그래서 나는 같이 그것을 시도 :

 output = table1 
    .leftOuterJoin(table2).where("t1_suppkey").equalTo("t2_suppkey") 
    .with((Table1 t1, Table2 t2) -> new Tuple2<>(t1.ps_suppkey, t2.s_suppkey)) 
    .returns(new TypeHint <Tuple2<Integer, Integer>>() {}); 

을 그러나 나는 이런 식으로 할 경우 항상 "java.lang.NullPointerException이"실패 내가 왜 모르겠어요. 왼쪽 외부 조인 대신 일반 조인을 사용하면 코드가 작동하지만 원하는 것은 아닙니다.

Left Join을 다르게 구현해야합니까, 아니면 Dataset API에서 "not in"문을 다시 작성하는 더 간단한 방법이 있습니까?

답변

0

DataSet API의 외부 조인은 JoinFunction을 내부 쪽에서 조인 레코드를 찾지 못하는 외부 레코드에도 호출합니다. 이 경우 the JoinFunction.join() method is called with null입니다.

LEFT OUTER JOIN을 사용하고 있으므로 두 번째 인수 Table2 t2null이 될 수 있습니다. 은 t2.s_suppkey에 의해 발생합니다. t2 == null을 확인하고 null이 아닌 경우 t2에만 액세스해야합니다.

또한 NOT IN이 Collector 인수가 있습니다 만 t1t2 == null 경우를 방출 FlatJoinFunction에 가입 구현할 수 있습니다.

또 다른 옵션은 예제에서 쿼리를 지원하는 Flink의 일괄 SQL support을 사용하는 것입니다.

+0

감사합니다. [this] (https://stackoverflow.com/a/47074758/8180276)와 같이 해결했습니다. – Rhyzx

0
output = table1 
.leftOuterJoin(table2) 
.where("t1_suppkey").equalTo("t2_suppkey") 
.with((Table1 t1, Table2 t2, Collector<Tuple2<Integer, Integer>> c) -> { 
if(t2 == null) { 
    c.collect(new Tuple2<>(t1.t1_suppkey, t1.t1_value1)); 
} 
else { 
    //Do nothing. 
}})