2017-03-14 23 views
3

Python concurrent.futures와 ProcessPoolExecutor는 작업을 예약하고 모니터하기위한 깔끔한 인터페이스를 제공합니다. 선물도 provide .cancel() 메소드 :Python : concurrent.futures 취소 가능하게 만드는 방법?

취소() : 호출을 취소 할 시도. 호출이 이고 현재 실행 중이므로 취소 할 수 없으면 메서드는 False를 반환하고 그렇지 않으면 호출이 취소되고 메서드가 True를 반환합니다. 불행히도에 simmilar question (관련된 asyncio)에서

는 작업을 실행 응답 주장은 문서의 잘린를 사용 uncancelable,하지만 그들은 실행 및 uncancelable 경우에만 워드 프로세서, 그런 말하지 말아.

프로세스에 multiprocessing.Events 제출은 하찮게 수 없습니다 내가 뭘하려고 오전

(multiprocess.Process에 같은 매개 변수를 통해 그렇게는 RuntimeError에를 반환)? 검색 공간을 분할하고 모든 파티션에 대해 작업을 실행하고 싶습니다. 그러나 하나의 솔루션 만 있으면 충분하며 프로세스는 CPU를 많이 사용합니다. ProcessPool을 사용하여 이익을 상쇄하지 않는 실제적인 안락한 방법이 있습니까?

예 :

from concurrent.futures import ProcessPoolExecutor, FIRST_COMPLETED, wait 

# function that profits from partitioned search space 
def m_run(partition): 
    for elem in partition: 
     if elem == 135135515: 
      return elem 
    return False 

futures = [] 
# used to create the partitions 
steps = 100000000 
with ProcessPoolExecutor(max_workers=4) as pool: 
    for i in range(4): 
     # run 4 tasks with a partition, but only *one* solution is needed 
     partition = range(i*steps,(i+1)*steps) 
     futures.append(pool.submit(m_run, partition)) 

    done, not_done = wait(futures, return_when=FIRST_COMPLETED) 
    for d in done: 
     print(d.result()) 

    print("---") 
    for d in not_done: 
     # will return false for Cancel and Result for all futures 
     print("Cancel: "+str(d.cancel())) 
     print("Result: "+str(d.result())) 
+0

당신은 PARAM로 전달하는 대신 글로벌 변수에'Event'를 설정하려고 할 수 http://stackoverflow.com/questions/1675766/how-to-combine-pool-map-with 참조 -array-shared-memory-in-python-multiprocessing – niemmi

+0

@niemmi tipp에 감사드립니다. 아마 다른 모듈에 대한 호출로 잘 짜여진 느낌이 없으므로이를 해결 방법으로 시도 할 것입니다. – Ketzu

답변

2

그래서 여기 내 발견의 질문이 흥미로운 발견했다.

.cancel() 메서드의 동작이 python 설명서에 명시된 것과 같습니다. 실행중인 동시 기능에 관해서는 불행히도 명령을받은 후에도 취소 할 수 없습니다. 내 발견이 맞다면 파이썬이보다 효과적인 .cancel() 메소드를 필요로한다고 생각합니다.

내 검색 결과를 확인하려면 아래 코드를 실행하십시오.

from concurrent.futures import ProcessPoolExecutor, as_completed 
from time import time 

# function that profits from partitioned search space 
def m_run(partition): 
    for elem in partition: 
     if elem == 3351355150: 
      return elem 
      break #Added to terminate loop once found 
    return False 

start = time() 
futures = [] 
# used to create the partitions 
steps = 1000000000 
with ProcessPoolExecutor(max_workers=4) as pool: 
    for i in range(4): 
     # run 4 tasks with a partition, but only *one* solution is needed 
     partition = range(i*steps,(i+1)*steps) 
     futures.append(pool.submit(m_run, partition)) 

    ### New Code: Start ### 
    for f in as_completed(futures): 
     print(f.result()) 
     if f.result(): 
      print('break') 
      break 

    for f in futures: 
     print(f, 'running?',f.running()) 
     if f.running(): 
      f.cancel() 
      print('Cancelled? ',f.cancelled()) 

    print('New Instruction Ended at = ', time()-start) 
print('Total Compute Time = ', time()-start) 

업데이트 : 그것은 강제로 떠들썩한 파티를 통해 동시 프로세스를 종료 할 수 있지만, 결과는 주 파이썬 프로그램도 종료 할 것입니다. 문제가되지 않는다면 아래 코드를 시도해보십시오.

본인 확인을 위해 마지막 2 개의 print 문 사이에 아래 코드를 추가해야합니다. 참고 :이 코드는 다른 python3 프로그램을 실행하지 않는 경우에만 작동합니다. concurrent.futures.Future.kill() 방법이없는 이유

import subprocess, os, signal 
result = subprocess.run(['ps', '-C', 'python3', '-o', 'pid='], 
         stdout=subprocess.PIPE).stdout.decode('utf-8').split() 
print ('result =', result) 
for i in result: 
    print('PID = ', i) 
    if i != result[0]: 
     os.kill(int(i), signal.SIGKILL) 
     try: 
      os.kill(int(i), 0) 
      raise Exception("""wasn't able to kill the process 
           HINT:use signal.SIGKILL or signal.SIGABORT""") 
     except OSError as ex: 
      continue 
1

는 모르겠지만, 당신은 pool.shutdown(wait=False)하여 프로세스 풀을 종료, 손으로 남아있는 자식 프로세스를 죽이는 당신이 원하는 것을 수행 할 수 있습니다.

자식 프로세스를 종료하는 기능을 만듭니다

import signal, psutil 

def kill_child_processes(parent_pid, sig=signal.SIGTERM): 
    try: 
     parent = psutil.Process(parent_pid) 
    except psutil.NoSuchProcess: 
     return 
    children = parent.children(recursive=True) 
    for process in children: 
     process.send_signal(sig) 

은 당신의 코드를 실행하면 첫 번째 결과를 얻을 때까지, 모든 남아있는 자식 프로세스를 죽일 : 불행하게도

from concurrent.futures import ProcessPoolExecutor, FIRST_COMPLETED, wait 

# function that profits from partitioned search space 
def m_run(partition): 
    for elem in partition: 
     if elem == 135135515: 
      return elem 
    return False 

futures = [] 
# used to create the partitions 
steps = 100000000 
pool = ProcessPoolExecutor(max_workers=4) 
for i in range(4): 
    # run 4 tasks with a partition, but only *one* solution is needed 
    partition = range(i*steps,(i+1)*steps) 
    futures.append(pool.submit(m_run, partition)) 

done, not_done = wait(futures, timeout=3600, return_when=FIRST_COMPLETED) 

# Shut down pool 
pool.shutdown(wait=False) 

# Kill remaining child processes 
kill_child_processes(os.getpid()) 
1

Futures 취소 할 수 없습니다 실행 .핵심적인 이유는 다른 구현에서 동일한 API를 보장하는 것입니다 (실행중인 스레드 또는 coroutines를 인터럽트 할 수 없음).

Pebble 라이브러리는 이러한 제한 사항을 극복하기 위해 설계되었습니다.

from pebble import ProcessPool 

def function(foo, bar=0): 
    return foo + bar 

with ProcessPool() as pool: 
    future = pool.schedule(function, args=[1]) 

    # if running, the container process will be terminated 
    # a new process will be started consuming the next task 
    future.cancel()