0
저는 주가 분석을위한 도구를 만들려고합니다.어떻게 그룹화 된 관측 값을 올바르게 재결합합니까?
다른 주식에 대한 가격 데이터 스트림이 있는데, 새롭고 별개의 완전한 가격 집합을받을 때마다 이벤트를 내 보내고 싶습니다.
내 계획 : 다른 주식에 대한 스트림을 다른 하위 스트림으로 그룹화하고 최신 값을 재결합합니다.
a_source = price_source.filter(lambda x: x['stock'] == 'A').distinct_until_changed()
b_source = price_source.filter(lambda x: x['stock'] == 'B').distinct_until_changed()
c_source = price_source.filter(lambda x: x['stock'] == 'C').distinct_until_changed()
d_source = price_source.filter(lambda x: x['stock'] == 'D').distinct_until_changed()
(Observable
.combine_latest(a_source, b_source, c_source, d_source, lambda *x: x)
.subscribe(print))
이 올바르게 나에게 제공합니다 :
({'stock': 'A', 'price': 18}, {'stock': 'B', 'price': 24}, {'stock': 'C', 'price': 37}, {'stock': 'D', 'price': 42})
({'stock': 'A', 'price': 18}, {'stock': 'B', 'price': 27}, {'stock': 'C', 'price': 37}, {'stock': 'D', 'price': 42})
({'stock': 'A', 'price': 18}, {'stock': 'B', 'price': 27}, {'stock': 'C', 'price': 31}, {'stock': 'D', 'price': 42})
({'stock': 'A', 'price': 18}, {'stock': 'B', 'price': 27}, {'stock': 'C', 'price': 31}, {'stock': 'D', 'price': 44})
을 여기
from rx import Observable
stock_events = [
{'stock': 'A', 'price': 15},
{'stock': 'A', 'price': 16},
{'stock': 'B', 'price': 24},
{'stock': 'C', 'price': 37},
{'stock': 'A', 'price': 18},
{'stock': 'D', 'price': 42},
{'stock': 'B', 'price': 27},
{'stock': 'B', 'price': 27},
{'stock': 'C', 'price': 31},
{'stock': 'D', 'price': 44}
]
price_source = Observable.from_list(stock_events)
내 첫 번째 (순진) 접근 방식 :
의 내가 같은 이벤트 스트림을 가지고 있다고 가정 해 봅시다
그러나 여기서는 몇 가지 필터링 대신 group_by
을 더 잘 처리해야한다고 생각합니다. 내가 얻을
(price_source
.group_by(lambda e: e['stock'])
.map(lambda obs: obs.distinct_until_changed())
.combine_latest(lambda *x: x)
.subscribe(print))
그러나이 시간 :
(<rx.core.anonymousobservable.AnonymousObservable object at 0x000000000105EA20>,)
(<rx.core.anonymousobservable.AnonymousObservable object at 0x000000000776AB00>,)
(<rx.core.anonymousobservable.AnonymousObservable object at 0x000000000776A438>,)
(<rx.core.anonymousobservable.AnonymousObservable object at 0x000000000775E7F0>,)
내가 여기 놓친 무엇 '은 다시 쓰기이야? 중첩 된 관측 값을 "언랩"하는 방법은 무엇입니까?
귀하의 상황에서 GroupBy가 올바른 방법인지 확신 할 수 없습니다. 모든 주식이 발행 될 때까지 요소를 출력하고 싶지 않으므로 주식 시세 표시기가 무엇인지 알 수 있습니다. 나는 Combine 최신이 당신의 유스 케이스에 완벽하다고 생각한다. 게시와 함께 기본 서브를 공유하고 싶을 수도 있습니다. – user630190