2017-01-23 5 views
1

GRPC 스트림을 통해 관찰 가능 항목을 표시하려고합니다. 내 간단한 코드는 다음과 같습니다Grpc - rx observable 사용

public override async Task Feed(Request request, IServerStreamWriter<Response> responseStream, ServerCallContext context) 
     { 
      var result = new Result(); 
      try 
      { 
       await Observable.ForEachAsync(async value => 
       { 
        await responseStream.WriteAsync(value); 
       }); 

      } 
      catch (Exception ex) 
      { 
       Log.Info("Session ended:" + ex); 
      } 
     } 

나는 다음과 같은 오류가 나타납니다

W:30:59.709715 Grpc.Core.Internal.ServerStreamingServerCallHandler 2 Exception occured in handler. System.ArgumentException: Der Wert liegt außerhalb des erwarteten Bereichs. bei Grpc.Core.Internal.ServerStreamingServerCallHandler 2.d__4.MoveNext() W:30:59.732716 Grpc.Core.Server Exception while handling RPC. System.InvalidOperationException: Der Vorgang ist aufgrund des aktuellen Zustands des Objekts ungültig. bei Grpc.Core.Internal.AsyncCallServer 2.SendStatusFromServerAsync(Status status, Metadata trailers, Tuple 2 optionalWrite) bei Grpc.Core.Internal.ServerStreamingServerCallHandler`2.d__4.MoveNext() --- Ende der Stapelüberwachung vom vorhergehenden Ort, an dem die Ausnahme ausgelöst wurde --- bei System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) bei System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) bei Grpc.Core.Server.d__34.MoveNext()

가 어떻게이 문제를 처리하기 위해 추천을? 같은 스레드에서 ForEachAsync를 처리해야한다고 생각합니다.

+0

rx 대신 동기화 메커니즘으로 BufferBlock으로 이동했습니다. 이것은 스케쥴링과 함께 rx를 사용하는 것보다 더 깔끔하고 쉽습니다. – weismat

답변

3

gRPC 스트리밍 API를 사용하면 한 번에 하나의 항목 만 쓸 수 있습니다. 이전 WriteAsync() 작업이 완료되기 전에 다른 WriteAsync() 작업을 시작하면 예외가 발생합니다. 또한 메서드 처리기 (이 경우 Feed 메서드)에서 반환하기 전에 모든 쓰기 작업을 완료해야합니다. 한 번에 하나의 쓰기 만 허용되는 이유는 gRPC의 흐름 제어가 제대로 작동하는지 확인하기 위해서입니다.

Rx API가이를 보장 할 수없는 것 같습니다.이를 해결할 수있는 한 가지 방법은 중간 버퍼를 사용하는 것입니다.

+0

나는이면에서 동의한다. 또 다른 질문은 IDisposable을 구현하지 않으므로 IServerStreamWriter의 수명이 결정되는 방법입니다. 주석으로 나는 비동기 Fifo 대기열로 BufferBlock을 동기화 도구로 사용하기 시작했습니다. – weismat

+0

@ weismat : gRPC 스트림에서 관찰 가능 항목을 만들 수 있었습니까? BufferBlock이 어떻게이 작업을 수행하는 데 도움이되었는지 이해할 수 없습니다. 예를 들어 주시겠습니까? 고마워. –

+0

비트 관련 https://stackoverflow.com/questions/44859534/grpc-async-response-stream-c-sharp –