2017-04-20 6 views

답변

1

예, Flink로 이러한 종류의 스트림 처리를 수행 할 수 있습니다. 당신이 FLINK에서 필요한 기본적인 빌딩 블록이 연결된 스트림 및 상태 함수입니다 - 여기 RichCoFlatMap 사용 예제 :이 예에서

import org.apache.flink.api.common.state.ValueState; 
import org.apache.flink.api.common.state.ValueStateDescriptor; 
import org.apache.flink.api.common.typeinfo.TypeHint; 
import org.apache.flink.api.common.typeinfo.TypeInformation; 
import org.apache.flink.configuration.Configuration; 
import org.apache.flink.streaming.api.datastream.DataStream; 
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction; 
import org.apache.flink.util.Collector; 

public class Connect { 
    public static void main(String[] args) throws Exception { 
     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 

     DataStream<Event> control = env.fromElements(
       new Event(17), 
       new Event(42)) 
       .keyBy("key"); 

     DataStream<Event> data = env.fromElements(
       new Event(2), 
       new Event(42), 
       new Event(6), 
       new Event(17), 
       new Event(8), 
       new Event(42) 
       ) 
       .keyBy("key"); 

     DataStream<Event> result = control 
       .connect(data) 
       .flatMap(new MyConnectedStreams()); 

     result.print(); 

     env.execute(); 
    } 

    static final class MyConnectedStreams 
      extends RichCoFlatMapFunction<Event, Event, Event> { 

     private ValueState<Boolean> seen = null; 

     @Override 
     public void open(Configuration config) { 
      ValueStateDescriptor<Boolean> descriptor = new ValueStateDescriptor<>(
        // state name 
        "have-seen-key", 
        // type information of state 
        TypeInformation.of(new TypeHint<Boolean>() { 
        })); 
      seen = getRuntimeContext().getState(descriptor); 
     } 

     @Override 
     public void flatMap1(Event control, Collector<Event> out) throws Exception { 
      seen.update(Boolean.TRUE); 
     } 

     @Override 
     public void flatMap2(Event data, Collector<Event> out) throws Exception { 
      if (seen.value() == Boolean.TRUE) { 
       out.collect(data); 
      } 
     } 
    } 


    public static final class Event { 
     public Event() { 
     } 

     public Event(int key) { 
      this.key = key; 
     } 

     public int key; 

     public String toString() { 
      return String.valueOf(key); 
     } 
    } 
} 

를 제어 스트림에서 볼 수있다 만 키는 통과 데이터 스트림 - 다른 모든 이벤트가 필터링됩니다. 나는 Flink's managed keyed stateconnected streams을 이용했습니다.

이 간단한 내용을 유지하기 위해 데이터 스트림에 JSON이 있다는 요구 사항을 무시했지만 JSON 및 Flink에서 작업하는 방법의 예를 찾을 수 있습니다.

두 스트림이 서로 상대적으로 타이밍을 제어 할 수 없으므로 결과가 비 결정적이라는 점에 유의하십시오. 스트림에 이벤트 시간 소인을 추가 한 다음 대신 RichCoProcessFunction을 사용하여이를 관리 할 수 ​​있습니다.