2017-10-25 8 views
0

두 개의 카프카 KStream간에 일대일 조인을 수행하는 방법은 무엇입니까? 아래 코드는 일대일 방식으로 두 개의 Kafka KStream을 조인합니다. 누군가 KStream과의 일 대 다수 참여를 수행하는 방법을 안내 할 수 있습니까? 주제에 수신 된 데이터는 generics입니다. < String, JsonNode> 주제에 쓰여지는 데이터의 형식은 입니다. "order from": "test1 :", orderitem ":"test2 "} { "주문": "TEST1 :"OrderItem에에서 ":"TEST3 "}일대 다 KStream-KStream 조인

가이 형식으로 데이터를 얻을 수 있나요 : {"주문 ":"TEST1, { "OrderItem에에서" "TEST3"}}

public class ConsumerThreadPool { 

private static final String TOPIC = "jre1"; 
private static final String NEXTTOPIC ="Kafka"; 
private static final String FINALTOPIC="jvm1"; 
private static final Integer NUM_THREADS = 1; 
final Serializer<JsonNode> jsonSerializer = new JsonSerializer(); 
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer(); 

final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer); 
final Serde<String> stringSerde = Serdes.String(); 


int threadNumber = 0; 
@Autowired 
private ConsumerConfigFactory consumerConfigFactory; 

@SuppressWarnings("unused") 
private ConsumerConnector consumer; 
private ExecutorService threadPool; 

public ConsumerThreadPool() { 
    threadPool = Executors.newFixedThreadPool(NUM_THREADS); 
} 

@PostConstruct 
public void startConsuming() { 
    ConsumerConfig consumerConfig = consumerConfigFactory.getConsumerConfig(); 
    consumer = createJavaConsumerConnector(consumerConfig); 
    KStreamBuilder builder = new KStreamBuilder(); 
    /* KTable<String,JsonNode> message = builder.table(stringSerde,jsonSerde,TOPIC); 


    KTable<String,JsonNode> orderstream = message 

      .filter((k,v)-> v.path("table").asText().equals("TEST.S_ORDER") 
        );    
    KTable<String,JsonNode> orderlist=message.filter((k,v)-> v.path("table").asText().equals("TEST.S_ORDER_ITEM")); 
    orderstream.to(stringSerde,jsonSerde,FINALTOPIC);  
    orderlist.to(stringSerde,jsonSerde,FINALTOPIC); */ 
    KStream<String,JsonNode>streams=builder.stream(TOPIC); 

    KStream<String,JsonNode> orderstream=streams.filter((k,v)-> v.path("table").asText().equals("TEST.S_ORDER")) 
      .map((k,v)->KeyValue.pair(v.path("after").path("ROW_ID").asText(),v)); 




    KStream<String, JsonNode> orderlist=streams.filter((k,v)-> v.path("table").asText().equals("TEST.S_ORDER_ITEM")) 
      .map((k,v)->KeyValue.pair(v.path("after").path("ORDER_ID").asText(),v)); 





    KStream<String,JsonNode> nextstream =orderstream.join(orderlist,(new ValueJoiner<JsonNode,JsonNode,JsonNode>(){ 
     @Override 
     public JsonNode apply(JsonNode first,JsonNode second){ 
      ObjectNode jNode = JsonNodeFactory.instance.objectNode(); 
      return jNode.put("from order",first.get("op_type").textValue()) 
        .put("from orderitem",second.get("op_type").textValue()); 
     } 
    }),JoinWindows.of(TimeUnit.SECONDS.toMillis(30)),stringSerde,jsonSerde,jsonSerde); 

    nextstream.to(stringSerde,jsonSerde,FINALTOPIC); 
    KafkaStreams stream=new KafkaStreams(builder, consumerConfigFactory.getConsumeConfig()); 
    stream.start(); 
    consume(); 
    stream.close(); 
} 

public void consume() { 



    @SuppressWarnings("resource") 
    KafkaConsumer<String,String> consumer = new KafkaConsumer<>(consumerConfigFactory.createConsume()); 
    consumer.subscribe(Arrays.asList(FINALTOPIC)); 

    while (true) { 
     ConsumerRecords<String, String> records = consumer.poll(100); 
     if(!records.isEmpty()){ 
      System.out.println("ConsumerRecords object created: "+records); 
      threadPool.submit(new MessageConsumer(records, threadNumber)); 
      threadNumber++; 
     } 

    } 

}  

}

답변

2

: { "OrderItem의에서" "TEST2"} 이미 언급했듯이, KStream-KStream은 이미 일대 다 조인입니다. 고유 키의 모든 조인 결과를 하나의 레코드로 집계하려는 것 같습니다.

이 작업을 수행하려면 .groupByKey().aggregate()을 적용 할 수 있습니다. 집계 함수는 빈 JSON으로 초기화되고 새 레코드는 새 조인 결과가 도착할 때마다 JSON에 추가됩니다.