필자는 전화를 걸었을 때 비틀린 반응기 내부에서 일부 병렬 코드의 실행을 간단하게 시작하는 셀러리 작업을 가지고 있습니다. 다음은 몇 가지 샘플입니다 (실행 가능하지 않음) 코드를 설명하기 :원자로에서 병렬 코드로 셀러리 작업을 ACK하는 방법은 무엇입니까?
이def run_task_in_reactor():
# this takes a while to run
do_something()
do_something_more()
@celery.task
def run_task():
print "Started reactor"
reactor.callFromThread(run_task_in_reactor)
(간결함을 위해 작업이 작업자에 의해 수신 될 때 원자로가 이미 실행되고 있다고 가정하십시오, 나는 시작 신호 @worker_process_init.connect
를 사용 내 작업자가 오자마자 다른 스레드의 원자로)
run_task.delay()
으로 전화하면 작업이 꽤 빨리 끝납니다 (run_task_in_reactor()
이 완료 될 때까지 기다리지 않고 반응기에서 실행 만 예약 함). 그리고 run_task_in_reactor()
이 마침내 실행될 때 do_something()
또는 do_something_more()
이 예외를 throw 할 수 있습니다.
내 대기열에서 사용하려면 pika
을 사용하여 do_something_more()
내부의 ACK를 사용하여 작업자가 작업을 올바르게 완료하도록 할 수 있습니다. 그러나 셀러리 내부에서는 이것이 불가능한 것 같습니다 (또는 적어도 동일한 효과를 얻는 방법을 모르겠습니다).
또한 반응기를 제거 할 수 없습니다. - 파티 용 코드를 사용하고 있습니다. 동일한 결과를 얻는 다른 방법도 잘 알려져 있습니다.
질문의 코드를 포함하여 어디서 어떻게 사용하는지 설명하십시오. –
@MauricioGracia 질문의 코드에서'reactor.callFromThread' 대신'reactor.blockingCallFromThread' 만 사용하십시오. –