2017-09-21 3 views
0

저는 RxJava를 처음 사용합니다. 그래서 저는 여전히 그 문제를 해결하려고합니다. 나는 버튼 클릭의 흐름을 나타내는 Observable을 가지고있다. 그래서 그것은 뜨겁다. 버튼을 클릭 할 때마다 I/O를하고 싶습니다. 실패하면 반복 할 때까지 I/O를 다시 시도하려고합니다. 이것은 retry() 또는 repeat()을 사용할 수있는 좋은 기회 인 것 같습니다.하지만 이는 열악한 관측소에서만 작동하며 추운 곳에서는 작동하지 않습니다. 여기 이벤트 스트림을 실행하는 작업을 반복하는 적절한 방법

내가 할 노력하고있어에서 얻을 수있는 몇 가지 의사 코드입니다 : 나는 이벤트를 복제 flatMap를 사용하여 생각했습니다

buttonRequests 
    .map(actionEvent -> doIO()) 
    .repeatAboveIfFailedUntilIOSucceeds() 
    .subscribe(...); 

을하고 성공하면 나머지 사람을 무시 skip를 사용하는 것보다,하지만 깨끗하게 내게 불확실한 시도를하지는 않을 것입니다.

이것에 대해 올바른 생각은 무엇입니까?

+0

더 많은 예제를 볼 수 있습니다 성공할 때까지, 각 버튼 클릭에 대한 실패한 작업을 반복 하시겠습니까? 다시 시도하고 새로운 flighes 경우 어떻게해야합니까? –

+0

예, 성공할 때까지 각 버튼 클릭에 대한 작업을 다시 시도합니다. 그 동안 버튼이 다시 클릭되지 않도록 버튼을 비활성화 할 것입니다. – Vultan

답변

1

시험을보십시오. 모든 이벤트에서 새로운 IO 요청이 시작됩니다. 스위치 맵은 평면 맵과 같지만 새로운 업스트림 이벤트가 들어 오면 최근 구독에서 탈퇴됩니다. 평면 맵은 병행 성으로 작업하는 경우 새 맵을 시작합니다. 그래서 당신의 뜨겁게 관찰 할 수있는 이벤트가 발생했다고 가정하고 flatMap이 IO 작업을 다른 스레드 (subscribeOn)에서 실행하기 시작합니다. 마지막 이벤트가 계속 실행되는 동안 다른 이벤트가 들어 오면 다른 IO 작업을 실행하기 시작합니다. Switch-Map은 마지막 이벤트의 구독을 취소하고 현재 이벤트를 시작합니다. retry() 연산자를 살펴 봅시다. 다시 시도는 onComplete로 관찰 가능 항목이 완료 될 때까지 'ioWorkWrapped'가 제공 한 관찰 가능 항목에 다시 등록됩니다. 모든 시도에서 실패 할 것이라고 상상하기 때문에 이것은 매우 위험합니다. 그것은 영원히 돌 것입니다. 'exponential-backoff'를 사용하고 X가 시도한 후에 백업 관찰 가능 실패를 제공하는 것이 좋습니다. 'retryWhen'의 사용을 위해 : Reactive Programming with RxJava

당신은 경우에 대비하여 I/O 작업을 운영자 retryWhen를 사용할 필요가
public class LibraryTest { 
    private AtomicInteger idx; 

    @Before 
    public void setUp() throws Exception { 
     idx = new AtomicInteger(0); 
    } 

    @Test 
    public void name() throws Exception { 
     Observable<String> stringObservable = Observable.just(1) 
       .switchMap(integer -> ioWorkWrapped() 
         .doOnError(throwable -> System.out.println("Something went wrong.")) 
         .retry() 
       ); 

     stringObservable.test() 
       .await() 
       .assertResult("value"); 


    } 

    private Observable<String> ioWorkWrapped() { 
     return Observable.defer(() -> { 
      try { 
       Thread.sleep(500); // IO Work 
       if (idx.getAndIncrement() < 5) { // for testing... 
        return Observable.error(new IllegalStateException("Wurst")); 
       } 
       return Observable.just("value"); 
      } catch (Exception ex) { 
       return Observable.error(ex); 
      } 
     }); 
    } 
} 
+0

고마워요! 이것은 내가 찾고 있었던 바로 그 것이었다. 나는이 아이디어를 내 코드에 통합 할 수 있었다. 나는 또한 그것을 통해 많은 일을 배웠다. – Vultan

0

당신이 운영자에 체크의 Runnable 예외를 던질 수있는 실패이 우수한 책에서 참조하시기 바랍니다 . 그리고 예외 유형의 경우 다시 시도하십시오.

이 예제에서 우리는 4 번 재 시도 할 것입니다. 그러나 그 조건은 우리가받는 쓰레기의 종류에 따라 바뀔 수 있습니다.

int count=0; 

@Test 
public void retryWhenConnectionError() { 
    Subscription subscription = Observable.just(null) 
      .map(connection -> { 
       System.out.println("Trying to open connection"); 
       connection.toString(); 
       return connection; 
      }) 
      .retryWhen(errors -> errors.doOnNext(o -> count++) 
          .flatMap(t -> count > 3 ? Observable.error(t) : 
            Observable.just(null).delay(100, TimeUnit.MILLISECONDS)), 
        Schedulers.newThread()) 
      .subscribe(s -> System.out.println(s)); 
    new TestSubscriber((Observer) subscription).awaitTerminalEvent(500, TimeUnit.MILLISECONDS); 
} 

현재 https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/errors/ObservableExceptions.java

+0

고마워. 나는 내 요구에 더 잘 맞는 위의 또 다른 응답을 찾았지만,이 시간과 함께 몇 시간을 보냈다. 그리고 그것은 내 이해를 향상시키는 데 도움이되었다. – Vultan