2017-02-08 3 views
0

효과적으로 데이터베이스를 대기열로 사용하여 흐름을 구성하려고 시도했습니다. 그 이유는 다른 프로세스가이 메시지를 읽고 회신 할 것으로 예상되어 이러한 방식으로 설계 되었기 때문입니다. 불행히도, 나는이 다른 프로세스를 제어 할 수 없으며 다른 대기열 시스템에 응답 할 수 없습니다.Mule Request-Reply 데이터베이스 요청으로 응답을 읽지 못하는 범위

HTTP는 데이터베이스에 레코드를 삽입하라는 요청 -> 별도의 응용 프로그램 (뮬 외부)은이 데이터베이스 테이블을 폴링하고 다른 메시지를 다른 테이블에 응답합니다. 5 초 이상 응답) ->이 새 행을 읽고 원래의 http 요청에 응답하십시오.

이 디자인에서 요청 - 응답 범위는 항상 응답을 기다리는 시간 초과입니다. "3e1a7750-ee13-11e6-ae40-0c9920524153"또는이 작업이 중단 된 메시지 응답 ID를 기다리고 (내가 수동으로 상당히 빨리이 보여 20 초로 설정)

응답 (20000ms)를 초과되었습니다. 끝점 : null을 통해 이벤트를 라우팅하지 못했습니다. 메시지 페이로드 유형은 다음과 같습니다. 정수

분명히 뭔가를 놓쳤으므로 노새에서 올바른 설명서를 찾을 수 없습니다. 나는이 사이트의 좋은 사용자 중 한 명이 내 방식의 오류를 바로 잡기를 바랍니다. 메시지 응답 ID 기다리고 응답 타임 아웃 (20000ms) 아래

는 흐름

