나는 두 개의 스트림이 하나의 Int이며 다른 하나는 json이다. json 스키마에는 몇 가지 int 인 하나의 키가있다. 그래서 나는 다른 정수 스트림과 키 비교를 통해 json 스트림을 필터링해야한다. 플린 크?다른 방법으로 Apache flink 스트림을 필터링하는 방법은 무엇입니까?
0
A
답변
1
예, Flink로 이러한 종류의 스트림 처리를 수행 할 수 있습니다. 당신이 FLINK에서 필요한 기본적인 빌딩 블록이 연결된 스트림 및 상태 함수입니다 - 여기 RichCoFlatMap 사용 예제 :이 예에서
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;
public class Connect {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Event> control = env.fromElements(
new Event(17),
new Event(42))
.keyBy("key");
DataStream<Event> data = env.fromElements(
new Event(2),
new Event(42),
new Event(6),
new Event(17),
new Event(8),
new Event(42)
)
.keyBy("key");
DataStream<Event> result = control
.connect(data)
.flatMap(new MyConnectedStreams());
result.print();
env.execute();
}
static final class MyConnectedStreams
extends RichCoFlatMapFunction<Event, Event, Event> {
private ValueState<Boolean> seen = null;
@Override
public void open(Configuration config) {
ValueStateDescriptor<Boolean> descriptor = new ValueStateDescriptor<>(
// state name
"have-seen-key",
// type information of state
TypeInformation.of(new TypeHint<Boolean>() {
}));
seen = getRuntimeContext().getState(descriptor);
}
@Override
public void flatMap1(Event control, Collector<Event> out) throws Exception {
seen.update(Boolean.TRUE);
}
@Override
public void flatMap2(Event data, Collector<Event> out) throws Exception {
if (seen.value() == Boolean.TRUE) {
out.collect(data);
}
}
}
public static final class Event {
public Event() {
}
public Event(int key) {
this.key = key;
}
public int key;
public String toString() {
return String.valueOf(key);
}
}
}
를 제어 스트림에서 볼 수있다 만 키는 통과 데이터 스트림 - 다른 모든 이벤트가 필터링됩니다. 나는 Flink's managed keyed state과 connected streams을 이용했습니다.
이 간단한 내용을 유지하기 위해 데이터 스트림에 JSON이 있다는 요구 사항을 무시했지만 JSON 및 Flink에서 작업하는 방법의 예를 찾을 수 있습니다.
두 스트림이 서로 상대적으로 타이밍을 제어 할 수 없으므로 결과가 비 결정적이라는 점에 유의하십시오. 스트림에 이벤트 시간 소인을 추가 한 다음 대신 RichCoProcessFunction을 사용하여이를 관리 할 수 있습니다.