2017-02-27 15 views
1

간단한 시나리오를 감안할 때 A가 방에서 납치 되었습니까?하트 비트 패턴을 사용하여 반응 확장

A가 대화 할 때 A는 IObservable 을 제공하고 B는 Talk.Subscribe (string => A가 말한 것을 처리)에 가입합니다. B는 동시에 Observable에 가입 ​​할 수 있습니다 .Interval Heartbeat을 하트 비트 검사로 사용할 수 있습니다.

내 질문은 운영자가 나는/병합 의 두 항목 하트 비트 이상 로부터 항목 토크이없는 경우, B 가정 것이라는 A가 납치되어 있으므로이 IObservable을 결합하기 위해 사용해야하는 것이다.

해당 변수를 올바르게 동기화하지 않으면 부작용이 발생할 수 있으므로 변수를 피하기를 원합니다.

감사합니다,

답변

2

당신이 'A'부터 하트 비트의 수를 나타내는 상태로,에 역할을 할 상태 변수를 상상해 마지막 말했다. 즉,이과 같습니다

var stateObservable = Observable.Merge(     //State represent number of heartbeats since A last spoke 
    aSource.Select(_ => new Func<int, int>(i => 0)),  //When a talks, set state to 0 
    bHeartbeat.Select(_ => new Func<int, int>(i => i + 1)) //when b heartbeats, increment state 
) 
    .Scan(0, (state, func) => func(state)); 

우리는 국가를 증가로 0으로 상태를 재설정하는 기능과 같은 말하기의 사건, 그리고 B의 heartbeatting의 사건을 나타냅니다. 그런 다음 Scan 함수로 누적됩니다.

나머지는 이제 쉽게 :

var isKidnapped = stateObservable 
    .Where(state => state >= 2) 
    .Take(1); 

isKidnapped.Subscribe(_ => Console.WriteLine("A is kidnapped")); 

편집 : 귀하의 답변에 대한

var aSources = new Subject<Tuple<string, Subject<string>>>(); 
var bHeartbeat = Observable.Interval(TimeSpan.FromSeconds(1)).Publish().RefCount(); 

var stateObservable = aSources.SelectMany(t => 
     Observable.Merge(
      t.Item2.Select(_ => new Func<int, int>(i => 0)), 
      bHeartbeat.Select(_ => new Func<int, int>(i => i + 1)) 
     ) 
     .Scan(0, (state, func) => func(state)) 
     .Where(state => state >= 2) 
     .Take(1) 
     .Select(_ => t.Item1) 
    ); 

stateObservable.Subscribe(s => Console.WriteLine($"{s} is kidnapped")); 
aSources 
    .SelectMany(t => t.Item2.Select(s => Tuple.Create(t.Item1, s))) 
    .Subscribe(t => Console.WriteLine($"{t.Item1} says '{t.Item2}'")); 
bHeartbeat.Subscribe(_ => Console.WriteLine("**Heartbeat**")); 

var a = new Subject<string>(); 
var c = new Subject<string>(); 
var d = new Subject<string>(); 
var e = new Subject<string>(); 
var f = new Subject<string>(); 

aSources.OnNext(Tuple.Create("A", a)); 
aSources.OnNext(Tuple.Create("C", c)); 
aSources.OnNext(Tuple.Create("D", d)); 
aSources.OnNext(Tuple.Create("E", e)); 
aSources.OnNext(Tuple.Create("F", f)); 

a.OnNext("Hello"); 
c.OnNext("My name is C"); 
d.OnNext("D is for Dog"); 
await Task.Delay(TimeSpan.FromMilliseconds(1200)); 
e.OnNext("Easy-E here"); 
a.OnNext("A is for Apple"); 
await Task.Delay(TimeSpan.FromMilliseconds(2200)); 
+0

감사 @Shlomo : 여기

N 소스와 예제입니다 ,이 솔루션을 n + 1 wh로 확장 할 수있는 방법이 있습니까? ere n은 런타임에만 결정될 수있는 토커의 수입니까? – LxL

+0

가능성이 큽니다. 문제를 더 잘 정의 할 수 있습니까? – Shlomo

+0

현재 A와 B 만 있습니다. A의 역할과 같은 역할을 할 때 C, D, E, F에 동일한 솔루션을 적용 할 수 있는지 알고 싶습니다. 지금까지 생각할 수있는 솔루션은 IObservable 목록과 런타임에 추가 될 구독 목록입니다. 더 우아한 방법이 있습니까? – LxL