2017-03-22 3 views
0

항목 스트림이 있고 조건이 충족 될 때까지 속성별로 그룹화하는 구독을 만들고 싶습니다.RX 제목, 조건이 충족 될 때까지 그룹화/연결

예를 들어 Id이 나타날 때까지 EventItem을 이름으로 그룹화한다고 가정 해 봅시다. 요점은 더 이상 관련 이벤트가 없다는 것을 알리는 특정 Id을 볼 때까지 이벤트를 상관시키는 것입니다. 그런 다음 상호 연관된 이벤트의 각 그룹에 대해 몇 가지 작업을 수행하려고합니다.

public class EventItem 
{ 
    public int Id { get; set } 
    public string Name { get; set; } 
} 

// Using a Subject since it seems the simplest way 
Subject<EventItem> eventStream; 
... 

// A seperate thread pushes EventItem objects into the Subject 
eventStream.OnNext(eventItem); 

// Struggling here... 
IDisposable subscription = eventStream.???? 

나는 TakeWhileTakeUntil, GroupByUntil, GroupBy으로 여러 조합을 시도했습니다,하지만 난

+0

내가 부탁 할 수있는 경우에'는 IDisposable 가입 = eventStream.TakeWhile (! 에이 => ei.Id = 42) .GroupBy (X => x.Name) .Subscribe (Y => {}); '당신이 찾고있는 것이 아닙니까? – Enigmativity

+0

@Enigmativity는 작동하는 것처럼 보이지만 한 번만; 즉 그룹을 본 후에는 더 이상 볼 수 없습니다 (구독이 종료 되었습니까?) – Cocowalla

+0

무슨 뜻인지 보여주는 샘플 데이터를 제공 할 수 있습니까? – Enigmativity

답변

1

편집 (I은 수신과 함께 오히려 경험이 있어요)이 작업을 수행하는 방법을 알아낼 수 없습니다 @ 바탕으로

슐 로모의 대답하고 싶은에 대한 영업 이익의 의견을 완료 할 수 있습니다

var subscription = producer 
     .TakeWhile(ei => ei.Id != 42) 
     .GroupBy(x => x.Name) 
     .Select(o => o) 
     .SelectMany(o => o.ToList()) //If you want y to be IList<EventItem>, use this line. If you prefer y to be IObservable<EventItem>, comment out. 
     .Repeat() 
     .Subscribe(); 
,

원본 응답

이 방법이 유용합니까?

 Subject<Notification> producer = new Subject<Notification>(); 

     //This way there's only one producer feeding the group and the duration 
     var connectedProducer = 
      producer 
       .Publish() 
       .RefCount(); 

     connectedProducer 
      .GroupByUntil(
       item => item.Name, 
       item => connectedProducer.Where(x=> x.Id == 3)) 
      .SelectMany(result => 
      { 
       return result.Aggregate<Notification, List<Notification>>(new List<Notification>(), (dict, item) => 
       { 
        dict.Add(item); 
        return dict; 
       }); 
      }) 
      //not sure if you need this but just a way to filter out the signal 
      .Where(item => item.First().Id != 3) 
      .Subscribe(results => 
      { 
       //This will only run after a "3" is passed in and then you get a list of everything passed in with the names 
       //not sure if you wanted intermediate results or for it all to just come through once the signal indicates processing 
      }); 

저는 이전 답변에서 벗어나 약간 수정했습니다. Rx grouped throttling

2

이미 알고있는 답변이 있지만 복잡한 것 같습니다. Enigmativity의 코멘트 @ 떨어져 사용 :

var subscription = eventStream 
    .GroupBy(x => x.Name) 
    .Select(o => o.TakeWhile(ei => ei.Id != 42)) 
    .SelectMany(o => o.ToList()) //If you want y to be IList<EventItem>, use this line. If you prefer y to be IObservable<EventItem>, comment out. 
    .Subscribe(y => {}); 
+0

나는 서면으로이 하나를 시도하고 난 그냥 y에 빈 목록을 얻고있다. 나는 TakeWhile을 eventStream과 GroupBy 사이에서 옮겼다. 그것은 또한 완료하지만 그것은 OP가 계속 가고 싶어 반복하면 쉽게 처리 할 수 ​​있습니다. –

+0

이 방법이 효과적이지만 결과에 마지막 이벤트가 없습니다 (예 :'Id == 42'가있는 경우). 나는 당신과 답을 결합하여 간단한 것을 얻을 수 있습니다. (형식 상 죄송합니다 ...) : SelectMany (x => x.ToList()) 'This.subscription = this.eventStream.GroupByUntil (x => x.Name, x => this.eventStream.Where (y => y.Id == 42) .Subscribe (x => {}); ' – Cocowalla

+0

너무 좋아 보인다 :-) 이러한 질문을 유도하는 데 도움이되는 한 가지는 입력과 출력에 대해 기대하는 것을 보여줍니다. 예를 들어이 질문에 대한 http://stackoverflow.com/questions/42694010/rx-grouped-throttling/42703750#42703750. 질문에 대한 몇 가지 해석이 있음을 알 수 있습니다. –