2017-05-14 8 views
0

kombu를 사용하여 토끼와 함께 사용할 rabbitmq's direct reply-to feature을 사용하고 싶습니다. kombu로 구현할 방법을 찾을 수 없습니다. .rabbitmq 용 kombu로 RPC 클라이언트 구현 및 직접 회신 기능

클라이언트는 메시지를 만들기 전에 'amq.rabbitmq.reply-to'대기열에서 소비해야하며 제작자와 고객은 동일한 채널을 사용해야합니다. 클라이언트가 스레드 환경에서 작성되기 때문에 생성자 풀 (또는 일종의 연결 풀)도 사용해야합니다.

지금까지이 코드를 가지고 있는데, rabbitmq는 PRECONDITION 오류가있는 프로듀서에 대해서는 불평하지 않습니다 (소비자 부분을 제거하면 불평합니다).하지만 제작자는 아무 것도 생산하지 않습니다!

와이어 샤크에서 가끔 rabbitmq는 예외를 발생하지 않을 것이다 전제 조건 오류하지만 다시마로 응답, 내가 왜 모르는 것을 깨달았다의 도움으로
class KombuRpcClient(RpcClientBase): 
    def __init__(self, params): 
     self.future = Queue.Queue() 
     self.logger = logger 
     if isinstance(params, RpcConnectionProperties): 
      self.rpc_connection_properties = params 
     else: 
      self.rpc_connection_properties = RpcConnectionProperties(
       host=params.get('host'), 
       port=5672, 
       username=params.get('username'), 
       password=params.get('password'), 
       vhost=params.get('vhost') if params.has_key('vhost') else '/' 
      ) 
     self.amqp_url = self.rpc_connection_properties.get_kombu_transport_url('pyamqp') 
     self.reply_queue = KombuQueue('direct_reply', exchange=default_exchange, routing_key='amq.rabbitmq.reply-to') 

    def call(self, exchange, key, msg, no_response=False, timeout=5): 
     connection = Connection(self.amqp_url) 
     if exchange is not None: 
      key = exchange + ':' + key 
     with producers_pool[connection].acquire(block=True) as producer: 
      with producer.channel.Consumer(queues=[self.reply_queue], no_ack=True, callbacks=[self._on_message], 
              accept=['ujson']) as consumer: 
       producer.publish(
        msg, 
        exchange=default_exchange, 
        routing_key=key, 
        immediate=True, 
        serializer='ujson', reply_to=self.reply_queue.routing_key) 
       consumer.consume() 
       pass 
     res = self.future.get(block=True, timeout=timeout) 
     print res 

    def cast(self, exchange, key, msg): 
     pass 

    def _on_message(self, body, message): 
     print body 
     self.future.put(body) 

답변

0

! 어쨌든,이 코드는 지금 노력하고 있습니다 : 그래서 호환성

의 이름을 유지했다 변수 이름에 대한 죄송합니다, 이것은 오래된 스톰 RPC 클라이언트를 대체 할 예정이다 : BTW

class KombuRpcClient(RpcClientBase): 
    def __init__(self, params): 
     self.future = Queue.Queue() 
     self.logger = logger 
     if isinstance(params, RpcConnectionProperties): 
      self.rpc_connection_properties = params 
     else: 
      self.rpc_connection_properties = RpcConnectionProperties(
       host=params.get('host'), 
       port=5672, 
       username=params.get('username'), 
       password=params.get('password'), 
       vhost=params.get('vhost') if params.has_key('vhost') else '/' 
      ) 
     self.amqp_url = self.rpc_connection_properties.get_kombu_transport_url('pyamqp') 
     self.reply_queue = KombuQueue('amq.rabbitmq.reply-to', exchange=default_exchange, routing_key='amq.rabbitmq.reply-to') 

    def call(self, exchange, key, msg, no_response=False, timeout=5): 
     connection = Connection(self.amqp_url) 
     if exchange is not None: 
      key = exchange + ':' + key 

     with producers_pool[connection].acquire(block=True) as producer: 
      consumer = producer.channel.Consumer(queues=[self.reply_queue], no_ack=True, auto_declare=True, 
           callbacks=[self._on_message], accept=['ujson']) 
      consumer.consume(no_ack=True) 
      producer.publish(msg, 
          serializer='ujson', 
          exchange=default_exchange, 
          routing_key=key, 
          reply_to='amq.rabbitmq.reply-to') 
      consumer.connection.drain_events() 
      res = self.future.get(block=True, timeout=timeout) 
      response = Response() 
      response.body = res 
      return res 

    def cast(self, exchange, key, msg): 
     connection = Connection(self.amqp_url) 
     if exchange is not None: 
      key = exchange + ':' + key 
     with producers_pool[connection].acquire(block=True) as producer: 
      producer.publish(msg, 
          serializer='ujson', 
          exchange=default_exchange, 
          routing_key=key) 

    def _on_message(self, body, message): 
     print body 
     self.future.put(body)