2017-11-13 57 views
3

프로젝트 설정에서 사전 목록 (django.conf.settings을 통해 가져옴)을 기반으로 정기적으로 샐러리 작업을 동적으로 추가하는 모듈을 작성했습니다. 나는 함수 add_tasks 일정 기능을 사용하는 것은 설정에서 주어진 특정 uuid 호출 할 것을 수행Celery add_periodic_task blocks 장고가 uwsgi 환경에서 실행 중임

:

def add_tasks(celery): 
    for new_task in settings.NEW_TASKS: 
     celery.add_periodic_task(
      new_task['interval'], 
      my_task.s(new_task['uuid']), 
      name='My Task %s' % new_task['uuid'], 
     ) 

처럼 내가 내 celery.py에서 함수를 호출 할 on_after_configure.connect 신호를 사용 here 제안

app = Celery('my_app') 

@app.on_after_configure.connect 
def setup_periodic_tasks(celery, **kwargs): 
    from add_tasks_module import add_tasks 
    add_tasks(celery) 

이 설정은 모두 celery beatcelery worker을 위해 잘 작동하지만 난 내 장고 응용 프로그램을 제공하기 위해 uwsgi를 사용하여 내 설치를 나누기. Uwsgi은보기 코드가 셀러리의 .delay() 메소드를 사용하여 작업을 처음 전송할 때까지 원활하게 실행됩니다. 그 시점에서 그것은 셀러리가 uwsgi으로 초기화되었지만 위의 코드에서 영원히 차단 된 것처럼 보입니다. 내가 다음 명령 줄에서 수동으로 실행하는 경우 차단, 나는 다음 (단축) 스택 추적을 얻을 때 중단 : 뮤텍스를 획득에 문제가있는 것처럼

Traceback (most recent call last): 
    File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__ 
    return obj.__dict__[self.__name__] 
KeyError: 'tasks' 

During handling of the above exception, another exception occurred: 

Traceback (most recent call last): 
    File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__ 
    return obj.__dict__[self.__name__] 
KeyError: 'data' 

During handling of the above exception, another exception occurred: 

Traceback (most recent call last): 
    File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__ 
    return obj.__dict__[self.__name__] 
KeyError: 'tasks' 

During handling of the above exception, another exception occurred: 
Traceback (most recent call last): 

    (SHORTENED HERE. Just contained the trace from the console through my call to this function) 

    File "/opt/my_app/add_tasks_module/__init__.py", line 42, in add_tasks 
    my_task.s(new_task['uuid']), 
    File "/usr/local/lib/python3.6/site-packages/celery/local.py", line 146, in __getattr__ 
    return getattr(self._get_current_object(), name) 
    File "/usr/local/lib/python3.6/site-packages/celery/local.py", line 109, in _get_current_object 
    return loc(*self.__args, **self.__kwargs) 
    File "/usr/local/lib/python3.6/site-packages/celery/app/__init__.py", line 72, in task_by_cons 
    return app.tasks[ 
    File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 44, in __get__ 
    value = obj.__dict__[self.__name__] = self.__get(obj) 
    File "/usr/local/lib/python3.6/site-packages/celery/app/base.py", line 1228, in tasks 
    self.finalize(auto=True) 
    File "/usr/local/lib/python3.6/site-packages/celery/app/base.py", line 507, in finalize 
    with self._finalize_mutex: 

것 같다.

은 현재 내가 sys.argv[0]uwsgi가 포함되어있는 경우 감지하는 해결 방법을 사용하고 있습니다 만 beat이 작업을 필요로 다음,주기적인 작업을 추가하지,하지만 난 더 이상 이러한 문제를 해결하기 위해 잘못된 여기에 무슨 일이 일어나고 있는지 이해하고 싶습니다.

이 문제는 하나의 스레드/프로세스가 다른 필요가있는 뮤텍스를 보유하고있는 uwsgi 멀티 스레드 또는 다중 프로세스를 사용하는 것과 관련이 있습니까?

문제를 해결하는 데 도움이되는 힌트를 주시면 감사하겠습니다. 고맙습니다.

내가 사용하고 있습니다 : 장고 1.11.7와 셀러리를 4.1.0

나는이 문제에 대한 최소한의 설정을 만든 1

편집 :

celery.py :

import os 
from celery import Celery 
from django.conf import settings 
from myapp.tasks import my_task 

# set the default Django settings module for the 'celery' program. 
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_app.settings') 

app = Celery('my_app') 

@app.on_after_configure.connect 
def setup_periodic_tasks(sender, **kwargs): 
    sender.add_periodic_task(
     60, 
     my_task.s(), 
     name='Testtask' 
    ) 

app.config_from_object('django.conf:settings', namespace='CELERY') 
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) 

tasks.py :

