2016-07-18 15 views
2

SQL 쿼리의 출력을 병렬로 처리하려고합니다. 아래 주어진 내 코드입니다. 나는 Aggregator에서 sysout을 가지고있다. 하지만 Aggregator에서 sysout이 인쇄되지 않는 것을 무작위로 확인합니다. 또한 집계 자의 해제 메소드는 sysouts를 인쇄하지 않습니다. 나는 메시지를 잃어 가고있는 것 같아. 어느 누구도 약간의 빛을 비출 수 있습니까?메시지가 Spring 통합 스플리터 후에 손실되었습니다. 데이터가 Aggregator에 임의로 도달하지 않음

<int:bridge input-channel="inputChannel" output-channel="dbRequestChannel" /> 

     <jdbc:outbound-gateway request-channel="dbRequestChannel" 
      max-rows-per-poll="0" data-source="dataSource" reply-channel="headerEnricher" 
      query="select empname, empno, empdob from employee where empno = 1234" /> 

     <int:header-enricher input-channel="headerEnricher" 
      output-channel="splitterChannel"> 
      <int:header name="payloadSize" value="3"></int:header> 
     </int:header-enricher> 

     <int:chain input-channel="splitterChannel" output-channel="splitterOutputChannel"> 
      <int:splitter /> 
     </int:chain> 


     <int:channel id="splitterOutputChannel"> 
      <int:dispatcher task-executor="sampleTaskExecutor" /> 
     </int:channel> 

     <bean id="sampleTaskExecutor" 
      class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> 
      <property name="corePoolSize" value="5" /> 
      <property name="maxPoolSize" value="10" /> 
      <property name="queueCapacity" value="25" /> 
     </bean> 

     <int:service-activator input-channel="splitterOutputChannel" 
      ref="springIntegrationtest" method="testMethod" output-channel="aggregatePayload"> 
     </int:service-activator> 

     <int:aggregator input-channel="aggregatePayload" 
      release-strategy-method="release" output-channel="nullChannel" 
      send-partial-result-on-expiry="true" ref="springIntegrationtest" 
      method="aggregateData" /> 

    @RunWith(SpringJUnit4ClassRunner.class) 
@ContextConfiguration(locations = { "classpath:spring-integration.xml" }) 
public class SpringIntegrationTest { 


    @Autowired 
    private MessageChannel inputChannel; 

    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); 



    @Test 
    public void testQueue() { 
     Message<String> quoteMessage = MessageBuilder 
       .withPayload("testPayload").build(); 
     inputChannel.send(quoteMessage); 
    } 


    public Map<String, String> testMethod(Message<?> m) { 

     System.out.println(sdf.format(new Date())); 
     return (Map<String, String>) m.getPayload(); 
    } 

    public boolean release(ArrayList<Map<String, Object>> payload) { 
     boolean release = false; 
     int size = payload.size(); 
     if (size == 3) { 
      release = true; 
     } 
     System.out.println(release); 
     return release; 
    } 

    public Message<String> aggregateData(ArrayList<Map<String, Object>> payload) { 

     System.out.println("In aggregateData " + payload); 
     Message<String> quoteMessage = MessageBuilder 
       .withPayload("testPayload").build(); 
     return quoteMessage; 
    } 

} 

답변

1

글쎄, 당신의 문제는 주와 옵션의 조합에 있다고 생각합니다. 그래서, 당신의 집계가 3 항목은 도착 직후 그룹을 출시 할 예정이다

int size = payload.size(); 
if (size == 3) { 
    release = true; 
} 

, 그 사이에 당신은 분할 후 더 많은 항목을 가질 수있다 :

는이 있습니다.

release 신호 처리기가 그룹을 종료하고 종료합니다. 기본적으로 expireGroupsUponCompletion = false과 같은 옵션이 있습니다. 즉 그룹을 보유하지만 저장소에는 completed 상태입니다.

애그리 게이터 튜플을 3으로 방금 내보내려면 expireGroupsUponCompletiontrue으로 전환해야합니다. 자세한 내용은 집계 자 manual을 참조하십시오.

+0

쿼리 결과는 항상 3 개의 결과 만 반환합니다. 그래서 테스트를 위해이 조건을 추가했습니다. –

+0

확인. org.springframework.integration에 대한 로그를 디버그하십시오. –