파티션으로
- 스플릿 소스 스트림 아래 코드
- 파티션 당 ParallelFlux 번 "레일"을 생성하고, 별도의 쓰레드로
- 스케줄 "레일"
- 결과를 수집
각 파티션마다 전용 스레드가 있으므로 해당 값은 원래 순서대로 처리됩니다.
@Test
public void partitioning() throws InterruptedException {
final int N = 10;
Flux<Integer> source = Flux.range(1, 10000).share();
// partition source into publishers
Publisher<Integer>[] publishers = new Publisher[N];
for (int i = 0; i < N; i++) {
final int idx = i;
publishers[idx] = source.filter(v -> v % N == idx);
}
// create ParallelFlux each 'rail' containing single partition
ParallelFlux.from(publishers)
// schedule partitions into different threads
.runOn(Schedulers.newParallel("proc", N))
// process each partition in its own thread, i.e. in order
.map(it -> {
String threadName = Thread.currentThread().getName();
Assert.assertEquals("proc-" + (it % 10 + 1), threadName);
return it;
})
// collect results on single 'rail'
.sequential()
// and on single thread called 'subscriber-1'
.publishOn(Schedulers.newSingle("subscriber"))
.subscribe(it -> {
String threadName = Thread.currentThread().getName();
Assert.assertEquals("subscriber-1", threadName);
});
Thread.sleep(1000);
}
코드 및 사용 된 개념에 대한 설명을 추가하면 더욱 도움이 될 것입니다. 예를 들어, 왜 출판사에서'share'라고 부르며, 왜'sequential'과'publishOn'을 부르는 겁니까? –