2016-06-17 8 views
0

내 애플리케이션에 제작자와 소비자가 있습니다. 내 프로듀서는 불규칙하게 메시지를 보낸다. 언젠가는 내 대기열이 비어있을 것이고 언젠가 나는 몇 가지 메시지를 갖게 될 것입니다. 고객이 대기열을 청취하도록하고 메시지가 들어있는 경우 가져 와서이 메시지를 처리하십시오. 이 프로세스는 몇 시간이 걸릴 수 있으며 현재 메시지 처리가 끝나지 않은 경우 내 고객이 대기열에 대한 다른 메시지를 보내지 않게 할 수 있습니다.소비자/생산자 AWS SQS 동기 소비자가있는 스칼라

AKKA와 AWS SQS가 내 요구를 충족시킬 수 있다고 생각합니다. 문서와 예제를 읽으면 akka-camel이 제 작업을 간소화 할 수 있습니다.

github에이 example이 (가) 있습니다.

class MySqsConsumer extends Consumer { 

    //The SQS URI is an in-only message exchange (autoAck=true) 
    override def endpointUri = "aws-sqs://sqs-akka-camel? amazonSQSClient=#client" 

    override def receive = { 
    case msg: CamelMessage => { 
     println("Start received %s" format msg.bodyAs[String]) 
     Thread.sleep(4000) 
     println("Stop received %s" format msg.bodyAs[String]) 

    } 

    } 
} 

스택 트레이스 : 여기

... 
14:38:53.335 [Camel (sqs-akka-camel) thread #0 - aws-sqs://sqs-akka-camel] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[akka://sqs-akka-camel/user/consumer?autoAck=true&replyTimeout=60000+milliseconds] Exchange[Message: Hello World1!] 
14:38:54.051 [Camel (sqs-akka-camel) thread #0 - aws-sqs://sqs-akka-camel] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[akka://sqs-akka-camel/user/consumer?autoAck=true&replyTimeout=60000+milliseconds] Exchange[Message: Hello World2!] 
14:38:54.753 [Camel (sqs-akka-camel) thread #0 - aws-sqs://sqs-akka-camel] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[akka://sqs-akka-camel/user/consumer?autoAck=true&replyTimeout=60000+milliseconds] Exchange[Message: Hello World3!] 
Start received Hello World1! 
Stop received Hello World1! 
Start received Hello World2! 
Stop received Hello World2! 
Start received Hello World3! 
... 

내 문제가 akka-이다 나는 소비자의 구성에 더 많은 관심

합니다 (Thread.sleep를 그냥 내 처리를 시뮬레이션하는 것입니다) camel은 receive 메소드가 완료되기 전에 대기열에서 메시지를 읽습니다.

소비자가 새로운 메시지를 받기 전에 프로세스가 끝날 때까지 기다리려면 어떻게해야합니까? 잘못된 도구/라이브러리를 사용하고 있다면 새로운 도구로 안내 할 수 있습니까? 당신이이 SinkQueue를 반환합니다 akka 스트림을 사용하지만,이 경우 구체화 할 때, Sink.queue

val graph=yourSource.to(Sink.queue) 

에 (graph.run를) 소스를 전송하는 경우

답변

1

확실하지, 당신은 수동으로 항목을 뽑을 수 그것에서. 배압 메커니즘을 사용하므로 아이템을 가져 오지 않으면 어떤 일이 생길지 걱정할 필요가 없습니다.

참고 : 제네릭을 의도적으로 무시했습니다. 그러나 그들을 잊지 마라.

0

내가 아는 한, 당신은 그것을 할 수 없다. 낙타는 항상 대기열에서 메시지를 소비합니다.

override def autoAck = false을 설정할 수 있습니다. 현재 메시지를 수신 할 때까지 에 다른 메시지를 보내지 않습니다. 그러나 "숨겨진"낙타 배우는 여전히 소비 할 것입니다.

사용자 정의 amazonSQSClient을 제공하고 어떻게 든 제어 할 수 있습니다.