2017-01-15 5 views
0

HTTP 인바운드 채널 어댑터를 통해 HTTP 포스트 요청을 수신하여 흐름이 시작되는 간단한 플로우를 빌드하고이를 'SubscribableChannel'에 게시하려고합니다. 이 채널에 가입 한 'N'명의 소비자가있을 수 있습니다. 아래 그림은 흐름을 보여줍니다. Spring 통합 "Publish Subscribe Channel"with Spring DSL

enter image description here

나는이 흐름을 구성하는 봄 DSL을 사용하려고하고 그것을 작동하게하는 데 문제가있다. 아래는 제 코드입니다.

@Bean 
public IntegrationFlow receiveHttpPost() { 
    return IntegrationFlows.from(Http.inboundChannelAdapter("/receive") 
      .mappedRequestHeaders("*") 
      .requestChannel(httpInAdapterPubSubChannel())) 
      .transform(new ObjectToStringTransformer()) 
      .get(); 
} 

@Bean 
public SubscribableChannel httpInAdapterPubSubChannel() 
{ 
    return MessageChannels.publishSubscribe("httpInAdapterPubSubChannel") 
    .get(); 
} 

@Bean 
public IntegrationFlow subscriber1() { 
    return IntegrationFlows.from(httpInAdapterPubSubChannel()) 
      .handle(message -> System.out.println("Enrich Headers based on Payload....")) 
      .get(); 
} 

@Bean 
public IntegrationFlow subscriber2() { 
    return IntegrationFlows.from(httpInAdapterPubSubChannel()) 
       .handle(message -> System.out.println("Save Payload to Audit Table...")) 
       .get(); 
} 

나는이 흐름을 실행하면, 내가 할 "메시지를 처리하지 못했습니다; 중첩 된 예외는 org.springframework.messaging.core.DestinationResolutionException : 총 출력 채널 또는 사용 가능한 replyChannel 헤더".

o.s.i.channel.PublishSubscribeChannel : preSend on channel 'httpInAdapterPubSubChannel', message: GenericMessage [payload=Test, headers={content-length=4, http_requestMethod=POST, accept-language=en-US,en;q=0.8, accept=*/*, host=localhost:8080, http_requestUrl=http://localhost:8080/receive, connection=keep-alive, content-type=text/plain;charset=UTF-8, id=2c6ee729-96ee-1ae5-be31-a9bc56092758, cache-control=no-cache, accept-encoding=gzip, deflate, br, user-agent=Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36, timestamp=1484457726393}] 
o.s.i.t.MessageTransformingHandler  : org.springframework.integration.transformer.MessageTransformingHandler#0 received message: GenericMessage [payload=Test, headers={content-length=4, http_requestMethod=POST, accept-language=en-US,en;q=0.8, accept=*/*, host=localhost:8080, http_requestUrl=http://localhost:8080/receive, connection=keep-alive, content-type=text/plain;charset=UTF-8, id=2c6ee729-96ee-1ae5-be31-a9bc56092758, cache-control=no-cache, accept-encoding=gzip, deflate, br, user-agent=Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36, timestamp=1484457726393}] 
o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.messaging.MessagingException: Failed to handle Message; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available] with root cause 

org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available 
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:287) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE] 
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:212) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE] 
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:129) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE] 
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE] 
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE] 
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:236) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE] 
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:185) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE] 
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE] 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE] 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.5.RELEASE.jar:4.3.5.RELEASE] 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.5.RELEASE.jar:4.3.5.RELEASE] 

나는 여기에서 매우 잘못된 것을하고있다. 나는 "게시 구독 채널"을 보여주는 예제를 찾으려고 노력했습니다. Spring 통합 DSL 또는 Java 구성입니다. 불행히도, 나는 아무 것도 찾을 수 없었다 : - /. 어떤 사람이 나에게 모범을 보이고 제가 가진 흐름에 무엇이 잘못되었는지를 찾도록 도와 주면 진심으로 감사 할 것입니다.

나는 "구독자 'subscriber1'과 'subscriber2'를 제거했을 때, 나는 여전히 같은 오류가 발생했다. 즉, y HttpInboundAdapter를 구성 할 때 제가하고있는 일이 잘못되었습니다.

또한 'httpInAdapterPubSubChannel'을 직접 전환하고 단일 경로 흐름 (분기를 사용하지 않음) 만 있으면 문제가 없습니다.

답변

1

.transform(new ObjectToStringTransformer())은 결과를 어딘지로 보내려고하지만 인바운드 어댑터는 응답을 기대하지 않으며 트랜스포머는 아무데도 데이터를 보내지 않습니다.

은 아마 당신은이 같은 ...
@Bean 
public IntegrationFlow receiveHttpPost() { 
    return IntegrationFlows.from(Http.inboundChannelAdapter("/receive") 
     .mappedRequestHeaders("*")) 
     .transform(new ObjectToStringTransformer()) 
     .channel(httpInAdapterPubSubChannel()) 
     .get(); 
} 

술집/서브 채널에 변압기의 결과를 전송을 의미했다.

DSL 참조에는 몇 가지 예가 있습니다. 예 : herehere.

+0

응답 및 게리 링크에 감사드립니다. 당신은 그것을 올바르게 추측했습니다. 나는 입력을 변환하고 변환 된 데이터를 PUB-Sub 채널로 보내고 실수를 범했습니다. 당신이 권장대로 그것을 변경하고 올바르게 작동합니다. –