2016-07-25 5 views
0

현재 mongoDb에서 elasticsearch로 데이터를 스트리밍하는 솔루션을 구축 중입니다. 내 목표는 elasticsearch에 성공한 모든 전송 항목을 추적하는 것입니다. akka-streams 및 elastic4s를 사용하고 있습니다. 현재 ES로 스트리밍이elastic4s 가입자의 응답을 akka-stream으로 되 돌린다

val esSubscriber: BulkIndexingSubscriber[CustomT] = esClient.subscriber[CustomT](
    batchSize = batchSize, 
    completionFn = {() => elasticFinishPromise.success(());()}, 
    errorFn = { (t: Throwable) => elasticFinishPromise.failure(t);()}, 
    concurrentRequests = concurrentRequests 
    ) 
val esSink: Sink[CustomT, NotUsed] = Sink.fromSubscriber(esSubscriber) 

처럼 그리고 이런 내 소스에서 뭔가 보이는 :

val a: [NotUsed] = mongoSrc 
    .via(some operations..) 
    .to(esSink) 
    .run() 

이제 모든 것을 지금 내가 두 번째 싱크 계산 예 항목에 대한 로깅을하고 잘 작동합니다. 하지만 나는 정말로 elasticsearch에 전달 된 항목을 기록하고 싶습니다. elastic4s 가입자가 onAck(): Unitlistener: ResponseListeneronFailure(): Unit 제공하며, 나는이

val mongoSrc: [Source..] 
val doStuff: [Flow..] 
val esSink: [Flow..] //now as flow instead of sink 
val logSink: [Sink[Int...]] //now gets for example a 1 for each successful transported item 

mongoSrc ~> doStuff ~> esSink ~> logSink 

같은 스트림으로 다시이 정보를 얻을 싶어요 어떻게 그것을 구현하는 것이? onAckonFailure의 요소를 버퍼링하는 사용자 지정 스테이지가 필요합니까? 아니면 더 쉬운 방법이 있습니까?

도움 주셔서 감사합니다.

+0

Akka Streams reactive-kafka 드라이버는 다음과 같은 소스 코드를 볼 수 있습니다. https://github.com/akka/reactive-kafka (특히 ProducerStage) – johanandren

+0

감사합니다. 도움이! 이 내일을 시도하십시오 – rincewind

+0

onAck 메서드를 통해 채워진 다른 스트림을 만들 수 있습니까? – monkjack

답변

1

Flow.fromSinkAndSource을 활용하여 Subscriber[T] 싱크대를 '흐리게'만들 수 있습니다. the docs에서 'Sink and Source (소스 및 소스에서 합성물 흐름)'그림을 확인하십시오.

이 경우 사용자 지정 actorPublisher를 소스로 연결하여 onAck()에서 메시지를 보내야합니다. 간단히 말해서

val doStuff = Flow[DocToIndex] 
       .grouped(batchSize) 
       .mapAsync(concurrentRequests)(bulkopFuture) 

을 따로 모든 유용한 추상화, 가입자가 단지 a bulk update request입니다 elastic4s :

이 때문에 당신은 쉬운 방법을 물었다.

+0

이제 'esSink' 접합점이 필요하지 않습니다. 그래프는'mongoSrc ~> doStuff ~> logSink'와 같습니다. –

+0

답장을 보내 주셔서 감사합니다. actorPublisher를 사용한 접근 방식이 잘 작동한다고 생각합니다. 소스가 큐에 들어가고 합성 흐름으로 문제가 발생하면 먼저 소스를 얻습니다. 그래프가 이미 구체화되고 늦게 주입되면 소스를 가져 오기 때문에 프리 머티리얼 액터 게시자 여야합니다. elastic4s 가입자. 이제 다른 스트림을 만들고 onAck 메서드를 source.queue로 푸시하면 모든 것이 잘 작동합니다. 또한 솔루션에 100 % 만족하지 못하고 복합 흐름을 선호합니다. – rincewind