2017-12-31 125 views
1

thats 내 첫 번째 질문에 stackoverflow. 나는 내가 알고 싶은 것을 여기에서 찾을 수 있었다. 이 btw에 대해 감사드립니다.Python Processpoolexecutor - 대기열을 없애시겠습니까?

그러나. 내 ProcessPoolExecutor를 죽이려고하면 생성 된 전체 큐를 통해 작동합니다 (.. 그렇게 생각합니까?). Processpoolexecutor의 큐를 즉시 지울 수있는 간단한 방법이 있습니까?

from concurrent.futures import ProcessPoolExecutor 
from time import sleep 
from random import randint 


def something_fancy(): 
    sleep(randint(0, 5)) 
    return 'im back!' 


class Work: 
    def __init__(self): 
     self.exe = ProcessPoolExecutor(4) 

    def start_procs(self): 
     for i in range(300): 
      t = self.exe.submit(something_fancy) 
      t.add_done_callback(self.done) 

    def done(self, f): 
     print f.result() 

    def kill(self): 
     self.exe.shutdown() 


if __name__ == '__main__': 
    work_obj = Work() 
    work_obj.start_procs() 
    sleep(5) 
    work_obj.kill() 

그래서 내가 원하는 것은 4 개의 프로세스에 의해 해결 된 300 대기열을 생성하는 것입니다. 5 초 후 방금 종료해야합니다.

나는 gil btw 때문에 프로세스를 사용해야합니다.

답변

0

shutdown(wait=False)을 사용하면 더 빨리 반환됩니다. wait의 기본값은 True입니다. 그렇지 않으면 .Cancel()을 제공합니다. 그럴 수 없다면 False를 반환합니다. 모든 보류중인 미래가 실행 완료 될 때까지 True입니다

wait 경우,이 메소드는 반환하지 않습니다 및 집행과 관련된 자원이되었습니다

link to the doku

아직하지만 모든 processiont을 완료합니다 해방.

waitFalse 인 경우이 메서드는 즉시 반환되고 모든 보류중인 선물이 실행되면이 메서드와 연결된 리소스가 해제됩니다. wait 값에 관계없이 모든 보류중인 미래가 실행 완료 될 때까지 전체 Python 프로그램이 종료되지 않습니다. 당신은 시간의 고정 금액이있는 경우

, 당신은 시간 제한 제공해야

map(func, *iterables, timeout=None, chunksize=1) 

부동 또는 INT 될 수 있습니다 -하지만 ms 또는 어떤 경우 다큐는 말하지 않습니다. .. 내가 선물 목록에 추가 수동 큐 크기를 조정함으로써 각 처리를 취소 할 수 있었던 힌트

0

감사 릭

. 그것 없이는 발사되는 많은 과정에 아직도있다.

대기열 크기를 조정하거나 실행을 일시 중지하거나 프로세스 대기열을 삭제하는 API가없는 것처럼 보입니다.

그러나 이것을 실현하는 유일한 방법은 스레드에서 Main 객체를 실행하여 메인 스크립트가 언제든지 그것을 종료 할 수있게하는 것입니다. 그리고 여전히 "CancelledError"를 잡으려고합니다.

나를 위해 꽤 "더러운"과 pythonic가 아닌 것처럼 보입니다. 나는 다른 제안을 취할 것이다. 고마워.

from concurrent.futures import ProcessPoolExecutor, CancelledError 
from time import sleep 
from random import randint 
from threading import Thread 


def something_fancy(): 
    sleep(randint(0, 5)) 
    return 'im back!' 


class Work: 
    def __init__(self): 
     self.exe = ProcessPoolExecutor(4) 
     self.futures = [] 
     self.max_queue = 50 
     self.killed = False 

    def start_procs(self): 
     for i in range(200000): 
      while not self.killed: 
       if len(self.futures) <= self.max_queue: 
        t = self.exe.submit(something_fancy) 
        t.add_done_callback(self.done) 
        self.futures.append(t) 
        break 

    def done(self, f): 
     print f.result() 
     self.futures.remove(f) 

    def kill(self): 
     self.killed = True 
     for future in self.futures: 
      try: 
       future.cancel() 
      except CancelledError, e: 
       print e 


if __name__ == '__main__': 
    work_obj = Work() 
    Thread(target=work_obj.start_procs).start() 
    sleep(5) 
    work_obj.kill() 
작동

edit

from concurrent.futures import ProcessPoolExecutor, CancelledError 
from time import sleep 
from random import randint 
from threading import Thread 


def something_fancy(): 
    sleep(0.5) 
    return 'Hello World, Future was running!' 


class Work: 
    def __init__(self): 
     cpu_usage = 4 
     self.exe = ProcessPoolExecutor(cpu_usage) 
     self.futures = [] 
     self.max_queue = cpu_usage*3 
     self.stop = False 
     self.paused = False 

    def start_procs(self): 
     for i in range(200000): 
      while not self.stop: 
       if len(self.futures) <= self.max_queue: 
        if not self.paused: 
         t = self.exe.submit(something_fancy) 
         t.add_done_callback(self._done) 
         self.futures.append(t) 
         break 

    def _done(self, f): 
     print f.result() 
     self.futures.remove(f) 

    def pause(self): 
     self.paused = False if self.paused else True 

    def shutdown(self): 
     self.stop = True 
     for future in self.futures: 
      try: 
       future.cancel() 
      except CancelledError, e: 
       print e 


if __name__ == '__main__': 
    work_obj = Work() 
    Thread(target=work_obj.start_procs).start() 
    print 'Started' 
    sleep(5) 
    work_obj.pause() 
    print 'Paused' 
    sleep(5) 
    work_obj.pause() 
    print 'Continue' 
    sleep(5) 
    work_obj.shutdown() 
    print 'Shutdown' 

은 - 아직도 꽤 더러운 여전히 CancelledError과를 캐치하지 않습니다.