Scala 2.11 및 Akka Streams Kafka 0.17을 사용하고 있습니다.Akka Streams Kafka에서 ProducerMessage를 구성하는 동안 오프셋 매개 변수를 구성하는 방법은 무엇입니까?
- 가
Source
이Source.actorRef
를 사용하여 만들어집니다 : 은 내가 스트림 있습니다. 여기에서는 액터가 일정한 간격으로 실행되고 메시지를 연속적으로 생성하도록 예정되어 있으며 스트림으로 출력됩니다. Producer
을Flow
으로 첨부했습니다. 생산자가ProducerMessage.Message
을 카프카 (Kafka) 주제로 푸시합니다.- 일부 DB 작업. 처럼 보이는
ProducerMessage.Message
을 구축 할
나는 문제가있다 :
final case class Message[K, V, +PassThrough](
record: ProducerRecord[K, V],
passThrough: PassThrough
)
내가 쉽게 실제 메시지가 들어있는 record
매개 변수를 전달할 수 있습니다. 그러나 나는 passThrough
매개 변수에서 무엇을 전달해야할지 모르겠다. docs에 따르면
passThrough
필드는Result
에Consumer#flow
통과 포함 된 모든 요소를 보유 할 수있다. 다운 스트림 작업에서 일부 컨텍스트를 전달해야하는 경우 유용한 입니다. 압축 풀기를 사용하면 압축을 풀 수 있지만 더 편리합니다. 예를 들어, 은 흐름에서 나중에 을 커밋 할 수있는ConsumerMessage.CommittableOffset
또는ConsumerMessage.CommittableOffsetBatch
이 될 수 있습니다. 내 경우
에는 카프카 주제에 가입하고 내 스트림에 대한 Source
(comittableSource
또는 plainSource
)을 발생시키는 카프카 소비자가 없습니다. 이 경우 문서에서 설명한대로 소비자 오프셋을 전달했을 것입니다. 하지만 제 경우에는 배우가 그러한 소비자를 시뮬레이션하고 있습니다. 즉 ConsumerMessage.CommittableOffset
에 액세스 할 수 없습니다. 여기서 passThrough
매개 변수는 무엇을 전달합니까? 이 경우 가장 좋은 방법은 무엇입니까?