SQL 쿼리의 출력을 병렬로 처리하려고합니다. 아래 주어진 내 코드입니다. 나는 Aggregator에서 sysout을 가지고있다. 하지만 Aggregator에서 sysout이 인쇄되지 않는 것을 무작위로 확인합니다. 또한 집계 자의 해제 메소드는 sysouts를 인쇄하지 않습니다. 나는 메시지를 잃어 가고있는 것 같아. 어느 누구도 약간의 빛을 비출 수 있습니까?메시지가 Spring 통합 스플리터 후에 손실되었습니다. 데이터가 Aggregator에 임의로 도달하지 않음
<int:bridge input-channel="inputChannel" output-channel="dbRequestChannel" />
<jdbc:outbound-gateway request-channel="dbRequestChannel"
max-rows-per-poll="0" data-source="dataSource" reply-channel="headerEnricher"
query="select empname, empno, empdob from employee where empno = 1234" />
<int:header-enricher input-channel="headerEnricher"
output-channel="splitterChannel">
<int:header name="payloadSize" value="3"></int:header>
</int:header-enricher>
<int:chain input-channel="splitterChannel" output-channel="splitterOutputChannel">
<int:splitter />
</int:chain>
<int:channel id="splitterOutputChannel">
<int:dispatcher task-executor="sampleTaskExecutor" />
</int:channel>
<bean id="sampleTaskExecutor"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="5" />
<property name="maxPoolSize" value="10" />
<property name="queueCapacity" value="25" />
</bean>
<int:service-activator input-channel="splitterOutputChannel"
ref="springIntegrationtest" method="testMethod" output-channel="aggregatePayload">
</int:service-activator>
<int:aggregator input-channel="aggregatePayload"
release-strategy-method="release" output-channel="nullChannel"
send-partial-result-on-expiry="true" ref="springIntegrationtest"
method="aggregateData" />
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath:spring-integration.xml" })
public class SpringIntegrationTest {
@Autowired
private MessageChannel inputChannel;
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
@Test
public void testQueue() {
Message<String> quoteMessage = MessageBuilder
.withPayload("testPayload").build();
inputChannel.send(quoteMessage);
}
public Map<String, String> testMethod(Message<?> m) {
System.out.println(sdf.format(new Date()));
return (Map<String, String>) m.getPayload();
}
public boolean release(ArrayList<Map<String, Object>> payload) {
boolean release = false;
int size = payload.size();
if (size == 3) {
release = true;
}
System.out.println(release);
return release;
}
public Message<String> aggregateData(ArrayList<Map<String, Object>> payload) {
System.out.println("In aggregateData " + payload);
Message<String> quoteMessage = MessageBuilder
.withPayload("testPayload").build();
return quoteMessage;
}
}
쿼리 결과는 항상 3 개의 결과 만 반환합니다. 그래서 테스트를 위해이 조건을 추가했습니다. –
확인. org.springframework.integration에 대한 로그를 디버그하십시오. –