2017-03-25 4 views
0

부모의 레코드 내용을 기반으로 카파의 주제 (상위)에서 다른 주제 (하위)로 작성하려고합니다. 부모 항목에서 소비하는 샘플 레코드는 {"date":{"string":"2017-03-20"},"time":{"string":"20:04:13:563"},"event_nr":1572470,"interface":"Transaction Manager","event_id":5001,"date_time":1490040253563,"entity":"Transaction Manager","state":0,"msg_param_1":{"string":"ISWSnk"},"msg_param_2":{"string":"Application startup"},"msg_param_3":null,"msg_param_4":null,"msg_param_5":null,"msg_param_6":null,"msg_param_7":null,"msg_param_8":null,"msg_param_9":null,"long_msg_param_1":null,"long_msg_param_2":null,"long_msg_param_3":null,"long_msg_param_4":null,"long_msg_param_5":null,"long_msg_param_6":null,"long_msg_param_7":null,"long_msg_param_8":null,"long_msg_param_9":null,"last_sent":{"long":1490040253563},"transmit_count":{"int":1},"team_id":null,"app_id":{"int":4},"logged_by_app_id":{"int":4},"entity_type":{"int":3},"binary_data":null}입니다.kafka에 작성 kafkastreams를 사용하여 레코드 내용에 기반한 주제

나는 개체의 값과 동일한 이름의 주제에 쓸 엔티티의 값을 사용하려는 것이다

은 (엔티티의 값의 고정 된 양이 너무 내가 정적으로 생성 할 수있다는 프로그래밍 방식 어려운 경우 그 주제를 동적으로 작성). 내가 만드는 내용입니다 [email protected]

import org.apache.kafka.common.serialization.Serde; 
import org.apache.kafka.common.serialization.Serdes; 
import org.apache.kafka.streams.KafkaStreams; 
import org.apache.kafka.streams.KeyValue; 
import org.apache.kafka.streams.StreamsConfig; 
import org.apache.kafka.streams.kstream.KStream; 
import org.apache.kafka.streams.kstream.KStreamBuilder; 
import java.util.Properties; 

public class entityDataLoader { 
    public static void main(final String[] args) throws Exception { 
    final Properties streamsConfiguration = new Properties(); 
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "map-function-lambda-example"); 
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 
    streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName()); 
    streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); 

    // Set up serializers and deserializers, which we will use for overriding the default serdes 
    // specified above. 
    final Serde<String> stringSerde = Serdes.String(); 
    final Serde<byte[]> byteArraySerde = Serdes.ByteArray(); 

    // In the subsequent lines we define the processing topology of the Streams application. 
    final KStreamBuilder builder = new KStreamBuilder(); 

    // Read the input Kafka topic into a KStream instance. 
    final KStream<byte[], String> textLines = builder.stream(byteArraySerde, stringSerde, "postilion-events"); 

    String content = textLines.toString(); 
    String entity = JSONExtractor.returnJSONValue(content, "entity"); 
    System.out.println(entity); 

    textLines.to(entity); 

    final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); 
    streams.cleanUp(); 
    streams.start(); 

    // Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams 
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); 
    } 
} 

내용을 사용하려고 해요 그것은 분명 그 KStream.toString()는 기업의 가치를 시도하는 데 사용하는 올바른 방법이 아닙니다 @ .

P. JSONExtractor 클래스는 당신은 "서브 스트림"에 부모 스트림을 분할하여 하나 개의 출력 항목 (참조 http://docs.confluent.io/current/streams/developer-guide.html#stateless-transformations)

귀하의 branch() 반드시 각 "서브 스트림"을 작성 branch()을 사용할 수 있습니다

import org.json.simple.JSONObject; 
import org.json.simple.parser.ParseException; 
import org.json.simple.parser.JSONParser; 
class JSONExtractor { 

public static String returnJSONValue(String args, String value){ 
    JSONParser parser = new JSONParser(); 
    String app= null; 
    System.out.println(args); 
    try{ 
     Object obj = parser.parse(args); 
     JSONObject JObj = (JSONObject)obj; 
     app= (String) JObj.get(value); 
     return app; 
    } 
    catch(ParseException pe){ 
     System.out.println("No Object found"); 
     System.out.println(pe); 
    } 
    return app; 
} 
} 

답변

1

으로 정의된다 당신이 모든 주제를 출력 할 수 있도록 하나의 "하위 스트림"을 만들지 만, 모든 주제를 알고 있기 때문에 문제가되지 않습니다.

또한 카프카 스트림의 경우 응용 프로그램을 시작하기 전에 출력 주제를 모두 만드는 것이 좋습니다 (http://docs.confluent.io/current/streams/developer-guide.html#user-topics 참조)