2017-12-16 34 views
0

각 메시지가 JMS로 성공적으로 전송되거나 실패 할 경우 콜백을 수행하는 방법은 무엇입니까?Alpakka - 메시지 처리시 콜백

val jmsSink = JmsSink.textSink(
    JmsSinkSettings(connectionFactory).withQueue("My_Queue") 
) 
Source(Stream.from(1)) 
    .map(_.toString) 
    .runWith(jmsSink) 

보다 구체적인 예

// creating a sourceQueue which is bound to jmsSink 
val sourceQueue: SourceQueueWithComplete[String] = 
    Source.queue[String](bufferSize, OverflowStrategy.backpressure) 
     .to(jmsSink) 
     .run() 

클라이언트가 항목을 보내는 sourceQueue 행 :

val result: Future[QueueOfferResult] = sourceQueue offer "my-item" 

val resultsourceQueue으로 아이템을 삽입의 결과이며, 그 의미는 아니다 그것은 아직 JMS로 보내졌다. 항목이 싱크 프로세스를 거쳐 JMS 큐에 삽입 될 때 이벤트를 트리거해야합니다. 각 성공적인 메시지 (기준 경우 Unit을 반환하는 함수를 의미한다 "콜백") 콜백을 호출하는

답변

1

한 가지 방법은 같은 JMS 대기열로 구독 해당 Source를 생성하고 사용하는 것입니다 runForeach :

val jmsSink = JmsSink.textSink(
    JmsSinkSettings(connectionFactory).withQueue("My_Queue") 
) 

Source(...) 
    .map(_.toString) 
    .runWith(jmsSink) 

val jmsSource = JmsSource(
    JmsSourceSettings(connectionFactory).withQueue("My_Queue") 
) 

jmsSource.runForeach(println) 

위의 예제는 싱크를 통해 큐에 게시 된 각 메시지를 인쇄합니다.

오류의 경우 예외가 발생하면 현재 스트림이 종료됩니다. 예를 들어, 발생한 예외의 경우에 당신이 그것을 종료하는 대신 스트림을 예외를 인쇄하고 다시 시작하려는 경우, 당신은 당신의 원래 Sourcesupervision strategy을 첨부 할 수 있습니다 : @chunjef 응답에 대한

val decider: Supervision.Decider = { 
    case e: Exception => 
    println(s"Exception thrown: ${e.getMessage}") 
    Supervision.Resume 
} 

val flow = Flow[String] 
    .withAttributes(ActorAttributes.supervisionStrategy(decider)) 

Source(...) 
    .map(_.toString) 
    .via(flow) 
    .runWith(jmsSink) 

val jmsSource = ... 
+0

감사합니다. 'jmsSource.runForeach (println)'는 대기열에서 항목을 대기열에서 제외합니다. 그렇지 않습니까? 정확하게 필요한 것이 무엇인지 명확히하기 위해 몇 가지 세부 사항을 추가했습니다. – Feyyaz