Observable이 processedMessages
입니다. 메시지를 버퍼링하고 10 개의 메시지마다 또는 30 초마다 외부 큐에서 메시지를 삭제해야합니다. 이 코드는 내 서비스의 생성자에 의해 불려각 간격마다 내 onNext 값을 제공하는 대신 내 버퍼 된 관찰 가능 항목을 완료하는 이유는 무엇입니까?
this.processedMessages.Where(m => m != null && some criteria here)
.Buffer(TimeSpan.FromSeconds(30), 10, this.schedulerProvider.Concurrent)
.Subscribe(
(observer) => { DeleteMessages(observer); });
:이 작업을 수행하는을 가정 입니다 다음과 같은 코드가 있습니다. 첫 번째 호출시 processedMessages
은 빈 관측 가능합니다. Messages
은 서비스가 실행되는 동안 processedMessages
에 잠재적으로 추가됩니다. processedMessages
에 메시지를 추가하는 한 계속 삭제해야합니다. 대신이 코드는 한 번 실행되고 (30 초 지연을 고려하지 않음) 즉시 DeleteMessages를 호출 한 다음 완료됩니다.
나는 Observable.Create()
에서 모든 것을 래핑하고 나중에 등록하려고했습니다. 그것은 코드를 한 번 실행하지만 결코 반복하지 않습니다. 나는 다양한 하위 관측에 Repeat()
을 추가하려고 시도했지만 (무언가를 거기에 싣고 다니는화물에 대한 확고부동 함), 무한 루프에 빠지거나 동일한 메시지를 계속해서 반복해서 삭제합니다.
내 문제는 .Subscribe()
을 processedMessages
에 절대로 호출하지 않는다고 판단됩니다. Buffered 투영에서만. 그러나 내가 그 관측 가능 물을 구독해야한다면, 버퍼 확장 방법의 핵심은 무엇입니까?
편집 : 슐 로모의 답변에 따라
가, 여기에 내가 processedMessages
을 만든 방법입니다.
this.processedMessages.Concat(Observable.Return(newMessage));
this.processedMessages.Concat(new List<IMessage>() { newMessage }).ToObservable();
새 메시지를 processedMessages
에 추가하려면 어떻게해야합니까?