2017-12-20 14 views
0

나는 스케줄러 작업자 스레드를 정리하는 방법에 대한 질문이 내가 위의 코드가 실행되면 내가 볼 무엇Reactor Scheduler는 주 스레드가 완료된 후에도 계속 실행됩니다. 반응기 (3) 사용하는 동안

Flux.range(1, 10000) 
.publishOn(Schedulers.newElastic("Y")) 
.doOnComplete(() -> { 
    // WHAT should one do to ensure the worker threads are cleaned up 
    logger.info("Shut down all Scheduler worker threads"); 
}) 
.subscribe(x -> logger.debug(x+ "**")); 

이 메인 스레드가 작업자 스레드 실행이 완료되면 (들) 잠시 대기 상태입니다.

sun.misc.Unsafe.park(Native Method) 
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039) 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081) 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809) 
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067) 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127) 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
java.lang.Thread.run(Thread.java:748) 

이를 제어 할 수있는 방법은 즉 그들이 onComplete()을 배치 할 수있다? 있는가? 나는 Schedulers.shutdownNow()을 시도하고 도움이되지 않습니다. 내가 이렇게 반면에

나는 스케줄러 처리를 제어 할 수 있어요. 선호 /지지하는 방법은 무엇입니까?

reactor.core.scheduler.Scheduler s = Schedulers.newElastic("X"); 
     Flux.range(1, 10000) 
     .concatWith(Flux.empty()) 
     .publishOn(s) 
     .doOnComplete(() -> {   
      s.dispose(); 
      logger.info("Shut down all Scheduler worker threads"); 
     }) 
     .subscribe(x -> logger.debug(x+ "**")); 

답변

1

는, 당신이 그것을 닫으려는 경우 결과 Scheduler을 추적하는 것은 귀하의 책임입니다. 당신은 명시하지 않은 경우 Schedulers.shutdownNow()은 (더 new 접두사를 알 수 없음) Schedulers.elastic() 같은 라이브러리에서 사용하는 기본 스케줄러를 닫습니다.

청소를하기위한 모든 작업을 실행 한 후 가장 좋은 방법은 doFinally을 사용하는 것입니다. |이 비동기onError 후 정리 콜백 을 수행합니다 onComplete | cancel 이벤트. 항상 모든 경우에 마지막으로 실행하려고 시도하지만 체인의 마지막 연산자인지 확인하는 것이 좋습니다.

주의해야 할 점은 종료하려고하는 ... (가) 매우 당신 스레드 즉, 기존 사업자와 동일한 스레드에서 실행되는 doFinally 콜백에서 s.dispose()이다 후 종료 집행자 을 것 그 작업 대기열이 처리되었으므로이 경우 스레드가 사라지기 전에 약간의 지연이 발생합니다. 여기

는 thread 정보를 덤프 샘플입니다 사용자 정의 탄성 스레드로 전환하고 doFinally에 종료됩니다 (추가 필터와 이벤트가 밖으로 재생 방법을 더 잘 볼 수있는 짧은 로그를 제공하기 위해 구체화) :

@Test 
public void schedulerFinallyShutdown() throws InterruptedException { 
    ThreadMXBean threadMxBean = ManagementFactory.getThreadMXBean(); 
    Logger logger = Loggers.getLogger("foo"); 
    CountDownLatch latch = new CountDownLatch(1); 
    reactor.core.scheduler.Scheduler s = Schedulers.newElastic("X"); 
    Flux.range(1, 10000) 
     .publishOn(s) 
     .concatWith(Flux.<Integer>empty().doOnComplete(() -> { 
      for (ThreadInfo ti : threadMxBean.dumpAllThreads(true, true)) { 
       System.out.println("last element\t" + ti.getThreadName() + " " + ti.getThreadState()); 
      } 
     })) 
     .doFinally(sig -> { 
      s.dispose(); 
      logger.info("Shut down all Scheduler worker threads"); 
      latch.countDown(); 
     }) 
     .filter(x -> x % 1000 == 0) 
     .materialize() 
     .subscribe(x -> logger.info(x+ "**")); 

    latch.await(); 
    for (ThreadInfo ti : threadMxBean.dumpAllThreads(true, true)) { 
     System.out.println("after cleanup\t" + ti.getThreadName() + " " + ti.getThreadState()); 
    } 
} 

이 출력합니다 :

11:24:36.608 [X-2] INFO foo - onNext(1000)** 
11:24:36.611 [X-2] INFO foo - onNext(2000)** 
11:24:36.611 [X-2] INFO foo - onNext(3000)** 
11:24:36.612 [X-2] INFO foo - onNext(4000)** 
11:24:36.612 [X-2] INFO foo - onNext(5000)** 
11:24:36.612 [X-2] INFO foo - onNext(6000)** 
11:24:36.612 [X-2] INFO foo - onNext(7000)** 
11:24:36.613 [X-2] INFO foo - onNext(8000)** 
11:24:36.613 [X-2] INFO foo - onNext(9000)** 
11:24:36.613 [X-2] INFO foo - onNext(10000)** 
last element X-2 RUNNABLE 
last element elastic-evictor-1 TIMED_WAITING 
last element Monitor Ctrl-Break RUNNABLE 
last element Signal Dispatcher RUNNABLE 
last element Finalizer WAITING 
last element Reference Handler WAITING 
last element main WAITING 
11:24:36.626 [X-2] INFO foo - onComplete()** 
11:24:36.627 [X-2] INFO foo - Shut down all Scheduler worker threads 
after cleanup Monitor Ctrl-Break RUNNABLE 
after cleanup Signal Dispatcher RUNNABLE 
after cleanup Finalizer WAITING 
after cleanup Reference Handler WAITING 
after cleanup main RUNNABLE 
1

flux가 완료되었는지 확인하려면 blockLast() 메소드를 호출해야합니다. 플럭스를 병렬로 실행하거나 메인 스레드와 다른 스레드에서 실행하는 경우 특히 중요합니다.

NB : 당신은, 체인에 높은 publishOn 전화 publishon의 위치가 중요한 이유를 the reference guide를 참조 할 필요가있다. 당신이 Schedulers.new[Elastic|...]를 사용하는 경우

reactor.core.scheduler.Scheduler s = Schedulers.newElastic("X"); Flux.range(1, 10000) .concatWith(Flux.empty()) .publishOn(s) .doOnComplete(() -> {
logger.info("Shut down all Scheduler worker threads"); }).blocklast(); s.dispose();