from celery import shared_task 
@shared_task() 
def my_task(): 
    print('ran') 

CELERY_TASK_ALWAYS_EAGER = False이고 작동중인 메시지 대기열이 있는지 확인하십시오.

실행 :

./manage.py shell -c 'from myapp.tasks import my_task; my_task.delay()' 

위의 오류를 볼 중단까지 약 10 초 정도 기다립니다.

+1

왜 장고 pr에서 초기화할까요? 대신에 헌신적 인 프로세스를 갖는 것입니까? –

+0

전용 셀레 리 비트 프로세스가 있지만 장고 프로세스 내에서'.delay() '를 호출해야합니다. 그것이 바로 코드 블록입니다. – Tim

+0

그러면 add_tasks가 장고 프로세스에서 호출되지 않습니까? –

답변

0

당신이 그 신호 @app.on_after_finalize.connect를 시도해 수 :

프로젝트 celery==4.1.0, Django==2.0, django-celery-beat==1.1.0

@app.on_after_finalize.connect 
def setup_periodic_tasks(sender, **kwargs): 
    """ setup of periodic task :py:func:shopify_data_fetcher.celery.fetch_shopify 
    based on the schedule defined in: settings.CELERY_BEAT_SCHEDULE 
    """ 
    for task_name, task_config in settings.CELERY_BEAT_SCHEDULE.items(): 
     sender.add_periodic_task(
      task_config['schedule'], 
      fetch_shopify.s(**task_config['kwargs']['resource_name']), 
      name=task_name 
     ) 

조각 CELERY_BEAT_SCHEDULEdjango-celery-results==1.0.1 작업에서 일부 빠른 조각 : 그래서

CELERY_BEAT_SCHEDULE = { 
    'fetch_shopify_orders': { 
     'task': 'shopify.tasks.fetch_shopify', 
     'schedule': crontab(hour="*/3", minute=0), 
     'kwargs': { 
      'resource_name': shopify_constants.SHOPIFY_API_RESOURCES_ORDERS 
     } 
    } 
} 
+0

답변 해 주셔서 감사합니다. 나는 이것을 시도했지만 도움이되지 않았지만 올바른 방향으로 나를 가리켰다. @shared_task 데코레이터의 사용법이 문제인 것 같다. 내 질문을 수정하겠습니다. – Tim

+0

'fetch_shopify' 셀러리 작업을 정의하기 위해 어떤 데코레이터를 사용합니까? – Tim

0

을, 내가 찾은 그 @shared_task 데코레이터 문제를 만듭니다. 내가 지금처럼 신호에 의해 호출 한 함수에서 작업의 권리를 선언 할 때 나는 문제를 회피 할 수

def add_tasks(celery): 
    @celery.task 
    def my_task(uuid): 
     print(uuid) 

    for new_task in settings.NEW_TASKS: 
     celery.add_periodic_task(
      new_task['interval'], 
      my_task.s(new_task['uuid']), 
      name='My Task %s' % new_task['uuid'], 
     ) 

이 솔루션은 실제로 나를 위해 노력하고 있습니다,하지만 난이 하나 더 문제가 : 나는이 코드를 사용 플러그 형 앱이므로 신호 처리기 외부에서 셀러리 앱에 직접 액세스 할 수 없지만 다른 코드 내에서 my_task 함수를 호출 할 수 있기를 원합니다. 함수 내에서 정의함으로써 함수 밖에서 사용할 수 없으므로 다른 곳에서는 가져올 수 없습니다.

아마도 신호 함수 외부에서 태스크 함수를 정의하여 여기에서 다른 데코레이터 및 tasks.py에서 사용할 수 있습니다. 문제가되지 않는 tasks.py에서 사용할 수있는 장식자인 @shared_task 데코레이터를 제외하고는 데코레이터가 있으면 궁금합니다.

현재 최상의 솔루션이 될 수 :

task_app.__init__.py :

def my_task(uuid): 
    # do stuff 
    print(uuid) 

def add_tasks(celery): 
    celery_my_task = celery.task(my_task) 
    for new_task in settings.NEW_TASKS: 
     celery.add_periodic_task(
      new_task['interval'], 
      celery_my_task(new_task['uuid']), 
      name='My Task %s' % new_task['uuid'], 
     ) 

task_app.tasks.py :

from celery import shared_task 
from task_app import my_task 
shared_my_task = shared_task(my_task) 

myapp.celery.py :

import os 
from celery import Celery 
from django.conf import settings 


# set the default Django settings module for the 'celery' program. 
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_app.settings') 

app = Celery('my_app') 

@app.on_after_configure.connect 
def setup_periodic_tasks(sender, **kwargs): 
    from task_app import add_tasks 
    add_tasks(sender) 


app.config_from_object('django.conf:settings', namespace='CELERY') 
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)