2017-09-11 5 views
0

작업을 다시 시작하지 않고 런타임 중에 새로운 데이터 스트림을 동적으로 추가 할 수 있습니까?Apache Flink 동적으로 새 스트림 추가

는 지금까지 내가 이해로, 일반적인 FLINK 프로그램은 다음과 같습니다 : 내 경우

val env = StreamExecutionEnvironment.getExecutionEnvironment() 
val text = env.socketTextStream(hostname, port, "\n") 
val windowCounts = text.map... 

env.execute("Socket Window WordCount") 

이 가능하며, 그 예를 들면, 새 장치가 시작되고 따라서 다른 스트림을 처리해야합니다. 그러나이 새로운 스트림을 즉시 추가하는 방법은 무엇입니까?

답변

1

런타임시 Flink 프로그램에 새 스트림을 추가 할 수 없습니다.

이 문제를 해결하려면 들어오는 모든 이벤트 (예 : 모든 개별 스트림을 처리하는 Kafka 주제)가 포함 된 스트림을 만들어야합니다. 이벤트에는 어떤 스트림이 왔는지 식별하는 키가 있어야합니다. 이 키는 스트림을 keyBy에 전달하고 키 당 처리 로직을 적용 할 수 있습니다.

여러 소켓에서 읽으려면 소켓을 열기 위해 입력 (예 : 고정 소켓)에서 읽는 고유 한 SourceFunction을 쓸 수 있습니다. 그런 다음 내부적으로이 모든 소켓을 열어서 라운드 로빈 방식으로 읽을 수 있습니다.