2017-10-30 5 views
0

중단을 재개 할 수있는 네트워크 클라이언트가 있지만 재 시도가있을 때 마지막 메시지가 필요합니다. 코 틀린에RxJava2에서 재시도 연산자를 사용하여 상태를 기억하는 방법

예 :

fun requestOrResume(last: Message? = null): Flowable<Message> = 
    Flowable.create({ emitter -> 
     val connection = if (last != null) 
          client.start() 
         else 
          client.resumeFrom(last.id) 

     while (!emitter.isDisposed) { 
      val msg = connection.nextMessage() 
      emitter.onNext(msg) 
     } 
    }, BackpressureStrategy.MISSING) 

requestOrResume() 
    .retryWhen { it.flatMap { Flowable.timer(5, SECONDS) } } 
    // how to pass the resume data when there is a retry? 

질문 : 당신이 볼 수 있듯이, 나는 이력서 호출을 준비하기 위해 마지막으로 수신 된 메시지가 필요합니다. 재시도가있을 때 재개 요청을 할 수 있도록 어떻게 추적 할 수 있습니까?

가능한 해결책 중 하나는 마지막 메시지에 대한 참조를 보유하고 새 메시지를 받으면 업데이트되는 홀더 클래스를 만드는 것입니다. 재 시도가있을 때이 방법으로 마지막 메시지를 소유자로부터 얻을 수 있습니다. 예 :

class MsgHolder(var last: Message? = null) 

fun request(): Flowable<Message> { 
    val holder = MsgHolder() 
    return Flowable.create({ emitter -> 
     val connection = if (holder.last != null) 
          client.start() 
         else 
          client.resumeFrom(holder.last.id) 

     while (!emitter.isDisposed) { 
      val msg = connection.nextMessage() 
      holder.last = msg // <-- update holder reference 
      emitter.onNext(msg) 
     } 
    }, BackpressureStrategy.MISSING) 
} 

나는이 방법이 효과가있을 것이라고 생각하지만, 해킹 (스레드 동기화 문제?)이라고 느낍니다.

다시 시도 할 수 있도록 상태를 추적하는 더 좋은 방법이 있습니까? link 당신이처럼 사용할 수 있습니다 :

답변

1

주 객체 마지막 요소를 감싸는 래퍼 (기존의 "해킹"과는 기능적으로 다르지 않지만 솔루션이 좋지는 않지만 못 생겼음), 오류 처리 연산자는 외부 요소없이 마지막 요소를 복구 할 수 있습니다. Throwable. 대신 다음 재귀 접근 방식은 사용자의 요구에 맞는 있는지 확인 :

fun retryWithLast(seed: Flowable<Message>): Flowable<Message> { 
    val last$ = seed.last().cache(); 
    return seed.onErrorResumeNext { 
    it.flatMap { 
     retryWithLast(last$.flatMap { 
     requestOrResume(it) 
     }) 
    } 
    }; 
} 
retryWithLast(requestOrResume()); 

가장 큰 차이는 오히려 값을 수동으로하는 것보다 cache와 피 감시에 마지막 시도에서 후행 값을 캐싱. 오류 처리기에서의 재귀는 후속 시도가 계속 실패 할 경우 스트림을 계속 확장한다는 것을 의미합니다 (retryWithLast).

0

buffer() 운영자에게 가까운보세요

requestOrResume() 
    .buffer(2) 

지금부터, 당신의 Flowable이 latests와 List<Message>를 반환합니다 당신이 다시 발생하지 않는 것을

+0

이 상황에서'buffer' 연산자가 어떻게 도움이되는지 나는 알지 못합니다. – ESala