2017-05-15 6 views
1

오류시 구독자를 변경하는 방법? 데이터베이스에서 차가운 스트림을 사용합니다. 다음 사례를 참조하십시오.오류시 관찰 가능한 변경 가입자

return coctailBundleStream 
     .doOnNext(c -> { 
      hostnames.add(c.get(KEY_HOSTNAME)); // [A] 

      sendToOutboundQueue(c.get(KEY_CREDS)); 
      archiveSentMessage(c.get(KEY_CREDS), c.get(KEY_MESSAGE_ID)); 
     }) 
     .doOnComplete(this::saveCutOffTime) 
     .doOnError(e -> informUserImpactedHostnames(hostnames, 
      theRestOfHostnamesInside(credsXmlStream, e))) // I don't think this is right 
     .onErrorResumeNext(Flowable.empty()) 
     .count(); 

오류의 영향을받은 모든 호스트 이름을 보내려고합니다. 그러나 위의 내 의견을 참조하십시오. 스트림이 두 번 소비되므로 이것이 정확하다고 생각하지 않습니다. 예를 들어 theRestOfHostnamesInside의 구현 credsStream.map(c -> c.getHostname()), e) 내가 생각

가 이상적으로, 에러 핸들러는 다음 이전 목록과 목록을 추가 목록에 호스트 이름의 나머지 부분을 추출하는 다른 구독을 사용하여 스트림을 계속해야한다 (라인 표시된 참조하는 경우 [A]와).

답변

0

onErrorResumeNext은 폴백 할 유동성을 제공하는 데 사용해야합니다.

그러나 주된 어려움은 중복을 피하는 것입니다. 앞에서 말한 것처럼 소스가 차가운 경우 DB 요청을 다시 수행하고 원래 구독 된 시퀀스가 ​​오류가 발생하기 전에 일부 데이터를 내보내는 경우 동일한 데이터가 다시 방출됩니다.

onErrorResumeNext 뒤에 distinct을 연결하여이를 완화 할 수 있습니다 (중복 검색 방법을 나타내려면 keySelector을 입력해야합니다). 하지만 소스에서 두 요소를 중복으로 표시하지 않는 기준을 사용해야합니다 (재 시도로 생성 된 중복을 제거하기위한 것만). 그래서하지 ...

는 그 주변

다른 방법은 이미 키를 저장할 모음에서 자신을 처리하고 onErrorResumeNext에 다음을 필터링하는 것입니다,하지만 당신은 확인이 컬렉션에 count()의 다운 스트림 만든 각 subscribe에 고유 말했다 확인해야 그거 쉽지.

+0

나는 동일한 DB 요청을 다시하는 생각을 좋아하지 않습니다. 그것은 나에게 효율적이지 않은 것 같습니다. – sancho21

+0

원래 오류가 무엇인지에 따라 다르지만 대신 누락 된 키만 쿼리하는 두 번째 시퀀스에서 폴백 할 수 있습니다. 'onErrorResumeNext'는 오류가 발생한 시퀀스의 데이터와 오류가 발생한 시퀀스의 데이터를 폴백의 데이터와 연결하기 때문에 순서,이 법안에 맞는 –

0

당신은 단지 flatMap flatMap의 내부 그래서

 Observable.fromIterable(yourList) 
      .flatMap(x ->{ 
       Observable.just(x) 
         .map(data -> yourNormalSave(data)) 
         .onErrorReturn(errorResult) 
      }) 
      .subscribe(result ->{ 
         if(result != errorResult) 
          count++; 
         eles{ 
         //error received here 
         } 
      } 
      ); 

의 내부에 그것을 할 수, 오류 사항을 충족하는 경우 정상 형으로 변경하지만, 다운 스트림의 아무 생각이없는 것입니다. 따라서 다운 스트림 가입자는 onNext에서도 소비합니다. x -> Observable.just(x) 대신 적절하게 처리 할 수 ​​있습니다. 예제로 들어 있습니다.