2017-01-27 3 views
0

Observable이 processedMessages입니다. 메시지를 버퍼링하고 10 개의 메시지마다 또는 30 초마다 외부 큐에서 메시지를 삭제해야합니다. 이 코드는 내 서비스의 생성자에 의해 불려각 간격마다 내 onNext 값을 제공하는 대신 내 버퍼 된 관찰 가능 항목을 완료하는 이유는 무엇입니까?

this.processedMessages.Where(m => m != null && some criteria here) 
    .Buffer(TimeSpan.FromSeconds(30), 10, this.schedulerProvider.Concurrent) 
    .Subscribe(
     (observer) => { DeleteMessages(observer); }); 

:이 작업을 수행하는을 가정 입니다 다음과 같은 코드가 있습니다. 첫 번째 호출시 processedMessages은 빈 관측 가능합니다. Messages은 서비스가 실행되는 동안 processedMessages에 잠재적으로 추가됩니다. processedMessages에 메시지를 추가하는 한 계속 삭제해야합니다. 대신이 코드는 한 번 실행되고 (30 초 지연을 고려하지 않음) 즉시 DeleteMessages를 호출 한 다음 완료됩니다.

나는 Observable.Create()에서 모든 것을 래핑하고 나중에 등록하려고했습니다. 그것은 코드를 한 번 실행하지만 결코 반복하지 않습니다. 나는 다양한 하위 관측에 Repeat()을 추가하려고 시도했지만 (무언가를 거기에 싣고 다니는화물에 대한 확고부동 함), 무한 루프에 빠지거나 동일한 메시지를 계속해서 반복해서 삭제합니다.

내 문제는 .Subscribe()processedMessages에 절대로 호출하지 않는다고 판단됩니다. Buffered 투영에서만. 그러나 내가 그 관측 가능 물을 구독해야한다면, 버퍼 확장 방법의 핵심은 무엇입니까?

편집 : 슐 로모의 답변에 따라

가, 여기에 내가 processedMessages을 만든 방법입니다.

나는 모두를 시도 다음과 그들 만이 두 옵션은 단일 메시지 다음 완료와 관측 만드는 것이 무슨 일을하는지 구체적으로 고려하여 실현 : 그럼 올바른 후속 질문을 할

this.processedMessages.Concat(Observable.Return(newMessage)); 

this.processedMessages.Concat(new List<IMessage>() { newMessage }).ToObservable(); 

새 메시지를 processedMessages에 추가하려면 어떻게해야합니까?

답변

0

귀하의 관찰 3 가지 조건에 DeleteMessages을 트리거한다 게시 된 :

  1. 10 개의 메시지가
  2. 30 초 가입 또는
  3. 관찰 가능한 마지막 트리거 이후 경과 가입 또는 마지막 트리거 이후에 ​​도착했습니다 processedMessages 완료.

processedMessages에 대한 가시성이 없으므로 더 이상 디버깅 할 수 없습니다.