2017-11-29 9 views
0

스파크 직접 스트리밍을 사용할 때 동물원에 오프셋을 저장하려고합니다. 나는 그것이 API에서 사용할 수 없습니다 보는 바와 같이 우리가 JavaPairInputDstream를 얻을하는 데 사용할 수있는 방법의 해결 방법은 내가 JavaInputDstream API는 특정 오프셋에서 시작하는 옵션을 가지고 볼 수 있지만 나는 JavaPairInputDstream위해 필요오프셋에서 JavaPairInputDstream 스트림

JavaInputDStream<String> messages = KafkaUtils.createDirectStream(jsc, String.class, 
          String.class, StringDecoder.class, StringDecoder.class, String.class, kafkaParams, offsets,(messageAndMetadata) -> messageAndMetadata.message()); 

하지만이 오프셋이없는 JavaPairInputDstream을 사용합니다.

답변

0

직접 스트림에서 변환을 수행하고 키 - 값 쌍에 매핑했습니다.

final JavaPairDStream<String, String> messages2 =messages.transformToPair(pairRdd -> { 

          pairRdd.mapToPair(label->new Tuple2<>(label,label))};);