2013-03-26 2 views
3

필자는 전화를 걸었을 때 비틀린 반응기 내부에서 일부 병렬 코드의 실행을 간단하게 시작하는 셀러리 작업을 가지고 있습니다. 다음은 몇 가지 샘플입니다 (실행 가능하지 않음) 코드를 설명하기 :원자로에서 병렬 코드로 셀러리 작업을 ACK하는 방법은 무엇입니까?

def run_task_in_reactor(): 
    # this takes a while to run 
    do_something() 
    do_something_more() 


@celery.task 
def run_task(): 
    print "Started reactor" 
    reactor.callFromThread(run_task_in_reactor) 

(간결함을 위해 작업이 작업자에 의해 수신 될 때 원자로가 이미 실행되고 있다고 가정하십시오, 나는 시작 신호 @worker_process_init.connect를 사용 내 작업자가 오자마자 다른 스레드의 원자로)

run_task.delay()으로 전화하면 작업이 꽤 빨리 끝납니다 (run_task_in_reactor()이 완료 될 때까지 기다리지 않고 반응기에서 실행 만 예약 함). 그리고 run_task_in_reactor()이 마침내 실행될 때 do_something() 또는 do_something_more()이 예외를 throw 할 수 있습니다.

내 대기열에서 사용하려면 pika을 사용하여 do_something_more() 내부의 ACK를 사용하여 작업자가 작업을 올바르게 완료하도록 할 수 있습니다. 그러나 셀러리 내부에서는 이것이 불가능한 것 같습니다 (또는 적어도 동일한 효과를 얻는 방법을 모르겠습니다).

또한 반응기를 제거 할 수 없습니다. - 파티 용 코드를 사용하고 있습니다. 동일한 결과를 얻는 다른 방법도 잘 알려져 있습니다.

답변

0

대신 reactor.blockingCallFromThread을 사용하십시오.

+0

질문의 코드를 포함하여 어디서 어떻게 사용하는지 설명하십시오. –

+0

@MauricioGracia 질문의 코드에서'reactor.callFromThread' 대신'reactor.blockingCallFromThread' 만 사용하십시오. –