2017-09-18 4 views
0

RxJava의 성능을 순차적으로 계산하는 것과 비교하여 계산을 차단하고 있습니다.병렬로 RxJava 계산

나는 this postthis SO question을보고있었습니다. 경험상으로, System.currentTimeMillis() 및 Thread.sleep()을 사용한 벤치마킹은 I/O 대신 계산을 처리 할 때 일관된 결과를 산출하지 않으므로 간단한 JMH 벤치 마크를 대신 설정해 보았습니다.

내 벤치 마크는 두 가지의 int를 계산하고 그들을 추가 : 나는 결과에 의해 의아해하고

public class MyBenchmark { 

    private Worker workerSequential; 
    private Worker workerParallel; 

    private int semiIntenseCalculation(int i) { 
     Double d = Math.tan(Math.atan(Math.tan(Math.atan(Math.tan(Math.tan(Math.atan(Math.tan(Math.atan(Math.tan(Math.atan(Math.tan(Math.tan(Math.atan(Math.tan(Math.atan(Math.tan(i))))))))))))))))); 
     return d.intValue() + i; 
    } 

    private int nonIntenseCalculation(int i) { 
     Double d = Math.tan(Math.atan(Math.tan(Math.atan(Math.tan(Math.tan(Math.atan(i))))))); 
     return d.intValue() + i; 
    } 


    private Observable<Object> intensiveObservable() { 
     return Observable.fromCallable(new Callable<Object>() { 
      @Override 
      public Object call() throws Exception { 

       int randomNumforSemi = ThreadLocalRandom.current().nextInt(0, 101); 
       Integer i = semiIntenseCalculation(randomNumforSemi); 
       int randomNumforNon = ThreadLocalRandom.current().nextInt(0, 101); 
       Integer j = nonIntenseCalculation(randomNumforNon); 

       return i+j; 
      } 
     }); 
    }; 

    private Observable<Object> semiIntensiveObservable() { 
     return Observable.fromCallable(new Callable<Object>() { 
      @Override 
      public Object call() throws Exception { 
       int randomNumforSemi = ThreadLocalRandom.current().nextInt(0, 101); 
       return semiIntenseCalculation(randomNumforSemi); 
      } 
     }); 
    }; 

    private Observable<Object> nonIntensiveObservable() { 
     return Observable.fromCallable(new Callable<Object>() { 
      @Override 
      public Object call() throws Exception { 
       int randomNumforNon = ThreadLocalRandom.current().nextInt(0, 101); 
       return nonIntenseCalculation(randomNumforNon); 
      } 
     }); 
    }; 

    public interface Worker { 
     void work(); 
    } 

    @Setup 
    public void setup(final Blackhole bh) { 

     workerSequential = new Worker() { 

      @Override 
      public void work() { 


       Observable.just(intensiveObservable()) 
        .subscribe(new Subscriber<Object>() { 

        @Override 
        public void onError(Throwable error) { 

        } 

        @Override 
        public void onCompleted() { 

        } 

        @Override 
        public void onNext(Object arg) { 
         bh.consume(arg); 

        } 
       }); 
      } 
     }; 

     workerParallel = new Worker() { 
      @Override 
      public void work() { 
       Observable.zip(semiIntensiveObservable().subscribeOn(Schedulers.computation()), 
           nonIntensiveObservable().subscribeOn(Schedulers.computation()), 
           new Func2<Object, Object, Object>() { 

        @Override 
        public Object call(Object semiIntensive, Object nonIntensive) { 
         return (Integer)semiIntensive + (Integer)nonIntensive; 
        } 

       }).subscribe(bh::consume); 
      } 
     }; 

    } 

    @Benchmark 
    public void calculateSequential() { 
     workerSequential.work(); 
    } 

    @Benchmark 
    public void calculateParallel() { 
     workerParallel.work(); 
    } 
} 

:

# Run complete. Total time: 00:00:21 

Benchmark      Mode Cnt  Score  Error Units 
MyBenchmark.calculateParallel avgt 5 15602,176 ± 1663,650 ns/op 
MyBenchmark.calculateSequential avgt 5 288,128 ± 6,982 ns/op 

는 분명 내가 더 빨리 될 병렬 계산을 기다리고 있었다. RxJava는 병렬 I/O에만 적합합니까? 아니면 왜 이러한 결과를 얻고 있습니까?

답변

1

벤치 마크를 잘못하고 있습니다. 병렬 작업이 끝나기를 기다려야합니다. (blockingSubscribe을 통해) 꽤 많은 GC 오버 헤드를 추가하고 집행자의 내부 대기열을 부 풀리게됩니다.

Here은 다양한 병렬 작업을 측정하기위한 참조 벤치 마크입니다. 파견 작업에는 자체적 인 오버 헤드가 있으며 병렬 설정에서 작업 항목 당 500 개 이상의 사이클이 없으면 이러한 포크 - 조인트 유형 병렬 작업 부하가 향상되지 않을 수도 있습니다.

+0

굉장! 그것은 나에게 관심있는 멀티 스레딩의 일반적인 Java 구현과의 오버 헤드 및 비교와 ParallelPerf.java의 시작점입니다! – ChopperOnDick