1

다른 시스템에서 메시지 브로커 (현재 JMS)를 통해 메시지를 수신하는 시스템을 구축 중입니다. 모든 보낸 사람 시스템의 모든 메시지에는 deviceId가 있으며 메시지를 수신하는 순서는 없습니다. 예를 들어 시스템 A는 deviceId = 1 인 메시지를 보낼 수 있고 시스템 b는 deviceId = 2 인 메시지를 보낼 수 있습니다.통합 패턴 : 여러 시스템에서받은 메시지 처리 방법을 동기화하는 방법

동일한 deviceId를 가진 모든 발신자로부터 모든 메시지를받지 않은 한 내 목표는 동일한 deviceId와 관련된 메시지 처리를 시작하지 않는 것입니다. 예를 들어

, 나는 3 개 시스템이있는 경우, 내 시스템에 메시지 전송 B와 C :

System A sends messageA1 with deviceId=1 
System B sends messageB1 with deviceId=1 
System C sends messageC1 with deviceId=3 
System C sends messageC2 with deviceId=1 <--- here I should start processing of messageA1, messageB1 and messageC2 because they are having the same deviceID 1. 

이 문제는 메시지 브로커 또는하여 내 시스템의 일부 동기화 메커니즘을 사용하여 해결되어야한다 스프링 통합/아파치 낙타와 같은 통합 프레임 워크?

+0

서비스가 시스템 중 하나에서 동일한 deviceId를 사용하여 몇 개의 메시지를받는 경우 중요합니까? 별도의 메시지를 1 씩 처리 하시겠습니까? 예 (deviceId = 1) :'messageA1, messageA1, messageA1, messageC1, messageC1, messageB1' -> 프로세스 시작 – mgyongyosi

+0

아니요 상관 없어요 ...중요한 것은 처리해야하는 데이터가 포함되어있어 3 개의 모든 메시지가 도착할 때 처리를 트리거하는 것입니다 –

답변

2

Aggregator (@Artem Bilan은 무엇을 언급 했는가?)와 유사한 솔루션은 속성을 사용하여 사용자 정의 AggregationStrategy 및 Aggregator 완료 제어로 Camel에서 구현할 수도 있습니다.

다음은 좋은 출발점 일 수 있습니다. (You can find the sample project with tests here)

경로 :

from("direct:start") 
    .log(LoggingLevel.INFO, "Received ${headers.system}${headers.deviceId}") 
    .aggregate(header("deviceId"), new SignalAggregationStrategy(3)) 
    .log(LoggingLevel.INFO, "Signaled body: ${body}") 
    .to("direct:result"); 

SignalAggregationStrategy.java는

public class SignalAggregationStrategy extends GroupedExchangeAggregationStrategy implements Predicate { 

    private int numberOfSystems; 

    public SignalAggregationStrategy(int numberOfSystems) { 
     this.numberOfSystems = numberOfSystems; 
    } 

    @Override 
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { 
     Exchange exchange = super.aggregate(oldExchange, newExchange); 

     List<Exchange> aggregatedExchanges = exchange.getProperty("CamelGroupedExchange", List.class); 

     // Complete aggregation if we have "numberOfSystems" (currently 3) different messages (where "system" headers are different) 
     // https://github.com/apache/camel/blob/master/camel-core/src/main/docs/eips/aggregate-eip.adoc#completing-current-group-decided-from-the-aggregationstrategy 
     if (numberOfSystems == aggregatedExchanges.stream().map(e -> e.getIn().getHeader("system", String.class)).distinct().count()) { 
      exchange.setProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP, true); 
     } 

     return exchange; 
    } 

    @Override 
    public boolean matches(Exchange exchange) { 
     // make it infinite (4th bullet point @ https://github.com/apache/camel/blob/master/camel-core/src/main/docs/eips/aggregate-eip.adoc#about-completion) 
     return false; 
    } 
} 

는 희망이 도움이!

1

캐싱 구성 요소를 사용하여 Apache Camel에서이를 수행 할 수 있습니다. EHCache 구성 요소가 있다고 생각합니다. 기본적으로

는 :

  1. 당신은 deviceId1 말 주어진의 DeviceID와 메시지가 나타납니다.
  2. 캐시에서 검색하여 deviceId1에 대해 수신 된 메시지를 확인합니다.
  3. 세 가지를 모두받지 못한 경우 현재 시스템/메시지를 캐시에 추가하십시오.
  4. 모든 메시지가 있으면 캐시를 처리하고 지 웁니다.

그런 다음 수신 메시지를 임시 저장을 위해 특정 deviceId 기반 대기열로 경로를 지정할 수 있습니다. 이것은 JMS, ActiveMQ 또는 이와 유사한 것일 수 있습니다.

1

스프링 통합은 이러한 유형의 작업에 대한 구성 요소를 제공합니다. 전체 그룹을 수집 할 때까지 방출하지 마십시오. 이름은 Aggregator입니다. deviceId은 확실히 correlationKey입니다. releaseStrategy은 실제로 시스템의 수를 기반으로 할 수 있습니다 - 얼마나 많은 메시지가 다음 단계로 진행되기 전에 기다리는 중입니까 deviceId1.