2017-09-28 9 views
0

사용자 정의 PublisherpublishOn 기능을 함께 사용하는 리 액티브 스트림 (https://github.com/reactor/reactor-core)을 사용할 때 항상 NPE를 얻습니다. 내 코드에 어떤 문제가 있습니까? Publisher을 잘못된 방법으로 사용합니까?ReactiveStreams 사용자 지정 게시자와 함께 publishOn을 사용할 때 NPE

Exception in thread "main" java.lang.NullPointerException 
    at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.onNext(FluxPublishOn.java:212) 
    at org.guenhter.kotlin.hello.MyPublisher.subscribe(HelloWorld.kt:18) 
    at reactor.core.publisher.FluxSource.subscribe(FluxSource.java:52) 
    at reactor.core.publisher.FluxPublishOn.subscribe(FluxPublishOn.java:96) 
    at reactor.core.publisher.Flux.subscribe(Flux.java:6447) 
    at reactor.core.publisher.Flux.subscribeWith(Flux.java:6614) 
    at reactor.core.publisher.Flux.subscribe(Flux.java:6440) 
    at reactor.core.publisher.Flux.subscribe(Flux.java:6404) 
    at reactor.core.publisher.Flux.subscribe(Flux.java:6347) 
    at org.guenhter.kotlin.hello.HelloWorldKt.main(HelloWorld.kt:11) 
+0

그냥 downvote하지 말고 도움이되는 의견을 남겨주세요. 왜이 질문이 좋지 않다고 생각하십니까? – guenhter

답변

2

Publisher은 표준 "반응성 스트림"에 의해 정의 및 요구들을 가지고

Flux.from(MyPublisher()) 
      .publishOn(Schedulers.single()) 
      .subscribe { println("<-- $it received") } 

class MyPublisher : Publisher<Int> { 
    override fun subscribe(sub: Subscriber<in Int>) { 
     while (true) { 
      Thread.sleep(300) 
      sub.onNext(1) 
     } 
    } 
} 

예외이다. 이러한 요구 사항 중 하나는 Subscriber.onSubscribe 프로토콜을 따르기 위해 다른 방법보다 먼저 호출되어야한다는 것입니다.

이 작업을 수행하지 않았기 때문에 원자로 클래스 내부에 NPE가 생성되어 제대로 초기화되지 않은 것일 수 있습니다.

그러나이 문제를 해결하더라도 표준은 입니다. 즉, 구독자가 요청할 때만 데이터를 방출합니다. 그럼에도 불구하고 아마 나중에 라인에서 예외가 발생할 것입니다. Flux.create을 사용하여 고유 한 Publisher 구현을 작성하는 대신 요청을 적절히 처리 할 수있는 이미 터를 작성하십시오.