RxJava의 성능을 순차적으로 계산하는 것과 비교하여 계산을 차단하고 있습니다.병렬로 RxJava 계산
나는 this post과 this SO question을보고있었습니다. 경험상으로, System.currentTimeMillis() 및 Thread.sleep()을 사용한 벤치마킹은 I/O 대신 계산을 처리 할 때 일관된 결과를 산출하지 않으므로 간단한 JMH 벤치 마크를 대신 설정해 보았습니다.
내 벤치 마크는 두 가지의 int를 계산하고 그들을 추가 : 나는 결과에 의해 의아해하고
public class MyBenchmark {
private Worker workerSequential;
private Worker workerParallel;
private int semiIntenseCalculation(int i) {
Double d = Math.tan(Math.atan(Math.tan(Math.atan(Math.tan(Math.tan(Math.atan(Math.tan(Math.atan(Math.tan(Math.atan(Math.tan(Math.tan(Math.atan(Math.tan(Math.atan(Math.tan(i)))))))))))))))));
return d.intValue() + i;
}
private int nonIntenseCalculation(int i) {
Double d = Math.tan(Math.atan(Math.tan(Math.atan(Math.tan(Math.tan(Math.atan(i)))))));
return d.intValue() + i;
}
private Observable<Object> intensiveObservable() {
return Observable.fromCallable(new Callable<Object>() {
@Override
public Object call() throws Exception {
int randomNumforSemi = ThreadLocalRandom.current().nextInt(0, 101);
Integer i = semiIntenseCalculation(randomNumforSemi);
int randomNumforNon = ThreadLocalRandom.current().nextInt(0, 101);
Integer j = nonIntenseCalculation(randomNumforNon);
return i+j;
}
});
};
private Observable<Object> semiIntensiveObservable() {
return Observable.fromCallable(new Callable<Object>() {
@Override
public Object call() throws Exception {
int randomNumforSemi = ThreadLocalRandom.current().nextInt(0, 101);
return semiIntenseCalculation(randomNumforSemi);
}
});
};
private Observable<Object> nonIntensiveObservable() {
return Observable.fromCallable(new Callable<Object>() {
@Override
public Object call() throws Exception {
int randomNumforNon = ThreadLocalRandom.current().nextInt(0, 101);
return nonIntenseCalculation(randomNumforNon);
}
});
};
public interface Worker {
void work();
}
@Setup
public void setup(final Blackhole bh) {
workerSequential = new Worker() {
@Override
public void work() {
Observable.just(intensiveObservable())
.subscribe(new Subscriber<Object>() {
@Override
public void onError(Throwable error) {
}
@Override
public void onCompleted() {
}
@Override
public void onNext(Object arg) {
bh.consume(arg);
}
});
}
};
workerParallel = new Worker() {
@Override
public void work() {
Observable.zip(semiIntensiveObservable().subscribeOn(Schedulers.computation()),
nonIntensiveObservable().subscribeOn(Schedulers.computation()),
new Func2<Object, Object, Object>() {
@Override
public Object call(Object semiIntensive, Object nonIntensive) {
return (Integer)semiIntensive + (Integer)nonIntensive;
}
}).subscribe(bh::consume);
}
};
}
@Benchmark
public void calculateSequential() {
workerSequential.work();
}
@Benchmark
public void calculateParallel() {
workerParallel.work();
}
}
:
# Run complete. Total time: 00:00:21
Benchmark Mode Cnt Score Error Units
MyBenchmark.calculateParallel avgt 5 15602,176 ± 1663,650 ns/op
MyBenchmark.calculateSequential avgt 5 288,128 ± 6,982 ns/op
는 분명 내가 더 빨리 될 병렬 계산을 기다리고 있었다. RxJava는 병렬 I/O에만 적합합니까? 아니면 왜 이러한 결과를 얻고 있습니까?
굉장! 그것은 나에게 관심있는 멀티 스레딩의 일반적인 Java 구현과의 오버 헤드 및 비교와 ParallelPerf.java의 시작점입니다! – ChopperOnDick