2013-06-21 10 views
0

이것은 아마도 pika를 사용하는만큼 파이썬 콜백에 관한 질문 일 것입니다. RabbitMQ의 대기열에 가입하고 전달 된 메시지의 페이로드를 처리 한 다음 해당 페이로드를 일련의 (디스크) 파일에 쓰는 코드를 개발하려고합니다. 따라서 간단한 "Hello World"예제 인 http://www.rabbitmq.com/tutorials/tutorial-one-python.html을 사용하여 수신 된 메시지 페이로드를 파일에 기록하기 위해 콜백 함수 (공동 호출로 "콜백"이라고 함)에 로직을 추가했습니다.구독 중에 콜백을 추가하여 파일을 쓰는 방법

다음과 같은 주된 문제가 있습니다. 특정 시간이 경과하면 (예 : 300 분 (5 분)), 프로세스가 파일을 닫고 새 파일을 작성하고 이후의 파일을 작성해야하는 추가 코드를 작성하려고합니다. 새 메시지가 수신되었습니다. 그 외에도 ...

하지만이 문제는 메시지가 대기열에 도착할 때만 콜백 함수가 호출된다는 것을 알았습니다. 경과 시간을 측정하는 콜백 함수 외부의 일부 프로세스가 필요하다고 생각합니다.

수신 된 메시지를 포함하는 디스크 파일 세트 (모두 타임 스탬프를 기반으로 고유 한 이름을 가짐)를 만들고 싶습니다. MQ 큐 메시지가 늦어지면 현재 열려있는 파일을 닫습니다 (그래서 더 다운 스트림으로 처리 될 수 있습니다).

시작 소비 통화 (channel.start_consuming)를 발행 한 후 그 아래의 코드에 도달했음을 알 수 있습니다 - 이유는 무엇입니까?

저는 파이썬의 멀티 프로세싱 모듈로 놀았지만 지금까지는 행운이 없습니다.

여기에 의사 코드 주석 일부 골격 코드입니다 : -

#!/usr/bin/env python 
import pika 

connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost')) 
channel = connection.channel() 

channel.queue_declare(queue='hello') 

print ' [*] Waiting for messages. To exit press CTRL+C' 

def callback(ch, method, properties, body): 
    print " [x] Received %r" % (body,) 

    # want to put code here to write message payloads to a file (unique name) 
    # if n secs have elapsed then close the file and create a new file 

channel.basic_consume(callback,queue='hello',no_ack=True) 

channel.start_consuming() 

감사합니다!

답변

0

Pika의 대체 구현을 살펴 보는 것이 좋습니다. 피카 (Pika)는 본질적으로 차단하고 있기 때문에 이와 같은 것을 만드는 것은 어렵습니다. 마지막 5 분 안에 아무 것도 기록되지 않았는지 확인하려면 IO를보기 위해 본질적으로 다른 스레드가 필요합니다.

시간 소인을 유지할 수 있으며 충분한 시간이 경과하면 새 콜백을 받으면 파일을 닫고 새 파일을 만들 수 있습니다. 그러나 이렇게하면 더 긴 기간 동안 파일을 열어 두지 만 데이터가 5 분을 초과하지 않도록 할 수 있습니다.

그러나 Puka을 대신 보시기 바랍니다. Pika에 대한 비 차단 대안으로 문제에 대한 솔루션을보다 쉽게 ​​구현할 수 있습니다.