두 개의 Kafka KStream 간에 일대일 조인을 수행하는 방법은 무엇입니까? 아래 코드는 일대일 방식으로 두 개의 Kafka KStream을 조인합니다. 누군가 KStream 간의 일대다(one-to-many) 조인을 수행하는 방법을 안내해 줄 수 있습니까? 토픽에 수신되는 데이터는 제네릭 <String, JsonNode> 형식입니다. 현재 출력 토픽에 쓰이는 데이터의 형식은 다음과 같습니다: {"from order": "TEST1", "from orderitem": "TEST2"} 그리고 {"from order": "TEST1", "from orderitem": "TEST3"} — 일대다 KStream-KStream 조인
데이터를 이 형식으로 얻을 수 있나요: {"order": "TEST1", "orderitems": ["TEST2", "TEST3"]}
public class ConsumerThreadPool {
    /** Source topic carrying mixed S_ORDER / S_ORDER_ITEM change events. */
    private static final String TOPIC = "jre1";
    private static final String NEXTTOPIC = "Kafka";
    /** Topic that receives the joined order/order-item records. */
    private static final String FINALTOPIC = "jvm1";
    private static final Integer NUM_THREADS = 1;

    // Serdes for <String, JsonNode> records, shared by every stream below.
    final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
    final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
    final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
    final Serde<String> stringSerde = Serdes.String();

    // Monotonically increasing id handed to each MessageConsumer task.
    // Only touched from the single polling thread in consume().
    int threadNumber = 0;

    @Autowired
    private ConsumerConfigFactory consumerConfigFactory;

    @SuppressWarnings("unused")
    private ConsumerConnector consumer;

    private final ExecutorService threadPool;

    public ConsumerThreadPool() {
        threadPool = Executors.newFixedThreadPool(NUM_THREADS);
    }

    /**
     * Builds and starts the Streams topology, then blocks forever in
     * {@link #consume()} reading the joined output topic.
     *
     * <p>The topology re-keys orders by ROW_ID and order items by ORDER_ID so
     * both sides share the join key, then performs a 30-second windowed
     * KStream-KStream join. A windowed stream-stream join emits one output
     * record per matching pair, which is exactly how a one-to-many
     * relationship materializes: one order joined with N items yields N
     * output records.
     */
    @PostConstruct
    public void startConsuming() {
        ConsumerConfig consumerConfig = consumerConfigFactory.getConsumerConfig();
        consumer = createJavaConsumerConnector(consumerConfig);

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, JsonNode> source = builder.stream(TOPIC);

        // Split the mixed CDC stream by table and re-key each side on the
        // order id so the join can match them.
        KStream<String, JsonNode> orders = source
                .filter((k, v) -> v.path("table").asText().equals("TEST.S_ORDER"))
                .map((k, v) -> KeyValue.pair(v.path("after").path("ROW_ID").asText(), v));
        KStream<String, JsonNode> orderItems = source
                .filter((k, v) -> v.path("table").asText().equals("TEST.S_ORDER_ITEM"))
                .map((k, v) -> KeyValue.pair(v.path("after").path("ORDER_ID").asText(), v));

        // Merge the two matching records into one JSON object.
        ValueJoiner<JsonNode, JsonNode, JsonNode> joiner = (order, item) -> {
            ObjectNode merged = JsonNodeFactory.instance.objectNode();
            return merged.put("from order", order.get("op_type").textValue())
                         .put("from orderitem", item.get("op_type").textValue());
        };

        KStream<String, JsonNode> joined = orders.join(
                orderItems,
                joiner,
                JoinWindows.of(TimeUnit.SECONDS.toMillis(30)),
                stringSerde, jsonSerde, jsonSerde);
        joined.to(stringSerde, jsonSerde, FINALTOPIC);

        KafkaStreams streams = new KafkaStreams(builder, consumerConfigFactory.getConsumeConfig());
        streams.start();
        // consume() never returns (infinite poll loop), so an inline close()
        // after it would be unreachable dead code. Close the topology on JVM
        // shutdown instead.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        consume();
    }

    /**
     * Polls the joined output topic forever; each non-empty batch is handed
     * to the thread pool as a {@link MessageConsumer} task. Never returns
     * under normal operation; the consumer is closed if the loop exits
     * exceptionally.
     */
    public void consume() {
        // Distinct name: do not shadow the ConsumerConnector field "consumer".
        KafkaConsumer<String, String> kafkaConsumer =
                new KafkaConsumer<>(consumerConfigFactory.createConsume());
        try {
            kafkaConsumer.subscribe(Arrays.asList(FINALTOPIC));
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                if (!records.isEmpty()) {
                    System.out.println("ConsumerRecords object created: " + records);
                    threadPool.submit(new MessageConsumer(records, threadNumber));
                    threadNumber++;
                }
            }
        } finally {
            kafkaConsumer.close();
        }
    }
}