2017-11-08 5 views
0

RabbitMQ에 EventingBasicConsumer의 수신 이벤트 처리기를 추가했습니다. 대기열이 소비되었는지 (처리되고 비어 있음) 확인해야하며 소비자 및 연결을 닫아야합니다. 대기열이 처리되는지 여부를 알 수있는 조건을 찾을 수 없습니다.래빗 MQ 고객 이벤트 중지 큐가 비어있는 경우

public void ProcessQueue(string queueName, Func<string, bool> ProcessMessage) 
    { 
     //lock (this.Model) 
     { 
      this.Model.BasicQos(0, 1, false); 
      EventingBasicConsumer consumer = new EventingBasicConsumer(this.Model); 

      consumer.Received += (model, ea) => 
      { 
       var body = ea.Body; 
       var message = Encoding.UTF8.GetString(body); 
       bool processed = ProcessMessage.Invoke(message); 
       if (processed) 
        this.SendAcknowledgement(ea.DeliveryTag); 
       else 
        this.StopProcessingQueue(consumer.ConsumerTag); 

       // Check if no message for next 2 minutes, 
       //  Stop Consumer and close connection 

      }; 

      this.Model.BasicConsume(queue: queueName, 
          autoAck: false, 
          consumer: consumer); 
     } 
    } 
+1

우리는 큐 길이를 모니터링하고 자동으로 회전 또는 소비자의 수 아래로 응용 프로그램을 . 어쩌면 당신은 그런 것을 원할 것입니까? https://stackoverflow.com/questions/1038318/check-rabbitmq-queue-size-from-client –

답변

0

내가 모든 속성 중 하나 정도 찾을 수 없습니다 도와주세요는 메시지가 수신 경과 시간이 더 이분보다 당신이를 해고 할 수있는 경우에 때마다 초기화됩니다 타이머를 구현했다 소비자를 막고 연결을 닫을 수있는 정리 방법

+0

올바른, 지금 다른 방식으로 구현했습니다. –

0

대기열이 비어있는 경우 Rabbit MQ 소비자 이벤트를 중지 할 방법을 찾지 못했기 때문에 API의 메시지 수를 전달하여 메시지를 처리하기 위해 아래 메소드를 구현했습니다.

"로컬 호스트 :/API/큐"다음

는 큐가 비어있을 때까지 메시지를 처리하는 기능입니다

/// <summary> 
/// (Recommanded) Processes the queue till the number of messages provided. 
/// Added to manage the load (process batches by batches) 
/// </summary> 
/// <param name="queueName">Name of the queue.</param> 
/// <param name="ProcessMessage">The process message.</param> 
/// <param name="count">The count.</param> 
public uint ProcessQueueByMessageCount(string queueName, Func<string, bool> HowToProcessMessage, uint messageCount) 
{ 
    uint messagesToProcess = messageCount; 
    using (var connect = this) 
    { 
     while (messageCount > 0) 
     { 
      BasicGetResult result = connect.Model.BasicGet(queueName, false); 
      bool processed = HowToProcessMessage.Invoke(System.Text.Encoding.UTF8.GetString(result.Body)); 
      if (processed) 
      { 
       this.SendAcknowledgement(result.DeliveryTag); 
       messageCount--; 
      } 
      else 
      { 
       connect.Model.BasicNack(result.DeliveryTag, false, true); 
       break; 
      } 
     } 
    } 
    return messagesToProcess - messageCount; 
}