2

나는 반응적인 스트림을 배우고 publc-subcribe 유틸리티를 사용하고 Publisher (내 Flux) 및 구독자의 기본 동작을 사용하고 있습니다.가입자가 다른 경우에 다른 수의 요소를 요청하는 이유는 무엇입니까?

두 가지 시나리오가 있습니다. 둘 다 Flux에서 같은 수의 요소를 사용합니다. 하지만 로그를 분석 할 때 onSubscribe 메서드는 다른 수의 요소를 요청하고 있습니다. 한 가지 경우에는 무한 요소를 요청하고 다른 요소는 32 개 요소를 요청하는 경우입니다. 로그

 System.out.println("*********Calling MapData************"); 
     List<Integer> elements = new ArrayList<>(); 
     Flux.just(1, 2, 3, 4) 
      .log() 
      .map(i -> i * 2) 
      .subscribe(elements::add); 
     //printElements(elements); 
     System.out.println("-------------------------------------"); 

     System.out.println("Inside Combine Streams"); 
     List<Integer> elems = new ArrayList<>(); 
     Flux.just(10,20,30,40) 
      .log() 
      .map(x -> x * 2) 
      .zipWith(Flux.range(0, Integer.MAX_VALUE), 
       (two, one) -> String.format("First : %d, Second : %d \n", one, two)) 
      .subscribe(new Consumer<String>() { 
       @Override 
       public void accept(String s) { 

       } 
      }); 
     System.out.println("-------------------------------------"); 

여기에 있습니다 : :

이유에서 다음 내가 모든 사용자 정의 가입자 구현을 사용하지 않은 것처럼
*********Calling MapData************ 
[warn] LoggerFactory has not been explicitly initialized. Default system-logger will be used. Please invoke StaticLoggerBinder#setLog(org.apache.maven.plugin.logging.Log) with Mojo's Log instance at the early start of your Mojo 
[info] | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription) 
[info] | request(unbounded) 
[info] | onNext(1) 
[info] | onNext(2) 
[info] | onNext(3) 
[info] | onNext(4) 
[info] | onComplete() 
------------------------------------- 
Inside Combine Streams 
[info] | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription) 
[info] | request(32) 
[info] | onNext(10) 
[info] | onNext(20) 
[info] | onNext(30) 
[info] | onNext(40) 
[info] | onComplete() 
[info] | cancel() 
------------------------------------- 

, "MapData"경우 여기

는 두 경우와 로그입니다 , "[정보] | 요청 (무한)"을 로깅하고 "내부 결합 스트림" " "[정보] | 요청 (32) "을 로깅합니다.

좋습니다.

답변

2

먼저 예상되는 동작임을 알아야합니다. 당신이 사용하고있는 사업자에 따라

, 원자로는 다른 프리 페치 전략을 적용됩니다

  • 일부 사업자 32 또는 256
  • 같은 기본값을 사용하는 몇 가지 조치가 추가 한 경우 사용자가 제공 한 값을 사용합니다 특정 값을 갖는 완충 연산자
  • 원자로 값의 스트림이 한정된 것을 추측 할 수 및 바운드 값
,174에게 요청할

int prefetch 메서드 인수가있는 연산자 변형을 사용하거나 BaseSubscriber을 사용하여 자신의 Subscriber을 구현하면이 동작을 언제든지 변경할 수 있습니다.이 방법에는 여러 가지 유용한 방법이 있습니다.

결론은 종종 특정 값에주의를 기울일 필요가 없다는 것입니다. 특정 데이터 소스에 대해 선행 전략을 최적화하려는 경우에만 유용 할 수 있습니다.