2017-10-03 16 views
0

은 내가 RabbitMQ 소비자로 새앙 토끼 트위스트 연결을 사용하고, 여기에 내 코드입니다 :pika를 위해 꼬인 연결로 소비자를 다시 연결하는 방법은 무엇입니까?

@defer.inlineCallbacks 
def run(connection): 
    queue_name = 'aaa' 
    channel = yield connection.channel() 
    queue = yield channel.queue_declare(queue=queue_name, auto_delete=False, exclusive=False) 
    yield channel.queue_bind(exchange='amq.direct',queue=queue_name,routing_key=queue_name) 
    yield channel.basic_qos(prefetch_count=1) 
    queue_object, consumer_tag = yield channel.basic_consume(queue=queue_name,no_ack=False) 
    logger.info('[room server]start consume queue %s', queue_name) 

    l = task.LoopingCall(read, queue_object) 
    l.start(0.1) 


@defer.inlineCallbacks 
def read(queue_object): 
    ch,method,properties,body = yield queue_object.get() 
    try: 
     data = json.loads(body) 
     head_code = data['head_code'] 
     openid = data['openid'] 
     message_content = data['message_content'] 
     conn_id = -1 
     try: 
      conn_id = data['conn_id'] 
     except: 
      pass 
     message_dispatcher(head_code, openid, message_content, conn_id) 
     yield ch.basic_ack(delivery_tag=method.delivery_tag) 
    except ValueError as e: 
     logger.error('[error!]error body %s' % body) 
     yield ch.basic_ack(delivery_tag=method.delivery_tag) 

credentials = pika.PlainCredentials(config.RABBITMQ_USERNAME, config.RABBITMQ_PASSWD) 
parameters = pika.ConnectionParameters(credentials=credentials) 
cc = protocol.ClientCreator(reactor, twisted_connection.TwistedProtocolConnection, parameters) 

def got_error(failure, d): 
    logger.error(failure) 
    d = cc.connectTCP(config.RABBITMQ_HOST, config.RABBITMQ_PORT) 


def start(): 
    d = cc.connectTCP(config.RABBITMQ_HOST, config.RABBITMQ_PORT) 
    d.addCallback(lambda protocol: protocol.ready) 
    d.addCallback(run) 
    d.addErrback(got_error, d) 

연결 나누기, 재 연결 프로세스가 작동하지 않을 때 내 문제는 다음과 같습니다 enter image description here

어떻게 만들 재 연결 작업?

답변

0

TwistedProtocolConnection docstring에 따르면, on_close_callback 기능을 제공하여 연결 종료를 처리 할 수 ​​있습니다. 이 함수에서 reason_code and reason_text은 args 여야합니다. 그래서 당신은 연결 단자를 처리하는 또 다른 ON_CLOSE 콜백을 생성하고이 일어난 이유는 다음 RabbitMQ에 연결하는 데 필요한 로직을 실행하면 다음 ClientCreator 코드의 예를 따라야한다, 대신에 그 코드가 있으면

def connection_termination(reason_code, reason_text): 
    """ 
    Log the reasons why the connection terminates and then reconnect 
    """ 
    # put your connection code here 
    # incrementally space out your reconnections, eg. 2 seconds, if fail, 5 seconds, if fail 10 seconds, etc... 

아래 :

cc = protocol.ClientCreator(reactor, twisted_connection.TwistedProtocolConnection, parameters, connection_termination) 

불행히도 나는 이것을 시험 할 수 없지만 작동 할 것입니다. 당신은 이미 대부분의 논리를 가지고 있으므로 나머지는 운동으로 남겨 둘 것입니다.) 문제가 있다면 의견을 말하십시오. 해결책이 있다면 다른 사람들에게 최종 결과를 보여주십시오. 시작 D = cc.connectTCP (config.RABBITMQ_HOST, config.RABBITMQ_PORT) 파일 "/Library/Python/2.7/site-packages/twisted/internet/protocol.py"에서 :

+0

나는 코드 만 예외가 발생했습니다 , 줄 292 in connectTCP bindAddress = bindAddress) 파일 "/Library/Python/2.7/site-packages/twisted/internet/protocol.py", 줄 274, _connect에 있음 self.reactor, self.protocolClass (* self .args, ** self.kwargs), d) TypeError : __init __()은 정확히 2 개의 인수 (주어진 3 개)를 취합니다. –