프로젝트 설정에서 사전 목록 (django.conf.settings
을 통해 가져옴)을 기반으로 정기적으로 샐러리 작업을 동적으로 추가하는 모듈을 작성했습니다. 나는 함수 add_tasks
일정 기능을 사용하는 것은 설정에서 주어진 특정 uuid
호출 할 것을 수행Celery add_periodic_task blocks 장고가 uwsgi 환경에서 실행 중임
def add_tasks(celery):
for new_task in settings.NEW_TASKS:
celery.add_periodic_task(
new_task['interval'],
my_task.s(new_task['uuid']),
name='My Task %s' % new_task['uuid'],
)
처럼 내가 내 celery.py
에서 함수를 호출 할 on_after_configure.connect
신호를 사용 here 제안
app = Celery('my_app')
@app.on_after_configure.connect
def setup_periodic_tasks(celery, **kwargs):
from add_tasks_module import add_tasks
add_tasks(celery)
이 설정은 모두 celery beat
및 celery worker
을 위해 잘 작동하지만 난 내 장고 응용 프로그램을 제공하기 위해 uwsgi
를 사용하여 내 설치를 나누기. Uwsgi
은보기 코드가 셀러리의 .delay()
메소드를 사용하여 작업을 처음 전송할 때까지 원활하게 실행됩니다. 그 시점에서 그것은 셀러리가 uwsgi
으로 초기화되었지만 위의 코드에서 영원히 차단 된 것처럼 보입니다. 내가 다음 명령 줄에서 수동으로 실행하는 경우 차단, 나는 다음 (단축) 스택 추적을 얻을 때 중단 : 뮤텍스를 획득에 문제가있는 것처럼
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
return obj.__dict__[self.__name__]
KeyError: 'tasks'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
return obj.__dict__[self.__name__]
KeyError: 'data'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
return obj.__dict__[self.__name__]
KeyError: 'tasks'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
(SHORTENED HERE. Just contained the trace from the console through my call to this function)
File "/opt/my_app/add_tasks_module/__init__.py", line 42, in add_tasks
my_task.s(new_task['uuid']),
File "/usr/local/lib/python3.6/site-packages/celery/local.py", line 146, in __getattr__
return getattr(self._get_current_object(), name)
File "/usr/local/lib/python3.6/site-packages/celery/local.py", line 109, in _get_current_object
return loc(*self.__args, **self.__kwargs)
File "/usr/local/lib/python3.6/site-packages/celery/app/__init__.py", line 72, in task_by_cons
return app.tasks[
File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 44, in __get__
value = obj.__dict__[self.__name__] = self.__get(obj)
File "/usr/local/lib/python3.6/site-packages/celery/app/base.py", line 1228, in tasks
self.finalize(auto=True)
File "/usr/local/lib/python3.6/site-packages/celery/app/base.py", line 507, in finalize
with self._finalize_mutex:
것 같다.
은 현재 내가sys.argv[0]
이
uwsgi
가 포함되어있는 경우 감지하는 해결 방법을 사용하고 있습니다 만
beat
이 작업을 필요로 다음,주기적인 작업을 추가하지,하지만 난 더 이상 이러한 문제를 해결하기 위해 잘못된 여기에 무슨 일이 일어나고 있는지 이해하고 싶습니다.
이 문제는 하나의 스레드/프로세스가 다른 필요가있는 뮤텍스를 보유하고있는 uwsgi 멀티 스레드 또는 다중 프로세스를 사용하는 것과 관련이 있습니까?
문제를 해결하는 데 도움이되는 힌트를 주시면 감사하겠습니다. 고맙습니다.
내가 사용하고 있습니다 : 장고 1.11.7와 셀러리를 4.1.0
나는이 문제에 대한 최소한의 설정을 만든 1
편집 :
celery.py :
import os
from celery import Celery
from django.conf import settings
from myapp.tasks import my_task
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_app.settings')
app = Celery('my_app')
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(
60,
my_task.s(),
name='Testtask'
)
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
tasks.py :
from celery import shared_task
@shared_task()
def my_task():
print('ran')
CELERY_TASK_ALWAYS_EAGER = False이고 작동중인 메시지 대기열이 있는지 확인하십시오.
실행 :
./manage.py shell -c 'from myapp.tasks import my_task; my_task.delay()'
위의 오류를 볼 중단까지 약 10 초 정도 기다립니다.
왜 장고 pr에서 초기화할까요? 대신에 헌신적 인 프로세스를 갖는 것입니까? –
전용 셀레 리 비트 프로세스가 있지만 장고 프로세스 내에서'.delay() '를 호출해야합니다. 그것이 바로 코드 블록입니다. – Tim
그러면 add_tasks가 장고 프로세스에서 호출되지 않습니까? –