<flow name="mainFlow"> 
    <http:listener config-ref="HTTP_Listener_Configuration" path="hello" doc:name="HTTP"/> 
    <cxf:jaxws-service doc:name="CXF" configuration-ref="CXF_Configuration" serviceClass="kansas.MuleTestServiceImpl"/> 
    <request-reply doc:name="Request-Reply" timeout="20000"> 
     <db:insert config-ref="Oracle_Configuration" doc:name="Database"> 
      <db:parameterized-query><![CDATA[insert into tblRequest (id, correlationId, replyTo) values (#[message.id], #[message.correlationId], #[message.replyTo])]]></db:parameterized-query> 
     </db:insert> 
     <jms:inbound-endpoint queue="test.response" connector-ref="syncJms" doc:name="JMS"> 
      <jms:transaction action="JOIN_IF_POSSIBLE"/> 
     </jms:inbound-endpoint> 
    </request-reply> 
    <logger message="payload is #[payload]" level="INFO" doc:name="Logger"/> 
</flow> 
<flow name="databasePoller"> 
    <poll doc:name="Poll"> 
     <fixed-frequency-scheduler frequency="5000"/> 
     <db:select config-ref="Oracle_Configuration" doc:name="Database"> 
      <db:parameterized-query><![CDATA[select id,correlationId,msgresponse,replyto from tblResponse]]></db:parameterized-query> 
     </db:select> 
    </poll> 
    <foreach collection="#[payload]" doc:name="For Each"> 
     <set-variable variableName="storedPayload" value="#[payload]" doc:name="storePayload"/> 
     <db:delete config-ref="Oracle_Configuration" doc:name="Database"> 
      <db:parameterized-query><![CDATA[delete from tblResponse where correlationId = #[storedPayload.correlationId]]]></db:parameterized-query> 
     </db:delete> 
     <set-payload value="#[flowVars.storedPayload]" doc:name="restorePayload"/> 
     <message-properties-transformer overwrite="true" doc:name="Message Properties"> 
      <add-message-property key="MULE_CORRELATION_ID" value="#[payload.ID]"/> 
      <add-message-property key="MULE_REPLYTO" value="#[payload.REPLYTO]"/> 
     </message-properties-transformer> 
     <set-payload value="#[payload.MSGRESPONSE]" doc:name="Set Payload"/> 
     <jms:outbound-endpoint queue="test.response" connector-ref="syncJms" doc:name="JMS"/> 
     <logger level="INFO" doc:name="Logger"/> 
    </foreach> 
</flow> 

enter image description here

아래 EXCEPTION


메시지보기 샘플이고 "b9a93d10-efa4-11e6-808b-0c9920524153"또는이 작업이 중단되었습니다. 끝점 : null을 통해 이벤트를 라우팅하지 못했습니다. 메시지 페이로드 유형 인 : 정수 유형 : org.mule.api.routing.ResponseTimeoutException 코드 : MULE_ERROR - 2 의 JavaDoc : http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/routing/ResponseTimeoutException.html 페이로드 : 1

루트 예외 스택 추적 : org.mule.api. routing.ResponseTimeoutException : 메시지 응답 ID "b9a93d10-efa4-11e6-808b-0c9920524153"을 기다리는 응답 시간이 초과되었습니다 (20000ms). 또는이 작업이 중단되었습니다. 끝점 : null을 통해 이벤트를 라우팅하지 못했습니다. 메시지 페이로드 유형이다 : org.mule.routing.requestreply.AbstractAsyncRequestReplyRequester.process (AbstractAsyncRequestReplyRequester.java:89)에서 org.mule.routing.requestreply.AbstractAsyncRequestReplyRequester.receiveAsyncReply (AbstractAsyncRequestReplyRequester.java:283)에서 정수

+0

세 가지가 도움이 될 수 있습니다. 먼저 달성하고자하는 것을 좀 더 자세하게 설명 할 수 있습니다. 두 번째는 전체 로그 항목을 추가하고 세 번째는 리팩터링을 시도 할 수 있습니다. DB 부분을 VM 큐를 통해 호출하는 하위 흐름으로 이동하십시오. VM 큐를 아웃 바운드 (Exchange 패턴 요청 - 응답)로 요청 응답 범위에 추가하고 VM 및 DB 커넥터로 새 플로우를 호출하십시오. 그렇게하면 Mule은 현재 구성에서 응답 (jMS 시나리오의 임시 대기열)을 찾지 못할 수도 있으므로 실제로 응답 할 수 있습니다. – brazo

+0

그 정보가 더 계몽 수도 있습니다 : https://www.ricston.com/blog/usecase-explaining-behaviour-requestreply-block/ – brazo

답변

0

나는 이것을 풀 었다고 생각합니다. 데이터베이스 폴러가 아닌 HTTP 요청 엔드 포인트로이 작업을 완벽하게 처리했습니다. 메시지를 검토 할 때 내 데이터베이스 폴러를 통한 차이점이 하나 있는데, 노새는 2 개의 추가 속성을 배치합니다. MULE_CORRELATION_SEQUENCE 및 MULE_CORRELATION_GROUP_SIZE.

jms에 메시지를 보내기 바로 전에 이러한 속성을 제거하면 요청 - 응답 범위에서 jms 대기열의 응답을 올바르게 식별 할 수있었습니다.

<remove-property propertyName="MULE_CORRELATION_SEQUENCE" doc:name="Property"/> <remove-property propertyName="MULE_CORRELATION_GROUP_SIZE" doc:name="Property"/>

0

나는 그것을 작동시키기 위해서 약간의 흐름을 단순화했다. 별도의 흐름에서 DB를 폴링하는 대신 VM 대기열로 트리거하는 경우 적합한 솔루션 일 수 있습니다.

<flow name="mainFlow"> 
    <http:listener config-ref="HTTP_Listener_Configuration" 
     path="hello" doc:name="HTTP" /> 
    <dw:transform-message doc:name="Transform Message"> 
     <dw:set-payload> 
      <![CDATA[%dw 1.0 %output application/java 
      --- 
      { 
      name: "abc", 
      euro: 130, 
      usd: 123 
      }]]></dw:set-payload> 
    </dw:transform-message> 
    <request-reply doc:name="Request-Reply" timeout="20000"> 
     <vm:outbound-endpoint exchange-pattern="one-way" path="db" doc:name="VM"> 
      <message-properties-transformer scope="outbound"> <delete-message-property key="MULE_REPLYTO"/> </message-properties-transformer> 
      </vm:outbound-endpoint> 
     <vm:inbound-endpoint exchange-pattern="one-way" path="test.response" doc:name="VM"/> 
    </request-reply> 
    <logger message="#[message.payloadAs(java.lang.String)]" level="INFO" doc:name="Logger"/> 
</flow> 
<flow name="soFlow"> 
    <vm:inbound-endpoint exchange-pattern="one-way" path="db" doc:name="VM"/> 
    <db:insert config-ref="MySQL_Configuration" doc:name="Database"> 
     <db:parameterized-query><![CDATA[insert into item (
      item_name, 
      price_euro, 
      price_usd) 
       values (#[payload.name], #[payload.euro], #[payload.usd])]]>    </db:parameterized-query> 
    </db:insert> 
    <vm:outbound-endpoint exchange-pattern="one-way" path="call-db-query" doc:name="VM"/> 
</flow> 
<flow name="databasePoller"> 
    <vm:inbound-endpoint exchange-pattern="one-way" path="call-db-query" doc:name="VM"/> 
    <db:select config-ref="MySQL_Configuration" doc:name="Database"> 
     <db:parameterized-query><![CDATA[select item_name, 
     price_euro, 
     price_usd from item]]></db:parameterized-query> 
    </db:select> 
    <vm:outbound-endpoint exchange-pattern="one-way" path="test.response" doc:name="VM"/> 
</flow> 
+0

불행히도 나는이 시스템을 재 작업을 요청하는 소프트웨어를 요청할 수 없습니다. . 아마도 레거시 소프트웨어 요구 때문에 동기식 http 요청에 대한 요구 사항이있었습니다. 나는 그 능력을 가지고 있었으면 좋겠다. – Beta033