이 질문은 rxpy에 관한 것입니다.rxpy는 항목을 관찰 할 수있게 넣습니다.
소스에서 관찰 할 수있는 메시지를 처리하는 반응적인 시스템을 구축하려고합니다. 그 외에도, 나는 그것을 사육사를 기반으로 한 리더 선거 시스템과 통합하려고 노력하고있다.
이 조합을 사용하면 프로세스 팜의 한 리더 만 메시지 스트림을 처리 할 수 있습니다. 아래는 내가 작성하려고하는 코드의 요지입니다.
# event_source is an observable of messages
# manager.leaders is an observable of leader election events
# manager.followers is an observable of leader relinquish events
event_source\
.skip_until(manager.leaders)\
.take_until(manager.followers)\
.subscribe(observer)
이 잘 모든 작동하지만, 나는 skip_until
및 take_until
백필 처리 할 수있는 조각 사이에 삽입해야합니다. 이는 리더 프로세스 실패와 리더십을 전제로하는 다른 프로세스 간의 잠재적 갭을 처리하도록 설계되었습니다. 처리 된 모든 메시지는 레코드를 남겨 두어 스트림을 진행하기 전에 새 리더가 누락 된 메시지를 포착 할 수 있도록합니다.
나는 성공한 start_with
연산자를 시도했다. 내가 사용하지 않는 방식으로 접근하지 않습니까?
궁극적으로, 내가 찾고있는 솔루션은 다른 스트림의 이벤트에 의해 트리거되는 스트림에 특정 수의 항목을 주입하는 것입니다. 이것에 대해