2017-03-22 3 views

항목 스트림이 있고 조건이 충족 될 때까지 속성별로 그룹화하는 구독을 만들고 싶습니다.RX 제목, 조건이 충족 될 때까지 그룹화/연결

예를 들어 Id이 나타날 때까지 EventItem을 이름으로 그룹화한다고 가정 해 봅시다. 요점은 더 이상 관련 이벤트가 없다는 것을 알리는 특정 Id을 볼 때까지 이벤트를 상관시키는 것입니다. 그런 다음 상호 연관된 이벤트의 각 그룹에 대해 몇 가지 작업을 수행하려고합니다.

public class EventItem 
    public int Id { get; set } 
    public string Name { get; set; } 

// Using a Subject since it seems the simplest way 
Subject<EventItem> eventStream; 

// A seperate thread pushes EventItem objects into the Subject 

// Struggling here... 
IDisposable subscription = eventStream.???? 

나는 TakeWhileTakeUntil, GroupByUntil, GroupBy으로 여러 조합을 시도했습니다,하지만 난


내가 부탁 할 수있는 경우에'는 IDisposable 가입 = eventStream.TakeWhile (! 에이 => ei.Id = 42) .GroupBy (X => x.Name) .Subscribe (Y => {}); '당신이 찾고있는 것이 아닙니까? – Enigmativity


@Enigmativity는 작동하는 것처럼 보이지만 한 번만; 즉 그룹을 본 후에는 더 이상 볼 수 없습니다 (구독이 종료 되었습니까?) – Cocowalla


무슨 뜻인지 보여주는 샘플 데이터를 제공 할 수 있습니까? – Enigmativity



편집 (I은 수신과 함께 오히려 경험이 있어요)이 작업을 수행하는 방법을 알아낼 수 없습니다 @ 바탕으로

슐 로모의 대답하고 싶은에 대한 영업 이익의 의견을 완료 할 수 있습니다

var subscription = producer 
     .TakeWhile(ei => ei.Id != 42) 
     .GroupBy(x => x.Name) 
     .Select(o => o) 
     .SelectMany(o => o.ToList()) //If you want y to be IList<EventItem>, use this line. If you prefer y to be IObservable<EventItem>, comment out. 

원본 응답

이 방법이 유용합니까?

 Subject<Notification> producer = new Subject<Notification>(); 

     //This way there's only one producer feeding the group and the duration 
     var connectedProducer = 

       item => item.Name, 
       item => connectedProducer.Where(x=> x.Id == 3)) 
      .SelectMany(result => 
       return result.Aggregate<Notification, List<Notification>>(new List<Notification>(), (dict, item) => 
        return dict; 
      //not sure if you need this but just a way to filter out the signal 
      .Where(item => item.First().Id != 3) 
      .Subscribe(results => 
       //This will only run after a "3" is passed in and then you get a list of everything passed in with the names 
       //not sure if you wanted intermediate results or for it all to just come through once the signal indicates processing 

저는 이전 답변에서 벗어나 약간 수정했습니다. Rx grouped throttling


이미 알고있는 답변이 있지만 복잡한 것 같습니다. Enigmativity의 코멘트 @ 떨어져 사용 :

var subscription = eventStream 
    .GroupBy(x => x.Name) 
    .Select(o => o.TakeWhile(ei => ei.Id != 42)) 
    .SelectMany(o => o.ToList()) //If you want y to be IList<EventItem>, use this line. If you prefer y to be IObservable<EventItem>, comment out. 
    .Subscribe(y => {}); 

나는 서면으로이 하나를 시도하고 난 그냥 y에 빈 목록을 얻고있다. 나는 TakeWhile을 eventStream과 GroupBy 사이에서 옮겼다. 그것은 또한 완료하지만 그것은 OP가 계속 가고 싶어 반복하면 쉽게 처리 할 수 ​​있습니다. –


이 방법이 효과적이지만 결과에 마지막 이벤트가 없습니다 (예 :'Id == 42'가있는 경우). 나는 당신과 답을 결합하여 간단한 것을 얻을 수 있습니다. (형식 상 죄송합니다 ...) : SelectMany (x => x.ToList()) 'This.subscription = this.eventStream.GroupByUntil (x => x.Name, x => this.eventStream.Where (y => y.Id == 42) .Subscribe (x => {}); ' – Cocowalla


너무 좋아 보인다 :-) 이러한 질문을 유도하는 데 도움이되는 한 가지는 입력과 출력에 대해 기대하는 것을 보여줍니다. 예를 들어이 질문에 대한 http://stackoverflow.com/questions/42694010/rx-grouped-throttling/42703750#42703750. 질문에 대한 몇 가지 해석이 있음을 알 수 있습니다. –