0

저는 주가 분석을위한 도구를 만들려고합니다.어떻게 그룹화 된 관측 값을 올바르게 재결합합니까?

다른 주식에 대한 가격 데이터 스트림이 있는데, 새롭고 별개의 완전한 가격 집합을받을 때마다 이벤트를 내 보내고 싶습니다.

내 계획 : 다른 주식에 대한 스트림을 다른 하위 스트림으로 그룹화하고 최신 값을 재결합합니다.

a_source = price_source.filter(lambda x: x['stock'] == 'A').distinct_until_changed() 
b_source = price_source.filter(lambda x: x['stock'] == 'B').distinct_until_changed() 
c_source = price_source.filter(lambda x: x['stock'] == 'C').distinct_until_changed() 
d_source = price_source.filter(lambda x: x['stock'] == 'D').distinct_until_changed() 

(Observable 
    .combine_latest(a_source, b_source, c_source, d_source, lambda *x: x) 
    .subscribe(print)) 

이 올바르게 나에게 제공합니다 :

({'stock': 'A', 'price': 18}, {'stock': 'B', 'price': 24}, {'stock': 'C', 'price': 37}, {'stock': 'D', 'price': 42}) 
({'stock': 'A', 'price': 18}, {'stock': 'B', 'price': 27}, {'stock': 'C', 'price': 37}, {'stock': 'D', 'price': 42}) 
({'stock': 'A', 'price': 18}, {'stock': 'B', 'price': 27}, {'stock': 'C', 'price': 31}, {'stock': 'D', 'price': 42}) 
({'stock': 'A', 'price': 18}, {'stock': 'B', 'price': 27}, {'stock': 'C', 'price': 31}, {'stock': 'D', 'price': 44}) 
을 여기
from rx import Observable 

stock_events = [ 
    {'stock': 'A', 'price': 15}, 
    {'stock': 'A', 'price': 16}, 
    {'stock': 'B', 'price': 24}, 
    {'stock': 'C', 'price': 37}, 
    {'stock': 'A', 'price': 18}, 
    {'stock': 'D', 'price': 42}, 
    {'stock': 'B', 'price': 27}, 
    {'stock': 'B', 'price': 27}, 
    {'stock': 'C', 'price': 31}, 
    {'stock': 'D', 'price': 44} 
] 

price_source = Observable.from_list(stock_events) 

내 첫 번째 (순진) 접근 방식 :

의 내가 같은 이벤트 스트림을 가지고 있다고 가정 해 봅시다

그러나 여기서는 몇 가지 필터링 대신 group_by을 더 잘 처리해야한다고 생각합니다. 내가 얻을

(price_source 
.group_by(lambda e: e['stock']) 
.map(lambda obs: obs.distinct_until_changed()) 
.combine_latest(lambda *x: x) 
.subscribe(print)) 

그러나이 시간 :

(<rx.core.anonymousobservable.AnonymousObservable object at 0x000000000105EA20>,) 
(<rx.core.anonymousobservable.AnonymousObservable object at 0x000000000776AB00>,) 
(<rx.core.anonymousobservable.AnonymousObservable object at 0x000000000776A438>,) 
(<rx.core.anonymousobservable.AnonymousObservable object at 0x000000000775E7F0>,) 

내가 여기 놓친 무엇 '은 다시 쓰기이야? 중첩 된 관측 값을 "언랩"하는 방법은 무엇입니까?

+0

귀하의 상황에서 GroupBy가 올바른 방법인지 확신 할 수 없습니다. 모든 주식이 발행 될 때까지 요소를 출력하고 싶지 않으므로 주식 시세 표시기가 무엇인지 알 수 있습니다. 나는 Combine 최신이 당신의 유스 케이스에 완벽하다고 생각한다. 게시와 함께 기본 서브를 공유하고 싶을 수도 있습니다. – user630190

답변

0

groupby를 사용하려는 경우 C#에서 아래와 같이됩니다. 이것은 "완전한"세트의 요구 사항을 충족시키지 못합니다. 의견에 따라 CombineLatest가 더 좋을 것 같습니다.

price_source.GroupBy(x => x.Stock) 
      .Select(gp => gp.DistinctUntilChanged(x => x.Price)) 
      .SelectMany(x => x) 
      .Subscribe(s => Console.WriteLine($"{s.Stock} : {s.Price}"));