2017-03-19 7 views
0

나는 로컬 파일 시스템에 문자 메시지를 저장하는 헬퍼 클래스가 있습니다. 이 메서드는 Task 개체를 반환하며 정의에 따라 비동기입니다.Rx.Net : 관찰 비동기 이벤트는 indifinitely

나는이 메소드가 호출 될 때 관찰 할 수 있도록하려면, 그래서 지속적으로 그 기반으로 결정을 버퍼의 크기와 길이를 모니터링 할 수 있습니다.

나는 .NET에 대한 반응성 확장을 사용하여이를 구현하기 위해 노력하고 있습니다. 그러나 나는 버퍼에 메시지를 지속적으로들을 수있는 디자인을 생각해 낼 수 없다. 내가 관찰에 가입하는 방법

다음
public IObservable<Unit> Receive(InternalMessage message) 
     { 
      var observable = FileBuffer.BufferMessage(message.MessageId.ToString(), message, DateTime.UtcNow).ToObservable(); //This returns a Task, which I convert into an Observable 
      return observable; 
     } 

은 다음과 같습니다 :

IObservable<Unit> receiverObservable = batchHandler.Receive(message); 
      receiverObservable.Subscribe(
       x => Console.WriteLine("On next"), 
       ex => //TODO, 
       () => // Completed); 

나는 방법 Receive가 호출 될 때마다 호출 할 가입자를 원하는 아래에 내 현재의 구현입니다. 그러나 AFAIK는 일단이 메서드가 호출되면 관찰 가능 항목이 완료되고 시퀀스가 ​​종료되므로 향후 Receive에 대한 호출은 수신되지 않습니다.

Rx.Net 라이브러리를 사용하여 내가 관찰 할 수있는 관찰 가능한 패턴을 구현하는 방법, 즉 시퀀스를 열린 상태로 유지하고 비동기 메소드에 결과를 제공하는 방법을 권장 할 수 있습니까?

+0

가 왜 그냥 메시지가 버퍼링 될 때마다 호출됩니다 FileBuffer에서 이벤트를 만든 다음이 이벤트에 가입? – Evk

+0

FileBuffer 코드를 변경할 수있는 권한이 없습니다. BatchHandler 클래스에 이벤트를 추가 할 수는 있지만, Rx를 사용하여 문제를 직접 해결할 수 있는지 확인하고 싶습니다. 시퀀스를 관리하고 제공하는 도구가 앞으로 나에게 도움이 될 수 있습니다. –

+0

BatchHandler에 이벤트를 추가 한 다음 Observable.FromEvent를 사용하여 시퀀스를 만들 수 있습니다. 추가 혜택은 이해하기 쉽고 사용하기 쉬운 이벤트이며 그대로 사용하거나 관찰 가능한 시퀀스를 생성 할 수 있습니다. – Evk

답변

1

Receive 코드 한대로 IObservable<Unit>을 반환하며 단일 작업의 완료를 나타냅니다. 작업 완료의 스트림을 나타내는 IObservable<IObservable<Unit>>을 반환하는 항목에 가입하려고합니다.

은 아마 당신의 클래스를 설정하고 어떻게 당신이 그것을 호출하는 방법에 의존하는 최선이 작업을 수행하는 방식이 될 것입니다.

여기에 게으른 하나 : 새 호출이있을 때 그런 다음, 당신은 단지에 추가

Subject<IObservable<Unit>> subject = new Subject<IObservable<Unit>>(); 
subject.Merge().Subscribe(
    x => Console.WriteLine("On next"), 
    ex => { }, //TODO 
    () => { } // Completed 
); 

:

당신은 통화의 흐름을 나타내는 subject 변수 클래스 수준의 선언 제목. 이건 정말 게으른

IObservable<Unit> receiverObservable = batchHandler.Receive(message); 
subject.OnNext(receiverObservable); 

이유는 수신이 변경 가능한 상태 변수를 아래로 보는 경향이 핵심에서 작동한다는 것입니다. Subjects은 기본적으로 변경할 수있는 상태입니다.

더 나은 당신이 Receive를 호출하는 이유 때/파악하는 것입니다 할 수있는 방법과 구조가 관찰로. 작업이 완료되면 다음 작업을 수행 할 수 있습니다.

IObservable<Unit> sourceReasonsToCallReceive; // Most likely sourced from event 

sourceReasonsToCallReceive.SelectMany(_ => batchHandler.Receive(message)) 
    .SubScribe(
    x => Console.WriteLine("On next"), 
    ex => { }, //TODO 
    () => { } // Completed 
); 

희망이 있습니다.