2017-03-28 3 views
0

소켓 프로토콜 구현을 단순화하기 위해 Reactive Extensions를 async/await과 함께 사용하고 있습니다. 특정 메시지가 도착할 때 (예 : 각 '핑'메시지에 'pong'보내기) 수행해야하는 작업이 있으며 특정 응답을 비동기 적으로 기다려야하는 방법도 있습니다.async/await, RX 및 LINQ를 사용한 비동기 메시지 처리

private Subject<string> MessageReceived = new Subject<string>(); 

//this method gets called every time a message is received from socket 
internal void OnReceiveMessage(string message) 
{ 
    MessageReceived.OnNext(message); 
    ProcessMessage(message); 
} 

public async Task<string> TestMethod() 
{ 
    var expectedMessage = MessageReceived.Where(x => x.EndsWith("D") && x.EndsWith("F")).FirstOrDefaultAsync(); 
    await SendMessage("ABC"); 

    //some code... 

    //if response we are waiting for comes before next row, we miss it 
    return await expectedMessage; 
} 

TestMethod() 소켓에 "ABC"를 보내고 예 "DEF"를 수신 할 때 (그 전에 몇 가지 다른 메시지가있을 수 있습니다) 계속 다음 예는이 보여줍니다.

거의 작동하지만 경쟁 조건이 있습니다. 이 코드는 return await expectedMessage;까지 메시지를 수신하지 않는 것으로 보입니다.이 메시지는 그 전에 메시지가 도착하기 때문에 문제입니다.

+0

'expectedMessage'를 기다리는 동안 pong이 보내지지 않는다는 것을 의미합니까? 이 경우'OnReceiveMessage' 안에'ping' 메시지를'MessageReceived.OnNext()'와 함께 저장하지 말고 처리해야합니다. –

+0

실제로 그 부분을 생략했습니다. 괜찮 았어. ProcessMessages() -method에서 ping-message에 대한 응답으로 pong을 보내지 만, 문제는 비동기 메서드에서 특정 이벤트 (소켓으로부터받은 메시지)를 기다리고 있습니다. – Juha

답변

4

FirstOrDefaultAsync 여기서는 잘 작동하지 않습니다. 귀하가 지적한대로 경쟁 조건을 벗어나는 await 라인까지 구독하지 않습니다. 여기 당신이 그것을 대체 할 수있는 방법은 다음과 같습니다

var expectedMessage = MessageReceived 
     .Where(x => x.EndsWith("D") && x.EndsWith("F")) 
     .Take(1) 
     .Replay(1) 
     .RefCount(); 

    using (var dummySubscription = expectedMessage.Subscribe(i => {})) 
    { 
     await SendMessage("ABC"); 

     //Some code... goes here. 

     return await expectedMessage; 
    } 

.Replay(1) 하나가있는 가정, 새 구독은 가장 최근의 항목을 얻을 수 있는지 확인합니다. 듣고있는 가입자가있는 경우에만 작동하지만, 따라서 dummySubscription입니다.