2012-03-20 4 views
5

주제 교환을 설정하려면 내 구성이 어떻게되어 있는지 혼동 스럽습니다. 다음Celery와 RabbitMQ를 이용한 주제 교환

Task1 -> send to QueueOne and QueueFirehose 
Task2 -> sent to QueueTwo and QueueFirehose 

을 :

http://www.rabbitmq.com/tutorials/tutorial-five-python.html

이것은 내가 달성하고 싶은 것입니다

Task1 -> consume from QueueOne 
Task2 -> consume from QueueTwo 
TaskFirehose -> consume from QueueFirehose 

가 난 단지 작업 1이 QueueTwo에서 소비 QueueOne 및 Task2에서 소비하고 싶다.

이제 Task1과 2가 실행될 때 QueueFirehose가 고갈되고 TaskFirehose 작업이 실행되지 않습니다.

내 설정에 문제가 있습니까? 아니면 내가 잘못 이해하고 있습니까?

Task1 -> send to QueueOne 
Task2 -> sent to QueueTwo 
TaskFirehose -> send to QueueFirehose 

다음 :

Worker1 -> consume from QueueOne, QueueFirehose 
Worker2 -> consume from QueueTwo, QueueFirehose 
WorkerFirehose -> consume from QueueFirehose 

이것은 당신이 무엇을 의미하는지 정확히 일치하지 않을 수 있습니다,하지만 난 그것을 많은 시나리오를 커버해야한다고 생각하고

CELERY_QUEUES = { 
    "QueueOne": { 
     "exchange_type": "topic", 
     "binding_key": "pipeline.one", 
    }, 
    "QueueTwo": { 
     "exchange_type": "topic", 
     "binding_key": "pipeline.two", 
    }, 
    "QueueFirehose": { 
     "exchange_type": "topic", 
     "binding_key": "pipeline.#", 
    }, 
} 

CELERY_ROUTES = { 
     "tasks.task1": { 
      "queue": 'QueueOne', 
      "routing_key": 'pipeline.one', 
     }, 
     "tasks.task2": { 
      "queue": 'QueueTwo', 
      "routing_key": 'pipeline.two', 
     }, 
     "tasks.firehose": { 
      'queue': 'QueueFirehose', 
      "routing_key": 'pipeline.#', 
     }, 
} 
+0

어쩌면 이것은 단지 용어 일 뿐이지 만 사용자가 작업과 직원이 분쟁중인 것처럼 들릴 수 있습니다. 예를 들어, "Task2가 Queue2로 보낸"이라고 말한 다음 나중에 "Task2를 사용하여 Queue2에서 소비"라고 말합니다. 작업은 사용하지 않습니다. 그들은 (노동자들에 의해) 소비된다. 또한 "TaskFirehose 작업을 실행하지 마십시오"라고 말하지만 설명에 따르면 TaskFirehose가 대기열로 보내지지 않습니다. 기본 개념은 다음과 같습니다. 작업자는 할당 된 대기열에서 작업을 실행합니다. 작업! = 그들을 실행하는 작업자. –

답변

0

실제로 같은 것을 의미한다고 가정 잘하면 당신도. 이 같은 뭔가 작업을해야합니다 :

# Advanced example starting 10 workers in the background: 
# * Three of the workers processes the images and video queue 
# * Two of the workers processes the data queue with loglevel DEBUG 
# * the rest processes the default' queue. 

$ celery multi start 10 -l INFO -Q:1-3 images,video -Q:4,5 data 
-Q default -L:4,5 DEBUG 

더 많은 옵션과 참고 : http://celery.readthedocs.org/en/latest/reference/celery.bin.multi.html

는이 문서에서 바로했다.

나는 비슷한 상황을 겪었고, 나는 약간 다른 방식으로 그것을 다루었 다. 나는 supervisord와 함께 셀러리 멀티를 사용할 수 없었다. 대신에 저는 각 근로자에 ​​대한 감독자로 여러 프로그램을 만들었습니다. 어쨌든 직원들은 서로 다른 프로세스를 사용하게되므로 감독자가 모든 것을 처리하도록하십시오. worker2에와 WorkerFirehose에 대해 확인하기 위해 해당 행을 편집,

; ================================== 
; celery worker supervisor example 
; ================================== 

[program:Worker1] 
; Set full path to celery program if using virtualenv 
command=celery worker -A proj --loglevel=INFO -Q QueueOne, QueueFirehose 

directory=/path/to/project 
user=nobody 
numprocs=1 
stdout_logfile=/var/log/celery/worker1.log 
stderr_logfile=/var/log/celery/worker1.log 
autostart=true 
autorestart=true 
startsecs=10 

; Need to wait for currently executing tasks to finish at shutdown. 
; Increase this if you have very long running tasks. 
stopwaitsecs = 600 

; When resorting to send SIGKILL to the program to terminate it 
; send SIGKILL to its whole process group instead, 
; taking care of its children as well. 
killasgroup=true 

; if rabbitmq is supervised, set its priority higher 
; so it starts first 
priority=998 

마찬가지로 :

[program:Worker2] 
; Set full path to celery program if using virtualenv 
command=celery worker -A proj --loglevel=INFO -Q QueueTwo, QueueFirehose 

[program:WorkerFirehose] 
; Set full path to celery program if using virtualenv 
command=celery worker -A proj --loglevel=INFO -Q QueueFirehose 

이 supervisord에 모두 포함 - : 설정 파일이 같이 보입니다 .conf 파일을 작성해야합니다.