이것은 아마도 pika를 사용하는만큼 파이썬 콜백에 관한 질문 일 것입니다. RabbitMQ의 대기열에 가입하고 전달 된 메시지의 페이로드를 처리 한 다음 해당 페이로드를 일련의 (디스크) 파일에 쓰는 코드를 개발하려고합니다. 따라서 간단한 "Hello World"예제 인 http://www.rabbitmq.com/tutorials/tutorial-one-python.html을 사용하여 수신 된 메시지 페이로드를 파일에 기록하기 위해 콜백 함수 (공동 호출로 "콜백"이라고 함)에 로직을 추가했습니다.구독 중에 콜백을 추가하여 파일을 쓰는 방법
다음과 같은 주된 문제가 있습니다. 특정 시간이 경과하면 (예 : 300 분 (5 분)), 프로세스가 파일을 닫고 새 파일을 작성하고 이후의 파일을 작성해야하는 추가 코드를 작성하려고합니다. 새 메시지가 수신되었습니다. 그 외에도 ...
하지만이 문제는 메시지가 대기열에 도착할 때만 콜백 함수가 호출된다는 것을 알았습니다. 경과 시간을 측정하는 콜백 함수 외부의 일부 프로세스가 필요하다고 생각합니다.
수신 된 메시지를 포함하는 디스크 파일 세트 (모두 타임 스탬프를 기반으로 고유 한 이름을 가짐)를 만들고 싶습니다. MQ 큐 메시지가 늦어지면 현재 열려있는 파일을 닫습니다 (그래서 더 다운 스트림으로 처리 될 수 있습니다).
시작 소비 통화 (channel.start_consuming)를 발행 한 후 그 아래의 코드에 도달했음을 알 수 있습니다 - 이유는 무엇입니까?
저는 파이썬의 멀티 프로세싱 모듈로 놀았지만 지금까지는 행운이 없습니다.
여기에 의사 코드 주석 일부 골격 코드입니다 : -
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
print ' [*] Waiting for messages. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
# want to put code here to write message payloads to a file (unique name)
# if n secs have elapsed then close the file and create a new file
channel.basic_consume(callback,queue='hello',no_ack=True)
channel.start_consuming()
감사합니다!