2017-12-18 15 views
1

alpakka를 사용하여 여러 jmsSource (다른 대기열에 대해)를 시작하는 시나리오가 있습니다. 또한 어느 시점에서든 대기열을 분리해야합니다. 그래서 아래와 같이 jms akka 스트림에 KillSwitch를 추가했습니다. -Akka Streams Alpakka jms의 KickSwitch

.. 등등.

응용 프로그램을 시작한 후에 모든 대기열이 연결되어 정상적으로 작동합니다. 그러나 stop 메소드를 사용하여 큐를 분리하려고 시도 할 때 모든 큐가 연결이 끊어지지 않고 동작이 무작위 인 것은 아닙니다. 나는 또한 killSwitch가 모든 청취자에게 다르다는 것을 확인했다.

누군가가 나에게 무엇이 잘못되었는지 말해 줄 수 있습니까?

답변

0

로그는 고유 한 스트림을 사용하는 여러 대기열에 연결되어 있지만 다른 대기열에 연결되어있을 가능성이 높은 스트림을 가지고 있다는 환상을 뒷받침합니다. 두 수신기 개체 모두에서 로거는 재정의 된 queue 이름을 기록하지만이 큐 이름은 jmsSource을 구성하는 데 사용되지 않습니다.

jmsSource의 정의는 표시하지 않습니다. 외관상으로는 MessageListener 형질의 외부에 정의되어 있는데,이 경우 ListenerAListenerB은 모두 jmsSource을 사용합니다. 즉, ListenerAListenerB 동안 jmsPipeline의 고유 인스턴스 (킬 스위치가 다른 이유이다),이 jmsPipeline 인스턴스 모두 jmsSource가 호출 할 때마다 다른 Source을 만드는 def 아닌 경우 (동일한 jmsSource 인스턴스로부터 도출되는이 이 경우에도 기본적인 문제는 남아 있습니다 : queue은 구성에서 사용되지 않습니다). Alpakka에서

는 JMS 큐는 너무 jmsSource 아마 같이 보입니다, JmsSourceSettings에 구성되어있는 다음

ListenerA.start(), 예를 들어,라고
val jmsSource: Source[String, NotUsed] = JmsSource.textSource(
    JmsSourceSettings(connectionFactory).withBufferSize(10).withQueue("MyQueue") 
)      // the queue is configured here^

, 다음이 기록됩니다

Invoking listener : Queue_A 
listener : Queue_A started 

다시 위의 로그 문에서 "Queue_A"def queue: String 멤버의 값이 ListenerA입니다. 위의 예에서 실제로 jmsSource ("MyQueue")에 구성된 대기열 일 필요는 없습니다. ListenerB의 경우와연결자에 로그인하는 경우에도 마찬가지입니다.

직접적인 수정은 MessageListener 특성 내부 jmsSource하고 JmsSourceSettings의 정의를 이동하고 실제로 이러한 설정에 queue을 사용하는 것입니다.