2017-12-12 13 views
0

Rxjava2에 내 AsyncTask 코드를 변환하려고했지만 분명히 Rxjava2은 null 값을 처리하지 않으므로 내 응용 프로그램이 충돌합니다.비동기 작업/rx Java 코드를 rxjava2로 변환하려면 어떻게해야합니까?

new AsyncTask<Void, Void, Void>() { 
      @Override 
      protected Void doInBackground(Void... params) { 
       Set<Map.Entry<String, Participant>> entries = pool.entrySet(); 
       for (Map.Entry<String, Participant> entry : entries) { 
        Participant participant = entry.getValue(); 
        participant.release(); 
       } 
       return null; 
      } 

      @Override 
      protected void onPostExecute(Void aVoid) { 
       cb.event(new Spin.Event<Void>()); 
      } 
     }.execute(); 

을 그리고 여기 Rxjava (NOT Rxjava2)로 변환 된 코드입니다 : 여기 AsyncTask 내 코드의

Observable.defer(new Func0<Observable<Void>>() { 
     @Override 
     public Observable<Void> call() { 
      Set<Map.Entry<String, Participant>> entries = pool.entrySet(); 
      for (Map.Entry<String, Participant> entry : entries) { 
       Participant participant = entry.getValue(); 
       participant.release(); 
      } 
      return Observable.just(null); 
     } 
    }).doOnCompleted(new Action0() { 
     @Override 
     public void call() { 
      cb.event(new Spin.Event<Void>()); 
     } 
    }) 
    .subscribeOn(Schedulers.computation()) 
    .subscribe(); 

가 null의 반환에 충돌하지 않고 Rxjava로 변환하는 가장 좋은 방법 일 것입니다 무엇. 또한 .execute()는 Rxjava2과 관련하여 어떻게 재생됩니까? 그것이 심지어 Rxjava에서 작동하는지 확실하지 않은가요?

FATAL EXCEPTION: RxComputationThreadPool-3 

                      io.reactivex.exceptions.OnErrorNotImplementedException: null ObservableSource supplied 
                       at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704) 
                       at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701) 
                       at io.reactivex.internal.observers.LambdaObserver.onError(LambdaObserver.java:74) 
                       at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeOnObserver.onError(ObservableSubscribeOn.java:63) 
                       at io.reactivex.internal.disposables.EmptyDisposable.error(EmptyDisposable.java:63) 
                       at io.reactivex.internal.operators.observable.ObservableDefer.subscribeActual(ObservableDefer.java:35) 
                       at io.reactivex.Observable.subscribe(Observable.java:10842) 
                       at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96) 
                       at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:38) 
                       at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:26) 
                       at java.util.concurrent.FutureTask.run(FutureTask.java:237) 
                       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:154) 
                       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:269) 
                       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1113) 
                       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:588) 
                       at java.lang.Thread.run(Thread.java:818) 
                      Caused by: java.lang.NullPointerException: null ObservableSource supplied 
                       at io.reactivex.internal.functions.ObjectHelper.requireNonNull(ObjectHelper.java:39) 
                       at io.reactivex.internal.operators.observable.ObservableDefer.subscribeActual(ObservableDefer.java:32) 
                       at io.reactivex.Observable.subscribe(Observable.java:10842)  
                       at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)  
                       at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:38)  
                       at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:26)  
                       at java.util.concurrent.FutureTask.run(FutureTask.java:237)  
                       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:154)  
                       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:269)  
                       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1113)  
                       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:588)  
                       at java.lang.Thread.run(Thread.java:818)  
+0

널 여기서 뭐 분명하지 않다. Stacktrace를 추가하십시오. –

+0

스택 추적을 게시하십시오. 또한 .observeOn (AndroidSchedulers.mainThread())을 추가해야합니다. – AndroidRuntimeException

+0

곧 게시하겠습니다. 또한 Observable.just (null)을 반환합니다. 이것은 rxjava 2에서 허용됩니까? –

답변

1
Observable.defer(new Callable<ObservableSource<?>>() { 
      @Override 
      public ObservableSource<?> call() throws Exception { 
       Set<Map.Entry<String, Participant>> entries = pool.entrySet(); 
       for (Map.Entry<String, Participant> entry : entries) { 
        Participant participant = entry.getValue(); 
        participant.release(); 
       } 
       return Completable.complete().toObservable(); 
      } 
     }).doOnComplete(new Action() { 
      @Override 
      public void run() throws Exception { 
       Log.d("Complete", "Complete"); 
      } 
     }) 
      .subscribeOn(Schedulers.computation()) 
      .observeOn(AndroidSchedulers.mainThread()).subscribe(); 

이 코드도 작동합니다

다음은 충돌 로그입니다. subscribe() 메서드를 호출하면 작업이 시작됩니다. 다시 메인 스레드에 게시 할 값을 가지고 있지 않기 때문에

+0

내 코드를 수정하고 작동 시키려면 완료 가능 코드를 사용하지 않고 다른 방법이 있습니까? 나는 subscribeon (androidschedulers.mainthread())을 추가하려고했지만 그 원인은 null 크래시입니다. 이미 옳은 것을 향상시킬 수있는 방법이 있어야합니까? –

+0

위 코드의 Completeable을 Observable로 바꿀 수 있습니다. Geros

+0

주 기능 콜백 cb 안에 관측 가능이라고 부르는 부분이므로 실제로는 무언가에있는 함수 밖에서 사용할 수 없습니다. 현재 내가하고있는 것처럼 한 곳에서 모든 것을 사용하지만 내부에서 오류 처리의 일부 형식을 사용합니까? 그것은 예외 처리되지 않은 오류를 throw하고 항목이 null 자주입니다. 또한 어떻게 변환합니까 .execute() 내 asyntask rx로? –

1

, 당신은 Completable를 사용할 수 있습니다

Completable.fromAction(() -> { 
    Set<Map.Entry<String, Participant>> entries = pool.entrySet(); 
    for (Map.Entry<String, Participant> entry : entries) { 
     Participant participant = entry.getValue(); 
     participant.release(); 
    } 
}) 
.subscribeOn(Schedulers.computation()) 
.observeOn(AndroidSchedulers.mainThread()) 
.subscribe(
    () -> { 
     cb.event(new Spin.Event<Void>()); 
    }, 
    error -> { /* show error toast */ } 
); 
0
Observable.defer(new Callable<ObservableSource<?>>() { 

//This method is replacing doInBackground 
     @Override 
     public ObservableSource<?> call() throws Exception { 
      Set<Map.Entry<String, Participant>> entries = pool.entrySet(); 
      for (Map.Entry<String, Participant> entry : entries) { 
       Participant participant = entry.getValue(); 
       participant.release(); 
      } 
      return Completable.complete().toObservable(); 
     } 
    }).doOnComplete(new Action() { 
    //This is onPostExecute 
     @Override 
     public void run() throws Exception { 
      Log.d("Complete", "Complete"); 
     } 
    }) 
     .subscribeOn(Schedulers.computation()) 
     .observeOn(AndroidSchedulers.mainThread()).subscribe()