2012-08-27 3 views
5

일부 컨텍스트 : 사용자가 작업을 미리 저장하고 정확한 날짜를 예약 할 수있게 해주는 장고 앱을 구축하고 있습니다./미래에 그들은 행동을 실행하기를 원합니다. 예를 들어, 다음 주 오후 5시 30 분에 프로그래밍 방식으로 Facebook 벽에 푸시되도록 게시 일정을 예약합니다.Django-Celery를 통해 거의 동시 적으로 수행 할 수있는 일회성 (재발생하지 않는) 작업 수천 개를 예약합니다.

일회성 작업의 수천 건을 처리 할 수있는 작업 스케줄링 시스템을 찾고 있는데, 모두 거의 동시에 실행되도록 설정되었습니다 (오류 마진 더하기 또는 분).

장고 셀러리/Rabbitmq를 고려하고 있지만, Celery docs은 일회용으로 사용되는 작업에 대해서는 언급하지 않았습니다. Django-Celery는 CrontabSchedule의 하위 클래스로 선택되어 있습니까? 아니면 다른 방법을 연구하는 데 더 많은 에너지를 소비하고 있습니까? 아마도 Sched Module 및 Cron과 함께 뭔가를 해킹했을 것입니다.

+0

최근 편집 내용에 대한 알림을 받을지 여부를 알지 못하므로 여기에서 의견을 말하고 싶습니다. – dokkaebi

답변

7

편집 2 : 어떤 이유

은, 내 머리는 원래 작업을 반복의 영역에 갇혀 있었다. 다음은 더 간단한 해결책입니다.

정말 필요한 것은 각 사용자 작업에 대해 하나의 작업을 정의하는 것입니다. 데이터베이스에서 실행될 저장 작업을 건너 뛸 수 있습니다. 바로 셀러리가 여기에 있습니다!

다시 페이스 북 게시물 예제를 다시 사용하고 사용자와 텍스트를 사용하고 마법을 사용하여 해당 사용자의 페이스 북에 텍스트를 게시하는 어딘가에 post_to_facebook 어딘가에 있다고 가정하면 다시 정의 할 수 있습니다. 이 같은 작업 :

# Task to send one update. 
@celery.task(ignore_result=True) 
def post_to_facebook(user, text): 
    # perform magic 
    return whatever_you_want 

사용자는 게시물을 대기열에 준비가되면, 당신은 그냥 작업을 실행하는 셀러리 알려주기 :

post_to_facebook.apply_async(
    (user, text), # args 
    eta=datetime.datetime(2012, 9, 15, 11, 45, 4, 126440) # pass execution options as kwargs 
) 

이 모두의 전체 무리들, 여기에 설명입니다 사용 가능한 통화 옵션 : http://docs.celeryproject.org/en/latest/userguide/calling.html#eta-and-countdown

호출 결과가 필요한 경우 작업 정의에서 ignore_result 매개 변수를 건너 뛰고 AsyncResult 개체를 다시 가져온 다음 호출 결과를 검사 할 수 있습니다. 여기에 더 많은 것 : http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html#keeping-results

아래 답변 중 일부는 여전히 관련이 있습니다. 여전히 각 사용자 작업에 대한 작업을 원하지만 작업 디자인 등에 대해 생각하고 싶지 만 요청한 작업을 수행하는 데 훨씬 간단한 방법입니다. 반복 작업을 사용하여

원래 대답은 다음과 같습니다

Dannyroa이 올바른 생각을 가지고있다. 나는 그것을 여기에서 약간 만들 것이다.

편집/TLDR : 대답은 , 셀러리는 여러분의 필요에 적합하다. 당신은 단지 당신의 업무 정의를 재고 할 필요가 있습니다.

사용자가 임의의 파이썬 코드를 작성하여 태스크를 정의하도록 허용하지 않는다고 가정합니다. 즉, 사용자가 일정을 잡을 수있는 동작을 미리 정의한 다음 원하는 동작을 예약 할 수 있어야합니다. 그런 다음 각 사용자 작업에 대해 하나의 예약 된 작업을 실행하고 항목을 확인하고 각 항목에 대한 작업을 수행 할 수 있습니다.

한 사용자 작업 :

페이스 북 예제를 사용하여 테이블에 사용자의 업데이트를 저장하는 것입니다 :

class ScheduledPost(Model): 
    user = ForeignKey('auth.User') 
    text = TextField() 
    time = DateTimeField() 
    sent = BooleanField(default=False) 

