2014-10-22 6 views
4

가 나는 레디 스 브로커와 백엔드와 다음 설정을 사용하여 화음의 모든 작업을 취소 :셀러리 : 중단 또는

chord([A, A, A, ...])(B)

  • 작업 A는 몇 가지 검사를 수행합니다. AbortableTask을 기본으로 사용하고 task.is_aborted() 플래그를 정기적으로 확인합니다.
  • 태스크 B가 계산

사용자는 A가 작업을 중단 할 가능성이 결과에 대해 사용자에게 통지한다. 아쉽게도 모든 작업 A 인스턴스에서 AbortableAsyncResult(task_a_id).abort()을 호출하면 활성 인스턴스 만 중단됩니다. 아직 작업자가받지 못한 작업의 상태는 ABORTED으로 변경되지만 여전히 처리되며 is_aborted() 플래그는 False를 반환합니다.

당연히 대신에 보류중인 작업을 수행 할 수도 있지만 문제는 그 코드 바디 (작업 B)가 더 이상 실행되지 않는다는 것입니다.

보류중인 실행중인 작업 A 인스턴스를 중지하고 B 작업을 계속 실행하려면 어떻게해야합니까?

답변

0

작업 자체에 코드를 지정하는 대신 A 작업을 보는 코드 작업을 고려하는 것이 좋습니다. 이것이 의미하는 바는, 화음에는 실행중인 작업 (A)을 가끔씩 점검하여 수행 여부를 확인하는 작업이 포함되어 있다는 것입니다. 그들 모두가 성공적으로 작업 B로 체인과 함께 화음을 반환하면 모든 인스턴스의 ID의 목록을 얻을 그들을 막을 수 있습니다.

+0

흥미로운 해킹처럼 들리지만 모든 것을 더 복잡하게 만듭니다. 나는 이것을 달성하기 위해 셀러리에 내장 된 방법이 있다는 것을 희망합니다. –

+0

그래서 문제는 샐러리가 작동하는 방식입니다. 작업이 대기열에 있고 취소되지 않는 한 "활성"상태입니다. 내 지식으로는이 문제를 해결할 방법이 없습니다. 작업이 취소 된 경우에도 체인을 연결하는 코드가 필요합니다. 다시는 내 지식으로는 불가능합니다. – user2097159

+0

AbortableTasks (contrib 패키지로부터) *는 중단 가능합니다 (콜백을 죽이지 않고). 문제는 보류중인 태스크가 아니라 실행중인 태스크에서만 작동한다는 것입니다. –

2

지금 당신은 당신이 원하는 무엇이든 할 수

for taks in my_chord.parent.subtasks: 
    print(task.id) 

를 사용하여,이 간단한 코드 이제

from celery import chord 

my_chord = chord(a.si() for i in range(300))(b.si()) 

당신은 하위 목록 my_chord 인스턴스 (a 작업의 모든 instaces)를 얻을 수를 고려 그 작업 instures 함께 할. 예를 들어 현재 상태에 관계없이 모두 취소 할 수 있습니다.

기본적으로 보류중인 작업 만 종료됩니다. 그러나 terminate=True을 전달하면 실행중인 작업도 종료됩니다.

또한 코드의 콜백 함수는 모든 하위 작업이 성공적으로 실행 된 후에 호출됩니다. 코드의 하위 작업을 취소하기 때문에 콜백 기능이 호출되지 않고 코드 작업이 실패합니다. 콜백 작업을 다시 시도해야합니다.

+0

답해 주셔서 감사합니다.불행히도 그것은 해결책이 아닙니다 :) 질문에 설명 된 것처럼 ID로 작업을 이미 중단/취소하고 있습니다. abort()는 보류중인 작업에서는 작동하지 않으며, revoke()는 콜백이 작동하지 않도록합니다. 그게 제가 풀려고하는 문제입니다. –

+1

그런 다음 하위 잠금 결과가 성공 또는 실패 인 경우 콜백이 호출되도록 'unlock_chord'를 재정의해야합니다. – ChillarAnand