2017-11-03 8 views
0

메시지를 읽거나 쓰는 방법이 있습니다.다중 호출을 피하는 공통 관찰 가능을 만듭니다.

fun sendMessage(message: String): Observable<MESSAGE> { 
    return readMessage().doOnSubscribe { 
      socket.write(message)  
    } 
} 

readMessage()는 스트림 (socket.read())에서 뜨거운 관찰에서 값을 방출하는 게시 주제를 다시 제공합니다.

protected fun readMessage(): Observable<MESSAGE> { 
    if (messageSubject == null) { 
     messageSubject = PublishSubject.create() 
     socket.read() 
       .flatMap { 
        [email protected] flowTransformer.runLoop(it) 
       } 
       .flatMap { 
        //Do some stuff 
       } 
       .subscribe(messageSubject) 
    } 
    return messageSubject 
} 

나는 같은 체인의 다른 점과 여러 번에 sendMessage()를 호출합니다. 나는 아직 게시자에 가입하지 않을 수 있습니다 (그래서 메시지 응답 드롭입니다) sendMessage()를 호출 할 때

 sendMessage("Message1").flatMap { 
     sendMessage("Message2") 
    }.flatMap { 
     sendMessage("Message 3") 
    }.subscribe({ 
     //next 
    }, { 
     //error 
    }) 

문제

입니다. ReplaySubject를 사용하면 너무 많은 메시지가 방출 될 것입니다. sendMessage()을 많이 사용하기 때문에 두려워합니다.

일부 sendMessage에서 readObservable은 다음 메시지를 모두 읽습니다. 그리고 파스 작업이 CPU를 많이 사용하기 때문에 문제가됩니다.

어떻게 체인을 개선 할 수 있습니까?

+0

첫 번째 메시지를 보내 자마자 바로 구독을 준비 할 수 있습니다. – tynn

+0

어떻게하면됩니까? 나는 sendMessage()를 구독 할 때만 소켓에 쓰는 것으로 이미 이것을했다고 생각한다. – Timo

답변

0

당신은 버퍼 크기

public void ReplaySubjectBufferExample() 
{ 
    var bufferSize = 2; 
    var subject = new ReplaySubject<string>(bufferSize); 
    subject.OnNext("a"); 
    subject.OnNext("b"); 
    subject.OnNext("c"); 
    subject.Subscribe(Console.WriteLine); 
    subject.OnNext("d"); 
} 

그래서, 이런 식으로, 당신이 재생 될 항목의 수를 제어 할 수 있습니다와 PublishSubject을 만들 수 있습니다. 소스 ReplaySubject