0

배치에서 병렬 처리를 수행하기 위해 포크 - 조인 패턴을 사용하고 있습니다.병렬 처리를위한 포크 조인 패턴 뮬 esb throwing NullPointer 예외

나는 다음과 같은 질문의 포획 참조가 : Mule File Inbound Flow : Control Number of threads

내 입력 폴더에 너무 많은 파일이 없지만, 내가 병렬 처리를 달성하는 데 필요한입니다. 그러므로이 패턴을 사용할 생각입니다. 다음은 내 설정 흐름입니다.

  <quartz:connector name="Quartz1" validateConnections="true" doc:name="Quartz"> 
        <receiver-threading-profile maxThreadsActive="1"/> 
      </quartz:connector> 
      <flow name="Mainflow" processingStrategy="synchronous"> 
      <quartz:inbound-endpoint jobName="EventGeneration" repeatInterval="1000" connector-ref="Quartz1" responseTimeout="10000" doc:name="Quartz"> 
        <quartz:event-generator-job/> 
       </quartz:inbound-endpoint> 

       <mulerequester:request-collection config-ref="Mule_Requester" resource="file:///FileLocation?connector=FileMRTransformer" count="3" doc:name="Mule Requester"/> 
       <expression-filter expression="#[payload.size() != 0]" doc:name="Expression"/> 
     <request-reply doc:name="Request-Reply" timeout="300000"> 
        <processor-chain doc:name="Processor Chain"> 
         <collection-splitter doc:name="Collection Splitter"/> 
         <vm:outbound-endpoint exchange-pattern="one-way" doc:name="VM" connector-ref="VM" path="Batchinput" /> 
        </processor-chain> 
        <vm:inbound-endpoint exchange-pattern="one-way" doc:name="VM" connector-ref="VM" path="Batchoutput"> 
         <message-properties-transformer> 
          <add-message-property key="MULE_CORRELATION_GROUP_SIZE" value="3" /> 
         </message-properties-transformer> 
         <collection-aggregator /> 
        </vm:inbound-endpoint> 
    </request-reply> 
</flow> 

<batch:job name="BatchDemo" max-failed-records="-1"> 
     <batch:input> 
      <vm:inbound-endpoint exchange-pattern="one-way" path="Batchinput" connector-ref="VM" doc:name="VM"/> 
.... 
required processing..... 
. 
. 
<batch:on-complete> 
<vm:outbound-endpoint exchange-pattern="one-way" doc:name="VM" connector-ref="VM" path="Batchoutput"/> 
</batch:on-complete> 

즉시 제어가 다음과 같은 예외가 발생하는 요청 - 응답 범위 입력 같이

<vm:outbound-endpoint exchange-pattern="one-way" doc:name="VM" connector-ref="VM" path="Batchinput"> 
       <collection-splitter /> 
      </vm:outbound-endpoint> 

:

ERROR 2016-06-23 10:32:56,190 [scheduler-multithreadint019.1.2_productindexing_hybris_fh_Worker-1] org.mule.exception.CatchMessagingExceptionStrategy: 
******************************************************************************** 
Message    : null (java.lang.NullPointerException). Message payload is of type: CopyOnWriteArrayList 
Type     : org.mule.api.MessagingException 
Code     : MULE_ERROR--2 
Payload    : [[[email protected], [[email protected], [[email protected]] 
JavaDoc    : http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/MessagingException.html 
******************************************************************************** 
Exception stack is: 
1. null (java.lang.NullPointerException) 
    java.util.concurrent.ConcurrentHashMap:-1 (null) 
2. null (java.lang.NullPointerException). Message payload is of type: CopyOnWriteArrayList (org.mule.api.MessagingException) 
    org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor:32 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/MessagingException.html) 
******************************************************************************** 
Root Exception stack trace: 
java.lang.NullPointerException 
    at java.util.concurrent.ConcurrentHashMap.hash(Unknown Source) 
    at java.util.concurrent.ConcurrentHashMap.put(Unknown Source) 
    at org.mule.routing.requestreply.AbstractAsyncRequestReplyRequester.process(AbstractAsyncRequestReplyRequester.java:85) 
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24) 
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:107) 

다른 구성 요청 - 응답 프로세서의 요청 부분에 대한 시도를 그러나 같은 예외가 발생했습니다.

MuleRequester이 예외가 발생합니다. 일부 Java 스 니펫을 사용하여 File 객체의 컬렉션을 반환하는 경우이 예외가 발생하지 않고 컨트롤이 일괄 처리 흐름에 예상대로 입력됩니다. 그러나, 나는 배치의 내 입력 단계에서 변압기 (DataWeave)가 수행 내 변압기 이 파일 객체를 구문 분석 할 수 없습니다 (예를 들어 java.io.File에 또는 java.io.FileInputStream의) . 따라서 MuleRequester을 사용하여 스트리밍을 사용하도록 설정할 수 있습니다.

이 Mulerequester를 사용할 때 무엇이 ​​잘못되었는지 확실하지 않습니까 ??

답변

0

누구든지 관심이있는 경우 대안을 찾을 수 있습니다.

poll 구성 요소와 Java를 사용하여 File 객체가 아닌 fileNames 컬렉션을 반환합니다. 동일한 requestreply 패턴을 사용했습니다. 그러나, 이번에는 입력 단계의 파일에서 페이로드를 검색 중입니다. MuleRequester을 사용합니다. 이 파일 이름을 MuleRequester 입력 경로로 지정하십시오.

이렇게하면 잘 돌아갑니다.