현재 mongoDb에서 elasticsearch로 데이터를 스트리밍하는 솔루션을 구축 중입니다. 내 목표는 elasticsearch에 성공한 모든 전송 항목을 추적하는 것입니다. akka-streams 및 elastic4s를 사용하고 있습니다. 현재 ES로 스트리밍이elastic4s 가입자의 응답을 akka-stream으로 되 돌린다
val esSubscriber: BulkIndexingSubscriber[CustomT] = esClient.subscriber[CustomT](
batchSize = batchSize,
completionFn = {() => elasticFinishPromise.success(());()},
errorFn = { (t: Throwable) => elasticFinishPromise.failure(t);()},
concurrentRequests = concurrentRequests
)
val esSink: Sink[CustomT, NotUsed] = Sink.fromSubscriber(esSubscriber)
처럼 그리고 이런 내 소스에서 뭔가 보이는 :
val a: [NotUsed] = mongoSrc
.via(some operations..)
.to(esSink)
.run()
이제 모든 것을 지금 내가 두 번째 싱크 계산 예 항목에 대한 로깅을하고 잘 작동합니다. 하지만 나는 정말로 elasticsearch에 전달 된 항목을 기록하고 싶습니다. elastic4s 가입자가 onAck(): Unit
와 listener: ResponseListener
및 onFailure(): Unit
제공하며, 나는이
val mongoSrc: [Source..]
val doStuff: [Flow..]
val esSink: [Flow..] //now as flow instead of sink
val logSink: [Sink[Int...]] //now gets for example a 1 for each successful transported item
mongoSrc ~> doStuff ~> esSink ~> logSink
같은 스트림으로 다시이 정보를 얻을 싶어요 어떻게 그것을 구현하는 것이? onAck
과 onFailure
의 요소를 버퍼링하는 사용자 지정 스테이지가 필요합니까? 아니면 더 쉬운 방법이 있습니까?
도움 주셔서 감사합니다.
Akka Streams reactive-kafka 드라이버는 다음과 같은 소스 코드를 볼 수 있습니다. https://github.com/akka/reactive-kafka (특히 ProducerStage) – johanandren
감사합니다. 도움이! 이 내일을 시도하십시오 – rincewind
onAck 메서드를 통해 채워진 다른 스트림을 만들 수 있습니까? – monkjack