2017-03-29 12 views
1

Debian Wheezy의 Mono 4.8에서 NetMQ 4.0.0.1에 문제가 있습니다.Mono의 NetMQ DealerSocket이 Debian Wheezy의 서버로 아무런 메시지도 보내지 않는 이유는 무엇입니까?

딜러 메시지가 새 메시지를 보내기 위해 멈출 때까지 대리점 소켓이 메시지를 보내지 않습니다. 내가 넣을 때 Thread.Sleep(1000) 사이에 모든 것보다 작업을 만드는 것이 좋습니다. 모든 것이 Windows에서 .Net Framework 4.5 및 .Net Core 1.1에서 어떤 Thread.Sleep()도없이 작동하고 있음을 인정하고 싶습니다. enter image description here

내가 추가 한 디버그 메시지와 나는 루프에서 작업 100 REQ 소켓을 만드는 오전 것을 볼 수 있으며, 라우터보다, 큐에 요청을 받고 :

이 같은 패턴을 가지고 그 (것)들에게 물마루 딜러를 보내고 있습니다. 그리고 REQ 소켓으로 전화를 끊을 때까지 TCP의 다른 쪽에서 아무 일도 일어나지 않습니다. 매 5 개의 작업마다 간단한 Thread.Sleep()이 작동합니다. Poller 버그 또는 Dealer 버그처럼 보입니다. 또는 뭔가 잘못하고 있습니다.

class Program 
{ 
    private static Logger _logger = LogManager.GetCurrentClassLogger(); 


    private static void Main(string[] args) 
    { 
     Console.WriteLine("Client. Please enter message for server. Enter 'QUIT' to turn off server"); 
     Console.ReadKey(); 


     using (var collectorDevice = new CollectorDevice(">tcp://localhost:5556", "inproc://broker", 3)) 
     { 
      collectorDevice.Start(); 

      List<Task> tasks = new List<Task>(); 
      for (int i = 0; i < 100; i++) 
      { 
       Console.WriteLine(i); 
       int j = i;  
       Task t = Task.Factory.StartNew(() => 
       { 
        try 
        { 
         using (var req = new RequestSocket("inproc://broker")) 
         { 
          req.SendFrame(String.Format("Request client: {0} id: {1}", j, Task.CurrentId)); 
          _logger.Debug(String.Format("Request client: {0} id: {1}", j, Task.CurrentId)); 
          Console.WriteLine(String.Format("Request client: {0} id: {1}", j, Task.CurrentId)); 

          string responseMessage = req.ReceiveFrameString(); 
          _logger.Debug(String.Format("Response from server: {0} id: {1} message: {2}", j, Task.CurrentId, responseMessage)); 
          Console.WriteLine(String.Format("Response from server: {0} id: {1} message: {2}", j, Task.CurrentId, responseMessage)); 
         } 
        } 
        catch (Exception e) 
        { 
         Console.WriteLine(e); 
         _logger.Error(e); 
        } 
       }); 
       tasks.Add(t); 
       //Thread.Sleep (100);//<- This thread sleep is fixing problem? 
      } 

      Task.WaitAll(tasks.ToArray()); 
     } 

    } 
} 

그리고 서버 측 :

class Program 
{ 
    private static Logger _logger = LogManager.GetCurrentClassLogger(); 

    static void Main(string[] args) 
    { 
     try{ 
     using (var routerSocket = new RouterSocket("@tcp://*:5556")) 
     { 
      var poller = new NetMQPoller(); 
      routerSocket.ReceiveReady += RouterSocketOnReceiveReady; 
      poller.Add(routerSocket); 
      poller.Run(); 
     } 
     } 
     catch(Exception e) 
     {  
      Console.WriteLine (e); 
     } 

     Console.ReadKey(); 
    } 

    private static void RouterSocketOnReceiveReady(object sender, NetMQSocketEventArgs netMqSocketEventArgs) 
    { 
     NetMQMessage clientMessage = new NetMQMessage(); 
     bool result = netMqSocketEventArgs.Socket.TryReceiveMultipartMessage(new TimeSpan(0, 0, 0, 5), 
      ref clientMessage, 5); 

     if (result == false) 
     { 
      Console.WriteLine ("Something went wrong?!"); 
     } 

     var address = clientMessage[0]; 
     var address2 = clientMessage[1]; 
     var clientMessageString = clientMessage[3].ConvertToString(); 

     //_logger.Debug("Message from client received: '{0}'", clientMessageString); 
     Console.WriteLine (String.Format ("Message from client received: '{0}'", clientMessageString)); 

     netMqSocketEventArgs 
      .Socket.SendMoreFrame(address.Buffer) 
      .SendMoreFrame(address2.Buffer) 
      .SendMoreFrameEmpty() 
      .SendFrame("I have received your message"); 
    } 
} 

누구가 어떤 생각을 가지고

다음
public class CollectorDevice : IDisposable 
{ 
    private NetMQPoller _poller; 
    private RouterSocket _frontendSocket; 
    private DealerSocket _backendSocket; 
    private readonly string _backEndAddress; 
    private readonly string _frontEndAddress; 
    private readonly int _expectedFrameCount; 
    private readonly ManualResetEvent _startSemaphore = new ManualResetEvent(false); 
    private readonly Thread _localThread; 
    private static Logger _logger = LogManager.GetCurrentClassLogger(); 

