2017-11-11 11 views
0

Scala 2.11 및 Akka Streams Kafka 0.17을 사용하고 있습니다.Akka Streams Kafka에서 ProducerMessage를 구성하는 동안 오프셋 매개 변수를 구성하는 방법은 무엇입니까?

  • SourceSource.actorRef를 사용하여 만들어집니다 :

    은 내가 스트림 있습니다. 여기에서는 액터가 일정한 간격으로 실행되고 메시지를 연속적으로 생성하도록 예정되어 있으며 스트림으로 출력됩니다.
  • ProducerFlow으로 첨부했습니다. 생산자가 ProducerMessage.Message을 카프카 (Kafka) 주제로 푸시합니다.
  • 일부 DB 작업. 처럼 보이는 ProducerMessage.Message을 구축 할

나는 문제가있다 :

final case class Message[K, V, +PassThrough](
    record: ProducerRecord[K, V], 
    passThrough: PassThrough 
) 

내가 쉽게 실제 메시지가 들어있는 record 매개 변수를 전달할 수 있습니다. 그러나 나는 passThrough 매개 변수에서 무엇을 전달해야할지 모르겠다. docs에 따르면

passThrough 필드는 ResultConsumer#flow 통과 포함 된 모든 요소를 ​​보유 할 수있다. 다운 스트림 작업에서 일부 컨텍스트를 전달해야하는 경우 유용한 입니다. 압축 풀기를 사용하면 압축을 풀 수 있지만 더 편리합니다. 예를 들어, 은 흐름에서 나중에 을 커밋 할 수있는 ConsumerMessage.CommittableOffset 또는 ConsumerMessage.CommittableOffsetBatch이 될 수 있습니다. 내 경우

에는 카프카 주제에 가입하고 내 스트림에 대한 Source (comittableSource 또는 plainSource)을 발생시키는 카프카 소비자가 없습니다. 이 경우 문서에서 설명한대로 소비자 오프셋을 전달했을 것입니다. 하지만 제 경우에는 배우가 그러한 소비자를 시뮬레이션하고 있습니다. 즉 ConsumerMessage.CommittableOffset에 액세스 할 수 없습니다. 여기서 passThrough 매개 변수는 무엇을 전달합니까? 이 경우 가장 좋은 방법은 무엇입니까?

답변

0

reactive-kafka 팀과 함께 문제를 전달한 후 제 대답을 얻었습니다. 기본적으로 그들이 말한 내용은 pass through에 대한 유스 케이스가없는 경우 없음 또는 사용되지 않았 음 또는 빈 문자열 ""으로 설정할 수 있습니다.

아마도 Producer.plainSink을 사용하는 경우 ProducerMessage.Message을 구성 할 필요가 없습니다. 그런 다음 Kafka ProducerRecord을 직접 만들 수 있습니다. 그 ProducerMessage.Message 케이스 클래스는 pass through을 원하거나 필요로하는 경우를위한 컨테이너 일뿐입니다. 통과 할 요소 이외에, 그것은 단지 Kafka ProducerRecord을 포함합니다.