배치에서 병렬 처리를 수행하기 위해 포크 - 조인 패턴을 사용하고 있습니다.병렬 처리를위한 포크 조인 패턴 뮬 esb throwing NullPointer 예외
나는 다음과 같은 질문의 포획 참조가 : Mule File Inbound Flow : Control Number of threads
내 입력 폴더에 너무 많은 파일이 없지만, 내가 병렬 처리를 달성하는 데 필요한입니다. 그러므로이 패턴을 사용할 생각입니다. 다음은 내 설정 흐름입니다.
<quartz:connector name="Quartz1" validateConnections="true" doc:name="Quartz">
<receiver-threading-profile maxThreadsActive="1"/>
</quartz:connector>
<flow name="Mainflow" processingStrategy="synchronous">
<quartz:inbound-endpoint jobName="EventGeneration" repeatInterval="1000" connector-ref="Quartz1" responseTimeout="10000" doc:name="Quartz">
<quartz:event-generator-job/>
</quartz:inbound-endpoint>
<mulerequester:request-collection config-ref="Mule_Requester" resource="file:///FileLocation?connector=FileMRTransformer" count="3" doc:name="Mule Requester"/>
<expression-filter expression="#[payload.size() != 0]" doc:name="Expression"/>
<request-reply doc:name="Request-Reply" timeout="300000">
<processor-chain doc:name="Processor Chain">
<collection-splitter doc:name="Collection Splitter"/>
<vm:outbound-endpoint exchange-pattern="one-way" doc:name="VM" connector-ref="VM" path="Batchinput" />
</processor-chain>
<vm:inbound-endpoint exchange-pattern="one-way" doc:name="VM" connector-ref="VM" path="Batchoutput">
<message-properties-transformer>
<add-message-property key="MULE_CORRELATION_GROUP_SIZE" value="3" />
</message-properties-transformer>
<collection-aggregator />
</vm:inbound-endpoint>
</request-reply>
</flow>
<batch:job name="BatchDemo" max-failed-records="-1">
<batch:input>
<vm:inbound-endpoint exchange-pattern="one-way" path="Batchinput" connector-ref="VM" doc:name="VM"/>
....
required processing.....
.
.
<batch:on-complete>
<vm:outbound-endpoint exchange-pattern="one-way" doc:name="VM" connector-ref="VM" path="Batchoutput"/>
</batch:on-complete>
즉시 제어가 다음과 같은 예외가 발생하는 요청 - 응답 범위 입력 같이
<vm:outbound-endpoint exchange-pattern="one-way" doc:name="VM" connector-ref="VM" path="Batchinput">
<collection-splitter />
</vm:outbound-endpoint>
:
ERROR 2016-06-23 10:32:56,190 [scheduler-multithreadint019.1.2_productindexing_hybris_fh_Worker-1] org.mule.exception.CatchMessagingExceptionStrategy:
********************************************************************************
Message : null (java.lang.NullPointerException). Message payload is of type: CopyOnWriteArrayList
Type : org.mule.api.MessagingException
Code : MULE_ERROR--2
Payload : [[[email protected], [[email protected], [[email protected]]
JavaDoc : http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/MessagingException.html
********************************************************************************
Exception stack is:
1. null (java.lang.NullPointerException)
java.util.concurrent.ConcurrentHashMap:-1 (null)
2. null (java.lang.NullPointerException). Message payload is of type: CopyOnWriteArrayList (org.mule.api.MessagingException)
org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor:32 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/MessagingException.html)
********************************************************************************
Root Exception stack trace:
java.lang.NullPointerException
at java.util.concurrent.ConcurrentHashMap.hash(Unknown Source)
at java.util.concurrent.ConcurrentHashMap.put(Unknown Source)
at org.mule.routing.requestreply.AbstractAsyncRequestReplyRequester.process(AbstractAsyncRequestReplyRequester.java:85)
at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:107)
다른 구성 요청 - 응답 프로세서의 요청 부분에 대한 시도를 그러나 같은 예외가 발생했습니다.
MuleRequester이 예외가 발생합니다. 일부 Java 스 니펫을 사용하여 File 객체의 컬렉션을 반환하는 경우이 예외가 발생하지 않고 컨트롤이 일괄 처리 흐름에 예상대로 입력됩니다. 그러나, 나는 배치의 내 입력 단계에서 변압기 (DataWeave)가 수행 내 변압기 이 파일 객체를 구문 분석 할 수 없습니다 (예를 들어 java.io.File에 또는 java.io.FileInputStream의) . 따라서 MuleRequester을 사용하여 스트리밍을 사용하도록 설정할 수 있습니다.
이 Mulerequester를 사용할 때 무엇이 잘못되었는지 확실하지 않습니까 ??