그런 다음 해당 항목에 대한 점검 작업을 매분 실행됩니다 테이블 (마지막으로 언급 한 오류 마진에 따라) 게시 예정. 1 분짜리 창을 치는 것이 매우 중요하다면 30 초마다 작업을 더 자주 예약 할 수 있습니다. 작업 수 (MyApp를/tasks.py에서) 다음과 같이 :

@celery.task 
def post_scheduled_updates(): 
    from celery import current_task 
    scheduled_posts = ScheduledPost.objects.filter(
     sent=False, 
     time__gt=current_task.last_run_at, #with the 'sent' flag, you may or may not want this 
     time__lte=timezone.now() 
    ) 
    for post in scheduled_posts: 
     if post_to_facebook(post.text): 
      post.sent = True 
      post.save() 

는 config는 다음과 같습니다

CELERYBEAT_SCHEDULE = { 
    'fb-every-30-seconds': { 
     'task': 'tasks.post_scheduled_updates', 
     'schedule': timedelta(seconds=30), 
    }, 
} 

추가 사용자 작업 :

각 사용자 작업에 대한 Facebook에 게시하는 것 외에도 새 테이블과 새 작업을 정의 할 수 있습니다.

class EmailToMom(Model): 
    user = ForeignKey('auth.User') 
    text = TextField() 
    subject = CharField(max_length=255) 
    sent = BooleanField(default=False) 
    time = DateTimeField() 

@celery.task 
def send_emails_to_mom(): 
    scheduled_emails = EmailToMom.objects.filter(
     sent=False, 
     time__lt=timezone.now() 
    ) 
    for email in scheduled_emails: 
     sent = send_mail(
      email.subject, 
      email.text, 
      email.user.email, 
      [email.user.mom.email], 
     ) 
     if sent: 
      email.sent = True 
      email.save() 

    CELERYBEAT_SCHEDULE = { 
     'fb-every-30-seconds': { 
      'task': 'tasks.post_scheduled_updates', 
      'schedule': timedelta(seconds=30), 
     }, 
     'mom-every-30-seconds': { 
      'task': 'tasks.send_emails_to_mom', 
      'schedule': timedelta(seconds=30), 
     }, 
    } 

속도와 최적화 :

대신 게시 업데이트 반복하고 post_scheduled_updates 통화 중에 직렬을 보내는 더 많은 처리량을 얻으려면, 당신은 하위의 무리를 산란 할 수 충분히 주어진 (병렬로 수행 workers). 그런 다음 post_scheduled_updates에 대한 호출이 매우 빠르게 실행되며 각 fb 업데이트에 대해 하나씩 많은 작업이 가능한 빨리 실행되도록 예약합니다. 그러면 다음과 같이 보일 것입니다 :

내가 게시 한 코드는 테스트를 거치지 않았지만 확실히 최적화되지는 않았지만, 올바른 방향으로 가야합니다. 귀하의 질문에 당신은 처리량에 대한 우려를 암시하므로 최적화 할 수있는 장소를 면밀히 관찰하고 싶을 것입니다. 한 가지 분명한 사실은 반복적으로 post.sent=True;post.save()을 호출하는 대신 대량 업데이트입니다.

상세 정보 : 정기적 인 작업에

더 많은 정보 : http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html.

작업의 디자인 전략에 대한 섹션 : http://docs.celeryproject.org/en/latest/userguide/optimizing.html을 : http://docs.celeryproject.org/en/latest/userguide/tasks.html#performance-and-strategies

여기에 셀러리를 최적화하는 방법에 대한 전체 페이지가 있습니다.

하위 작업에 대한이 페이지는 http://docs.celeryproject.org/en/latest/userguide/canvas.html입니다.

실제로 모든 셀러리 문서를 읽는 것이 좋습니다.

+0

감사합니다. 당신의 대답은 믿을 수 없을 정도로 도움이되었습니다. –

+0

굉장합니다. 다행스럽게 도울 수있어! – dokkaebi

+0

일부 태스크는 셀 룰러 태스크의 중요한 멱등 원 규칙에 실패합니다. 예를 들면 : celery.task @ ''' 데프 send_emails_to_mom() :' : 이 scheduled_emails = EmailToMom.objects.filter scheduled_emails에서 이메일 ( 이 time__lt = timezone.now() = 거짓 전송) '' 이 작업이 동시에 두 번 시작된 경우 둘 다 동일한 예약 된 전자 메일 목록을 보내고 동일한 목록으로 보내기 시작합니다. – dalore

0

내가 할 일은 ScheduledPost라는 모델을 만드는 것입니다.

매 5 분마다 실행되는 PeriodicTask가 있습니다.

작업은 ScheduledPost 테이블에서 Facebook으로 푸시해야하는 게시물을 확인합니다.