    /// <summary> 
    /// Constructor 
    /// </summary> 
    /// <param name="backEndAddress"></param> 
    /// <param name="frontEndAddress"></param> 
    /// <param name="expectedFrameCount"></param> 
    public CollectorDevice(string backEndAddress, string frontEndAddress, int expectedFrameCount) 
    { 
     _expectedFrameCount = expectedFrameCount; 

     _backEndAddress = backEndAddress; 
     _frontEndAddress = frontEndAddress; 

     _localThread = new Thread(DoWork) { Name = "IPC Collector Device Thread" }; 
    } 

    public void Start() 
    { 
     _localThread.Start(); 
     _startSemaphore.WaitOne(); 


    } 

    public void Stop() 
    { 
     _poller.Stop(); 
    } 

    #region Implementation of IDisposable 

    public void Dispose() 
    { 
     Stop(); 
    } 

    #endregion 


    #region Private Methods 
    private void DoWork() 
    { 
     try 
     { 
      using (_poller = new NetMQPoller()) 
      using (_frontendSocket = new RouterSocket(_frontEndAddress)) 
      using (_backendSocket = new DealerSocket(_backEndAddress)) 
      { 
       _backendSocket.ReceiveReady += OnBackEndReady; 
       _frontendSocket.ReceiveReady += OnFrontEndReady; 


       _poller.Add(_frontendSocket); 
       _poller.Add(_backendSocket); 

       _startSemaphore.Set(); 

       _poller.Run(); 
      } 
     } 
     catch (Exception e) 
     { 
      _logger.Error(e); 
     } 
    } 

    private void OnBackEndReady(object sender, NetMQSocketEventArgs e) 
    { 
     NetMQMessage message = _backendSocket.ReceiveMultipartMessage(_expectedFrameCount); 
     _frontendSocket.SendMultipartMessage(message); 
    } 

    private void OnFrontEndReady(object sender, NetMQSocketEventArgs e) 
    { 
     NetMQMessage message = _frontendSocket.ReceiveMultipartMessage(_expectedFrameCount); 
     _backendSocket.SendMultipartMessage(message); 
    } 

    #endregion 
} 

가 클라이언트 쪽 : 여기

중간 상자의 코드?

나는 다른 스레드의 소켓을 사용하고 있다고 생각했기 때문에 ThreadLocal 구조체에 패키지를 만들었지 만 도움이되지 않았습니다. NetMQ와의 통일성에 대한 몇 가지 문제를 읽었으므로 'AsyncIO.ForceDotNet .힘();' 모든 소켓 생성자 호출 전에, 그리고 이것도 도움이되지 않았다. 내 모노를 4.4에서 4.8로 업데이트 한 것보다 여전히 동일하게 보입니다.

작업 사이에 Thread.Sleep (100)이 문제를 해결하고 있는지 확인했습니다. 그것은 이상하다.

+0

여러 스레드에서 소켓을 사용하고있을 가능성이 있습니까? – somdoron

+0

또한 요청 소켓의 코드를 공유 할 수 있습니까? – somdoron

+0

ThreadLocal 구조체에 소켓을 넣어 그 실수를하지 않도록합니다. 원하는 코드를 추가했습니다. 그건 그렇고, 빠른 응답을 주셔서 감사합니다 – bzyku

답변

0

나는 코드를 테스트했는데, 많은 시간이 걸리지 만 결국 서버는 모든 메시지를 받는다.

문제는 스레드의 양입니다. 완료 대기 포트에서 완료해야하는 모든 비동기 작업은 스레드가 100 개일 때 많은 시간이 걸립니다. 다음 코드를 사용하여 NetMQ없이 재생할 수있었습니다.

public static void Main(string[] args) 
    { 
     ManualResetEvent resetEvent = new ManualResetEvent(false); 

     List<Task> tasks = new List<Task>(); 

     for (int i = 0; i < 100; i++) 
     { 
      tasks.Add(Task.Run(() => 
      { 
       resetEvent.WaitOne(); 
      })); 
     } 

     Thread.Sleep(100); 

     Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); 
     listener.Bind(new IPEndPoint(IPAddress.Any, 5556)); 
     listener.Listen(1); 

     SocketAsyncEventArgs args1 = new SocketAsyncEventArgs(); 
     args1.Completed += (sender, eventArgs) => 
     { 
      Console.WriteLine($"Accepted {args1.SocketError}"); 
      resetEvent.Set(); 
     }; 
     listener.AcceptAsync(args1); 

     SocketAsyncEventArgs args2 = new SocketAsyncEventArgs(); 
     args2.RemoteEndPoint = new IPEndPoint(IPAddress.Loopback, 5556); 
     args2.Completed += (sender, eventArgs) => Console.WriteLine("Connected"); 
     Socket client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); 
     client.ConnectAsync(args2); 

     Task.WaitAll(tasks.ToArray()); 

     Console.WriteLine("all tasks completed"); 
    } 

약 1 분 정도 소요됩니다. 단 5 개의 스레드만으로 즉시 완료되었습니다.

어쨌든 모노 프로젝트에서는 스레드를 줄이거 나 버그를 재발급 할 수 있습니다.

+0

감사합니다, 나는 propably 웹 서버의 스레드 풀에서 스레드의 최대 숫자를 몇 가지 합리적인 번호로 낮추 것입니다 – bzyku