소켓 프로토콜 구현을 단순화하기 위해 Reactive Extensions를 async/await과 함께 사용하고 있습니다. 특정 메시지가 도착할 때 (예 : 각 '핑'메시지에 'pong'보내기) 수행해야하는 작업이 있으며 특정 응답을 비동기 적으로 기다려야하는 방법도 있습니다.async/await, RX 및 LINQ를 사용한 비동기 메시지 처리
private Subject<string> MessageReceived = new Subject<string>();
//this method gets called every time a message is received from socket
internal void OnReceiveMessage(string message)
{
MessageReceived.OnNext(message);
ProcessMessage(message);
}
public async Task<string> TestMethod()
{
var expectedMessage = MessageReceived.Where(x => x.EndsWith("D") && x.EndsWith("F")).FirstOrDefaultAsync();
await SendMessage("ABC");
//some code...
//if response we are waiting for comes before next row, we miss it
return await expectedMessage;
}
TestMethod() 소켓에 "ABC"를 보내고 예 "DEF"를 수신 할 때 (그 전에 몇 가지 다른 메시지가있을 수 있습니다) 계속 다음 예는이 보여줍니다.
거의 작동하지만 경쟁 조건이 있습니다. 이 코드는 return await expectedMessage;
까지 메시지를 수신하지 않는 것으로 보입니다.이 메시지는 그 전에 메시지가 도착하기 때문에 문제입니다.
'expectedMessage'를 기다리는 동안 pong이 보내지지 않는다는 것을 의미합니까? 이 경우'OnReceiveMessage' 안에'ping' 메시지를'MessageReceived.OnNext()'와 함께 저장하지 말고 처리해야합니다. –
실제로 그 부분을 생략했습니다. 괜찮 았어. ProcessMessages() -method에서 ping-message에 대한 응답으로 pong을 보내지 만, 문제는 비동기 메서드에서 특정 이벤트 (소켓으로부터받은 메시지)를 기다리고 있습니다. – Juha