2015-02-04 3 views
1

은 다음 섹션 인스턴스화 (http://celery.readthedocs.org/en/latest/userguide/tasks.html#custom-task-classes)가 적혀있다 :셀러리 - 작업/프로세스 당 하나의 인스턴스 만 있습니까? 셀러리의 문서에서

작업은 모든 요청에 ​​대해 인스턴스화되지 않지만, 글로벌 인스턴스로 작업 레지스트리에 등록됩니다.

이것은 init 생성자가 프로세스마다 한 번만 호출되고 작업 클래스는 의미 론적으로 액터에 더 가깝다는 것을 의미합니다. 나는 다음과 같은 예제를 실행하면

그럼에도 불구하고, 나는 초기화 방법은 적어도 3 번 호출되는 것을 알 수있다. 설정에서 무엇이 잘못 되었습니까? CELERYD_CONCURRENCY = 1은 작업자 당 프로세스가 하나만 있는지 확인해야합니다.

$ celery -A proj beat

celery beat v3.1.17 (Cipater) is starting. 
init Task1 
40878160 
x=1.0 
init Task1 
40878352 
x=1.0 
init Task1 
40879312 
x=1.0 
__ - ... __ -  _ 
Configuration -> 
    . broker -> amqp://guest:**@localhost:5672// 
    . loader -> celery.loaders.app.AppLoader 
    . scheduler -> celery.beat.PersistentScheduler 
    . db -> celerybeat-schedule 
    . logfile -> [stderr]@%INFO 
    . maxinterval -> now (0s) 
[2015-02-05 23:05:21,875: INFO/MainProcess] beat: Starting... 
[2015-02-05 23:05:21,971: INFO/MainProcess] Scheduler: Sending due task task1-every-5-seconds (proj.tasks.t1) 
[2015-02-05 23:05:26,972: INFO/MainProcess] Scheduler: Sending due task task1-every-5-seconds (proj.tasks.t1) 

celery.py :

from __future__ import absolute_import 
from datetime import timedelta 
from celery import Celery 

app = Celery('proj', 
      broker='amqp://[email protected]//', 
      backend='amqp://', 
      include=['proj.tasks']) 
app.conf.update(
    CELERY_REDIRECT_STDOUTS=True, 
    CELERY_TASK_RESULT_EXPIRES=60, 
    CELERYD_CONCURRENCY = 1, 
    CELERYBEAT_SCHEDULE = { 
     'task1-every-5-seconds': { 
      'task': 'proj.tasks.t1', 
      'schedule': timedelta(seconds=5) 
      }, 
     }, 
    CELERY_TIMEZONE = 'GMT', 
) 

if __name__ == '__main__': 
    app.start() 

tasks.py :

from __future__ import absolute_import 
from proj.celery import app 
from celery import Task 
import time 

class Foo(): 
    def __init__(self, x): 
     self.x = x 

class Task1(Task): 
    abstract = True 
    def __init__(self): 
     print "init Task1" 
     print id(self) 
     self.f = Foo(1.0) 
     print "x=1.0" 

@app.task(base=Task1) 
def t1(): 
    t1.f.x +=1 
    print t1.f.x 
+0

'__init__'을 (를) 한 번 호출하려는 이유를 설명하십시오. 주변에는 다른 방법이있을 수 있습니다. '__init__'는 생성자가 아니며 인스턴스의 시작 자입니다. 똑같은 작업 인스턴스가'__init__'을 여러 번 호출하도록하려면'id (self)'를 출력하십시오. –

+0

@KrzysztofSzularz '__init__'호출시 소켓과의 연결을 열고이 작업은 인스턴스 당 한 번만 수행되어야합니다. 수표를 넣어야합니까 (예 : 연결이 이미 열려 있거나 아무것도하지 않는 경우) '__new__'에 추가해야합니까? 나는'__init__'을 사용하고 있었는데 왜냐하면 문서에서는 한 번만 호출되기 때문입니다. 인쇄 ID (자체)를 확인하고 곧 질문을 업데이트 할 것입니다. 감사합니다. – b3rtz

+0

@KrzysztofSzularz'__init__ '을 호출하는 동안 id (self)를 편집하여 클래스의 세 가지 인스턴스가 있습니다. 단지 1이 없어야합니까? 나는 약간의 설명서와 설정과 혼동 스럽다. 이견있는 사람? – b3rtz

답변

0

그래서, 당신의 의견에 따라, 당신이 스레드 당 하나의 연결을 유지해야합니다.

왜 스레드 저장소를 사용하지 않을까요? 귀하의 경우에는 안전한 솔루션이되어야합니다.

from threading import local 

thread_storage = local() 

def get_or_create_conntection(*args, **kwargs): 
    if not hasattr(thread_storage, 'connection'): 
     thread_storage.connection = Connection(*args, **kwargs) 
    return thread_storage.connection 

@app.task() 
def do_stuff(): 
    connection = get_or_create_connection('some', connection='args') 
    connection.ping() 
+0

굉장! 고마워요. 정말 좋은 생각이야. – b3rtz