2017-02-23 5 views
3

단일 마스터 주제에서 여러 스트림을 만들려면 어떻게해야합니까?하나의 마스터 주제에서 여러 스트림

KStreamBuilder builder = new KStreamBuilder(); 

builder.stream(Serdes.String(), Serdes.String(), "master") 
      /* Filtering logic */ 
      .to(Serdes.String(), Serdes.String(), "output1"); 

builder.stream(Serdes.String(), Serdes.String(), "master") 
      /* Filtering logic */ 
      .to(Serdes.String(), Serdes.String(), "output2"); 

KafkaStreams streams = new KafkaStreams(builder, /* config */); 
streams.start(); 

나는 다음과 같은 오류 얻을 :

org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: Topic master has already been registered by another source. 
    at org.apache.kafka.streams.processor.TopologyBuilder.addSource(TopologyBuilder.java:347) 
    at org.apache.kafka.streams.kstream.KStreamBuilder.stream(KStreamBuilder.java:92) 

나는 "마스터"의 각 스트림 KafkaStreams의 다른 인스턴스를 만들 필요가 있습니까을 나는 같은 것을 할 때?

답변

7

당신은 다시 사용할 수있는 KStream 만들 수 있습니다

KStream<String, String> inputStream = builder.stream(Serdes.String(), Serdes.String(), "master"); 

는 당신이 그것을 다시 사용할 수 있습니다 : 당신이 필터에 중복이없는 경우

inputStream.filter(..logic1) 
     .to(Serdes.String(), Serdes.String(), "output1"); 
inputStream.filter(..logic2) 
     .to(Serdes.String(), Serdes.String(), "output2"); 

KafkaStreams streams = new KafkaStreams(builder, /* config */); 
streams.start(); 
+2

, 당신은 또한'사용할 수를 inputStream.branch()'를 호출하여 중복되지 않는 하위 스트림을 반환합니다. –