2016-09-26 4 views
0

다음과 같은 문제가 있습니다. partitionId 속성 (예 : 0-10)이있는 이벤트 목록이 있으면 수신 이벤트를 paritionId에 따라 분할해야합니다. 동일한 partitionId를 가진 이벤트는 수신 된 순서로 처리됩니다. 다소 균등하게 분배되면 병렬로 처리되는 10 개의 이벤트 (각 파티션에 대한 이벤트)가 발생합니다.기준에 따라 이벤트를 분할하고 순서대로 처리하십시오.

단일 스레드 디스패처를 10 개 만들고 해당 디스패처로 이벤트를 보내는 것 외에 Project Reactor를 사용하여 위의 작업을 수행 할 수 있습니까?

감사합니다.

답변

0

파티션으로

  • 스플릿 소스 스트림 아래 코드
  • 파티션 당 ParallelFlux 번 "레일"을 생성하고, 별도의 쓰레드로
  • 스케줄 "레일"
  • 결과를 수집

각 파티션마다 전용 스레드가 있으므로 해당 값은 원래 순서대로 처리됩니다.

@Test 
public void partitioning() throws InterruptedException { 
    final int N = 10; 
    Flux<Integer> source = Flux.range(1, 10000).share(); 
    // partition source into publishers 
    Publisher<Integer>[] publishers = new Publisher[N]; 
    for (int i = 0; i < N; i++) { 
     final int idx = i; 
     publishers[idx] = source.filter(v -> v % N == idx); 
    } 
    // create ParallelFlux each 'rail' containing single partition 
    ParallelFlux.from(publishers) 
      // schedule partitions into different threads 
      .runOn(Schedulers.newParallel("proc", N)) 
      // process each partition in its own thread, i.e. in order 
      .map(it -> { 
       String threadName = Thread.currentThread().getName(); 
       Assert.assertEquals("proc-" + (it % 10 + 1), threadName); 
       return it; 
      }) 
      // collect results on single 'rail' 
      .sequential() 
      // and on single thread called 'subscriber-1' 
      .publishOn(Schedulers.newSingle("subscriber")) 
      .subscribe(it -> { 
       String threadName = Thread.currentThread().getName(); 
       Assert.assertEquals("subscriber-1", threadName); 
      }); 
    Thread.sleep(1000); 
} 
+0

코드 및 사용 된 개념에 대한 설명을 추가하면 더욱 도움이 될 것입니다. 예를 들어, 왜 출판사에서'share'라고 부르며, 왜'sequential'과'publishOn'을 부르는 겁니까? –