내 애플리케이션에 제작자와 소비자가 있습니다. 내 프로듀서는 불규칙하게 메시지를 보낸다. 언젠가는 내 대기열이 비어있을 것이고 언젠가 나는 몇 가지 메시지를 갖게 될 것입니다. 고객이 대기열을 청취하도록하고 메시지가 들어있는 경우 가져 와서이 메시지를 처리하십시오. 이 프로세스는 몇 시간이 걸릴 수 있으며 현재 메시지 처리가 끝나지 않은 경우 내 고객이 대기열에 대한 다른 메시지를 보내지 않게 할 수 있습니다.소비자/생산자 AWS SQS 동기 소비자가있는 스칼라
AKKA와 AWS SQS가 내 요구를 충족시킬 수 있다고 생각합니다. 문서와 예제를 읽으면 akka-camel이 제 작업을 간소화 할 수 있습니다.
github에이 example이 (가) 있습니다.
이class MySqsConsumer extends Consumer {
//The SQS URI is an in-only message exchange (autoAck=true)
override def endpointUri = "aws-sqs://sqs-akka-camel? amazonSQSClient=#client"
override def receive = {
case msg: CamelMessage => {
println("Start received %s" format msg.bodyAs[String])
Thread.sleep(4000)
println("Stop received %s" format msg.bodyAs[String])
}
}
}
스택 트레이스 : 여기
...
14:38:53.335 [Camel (sqs-akka-camel) thread #0 - aws-sqs://sqs-akka-camel] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[akka://sqs-akka-camel/user/consumer?autoAck=true&replyTimeout=60000+milliseconds] Exchange[Message: Hello World1!]
14:38:54.051 [Camel (sqs-akka-camel) thread #0 - aws-sqs://sqs-akka-camel] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[akka://sqs-akka-camel/user/consumer?autoAck=true&replyTimeout=60000+milliseconds] Exchange[Message: Hello World2!]
14:38:54.753 [Camel (sqs-akka-camel) thread #0 - aws-sqs://sqs-akka-camel] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[akka://sqs-akka-camel/user/consumer?autoAck=true&replyTimeout=60000+milliseconds] Exchange[Message: Hello World3!]
Start received Hello World1!
Stop received Hello World1!
Start received Hello World2!
Stop received Hello World2!
Start received Hello World3!
...
내 문제가 akka-이다 나는 소비자의 구성에 더 많은 관심
합니다 (Thread.sleep를 그냥 내 처리를 시뮬레이션하는 것입니다) camel은 receive 메소드가 완료되기 전에 대기열에서 메시지를 읽습니다.
소비자가 새로운 메시지를 받기 전에 프로세스가 끝날 때까지 기다리려면 어떻게해야합니까? 잘못된 도구/라이브러리를 사용하고 있다면 새로운 도구로 안내 할 수 있습니까? 당신이이 SinkQueue를 반환합니다 akka 스트림을 사용하지만,이 경우 구체화 할 때, Sink.queue
val graph=yourSource.to(Sink.queue)
에 (graph.run
를) 소스를 전송하는 경우