1

나는 멀티 스레드 용 concurrent.future.ThredPoolExecutor로 wokring입니다. 몇 가지 http 서비스를 실행하고 있습니다. 서버가 다운되었을 때 실행을 일시 중지하고 서버를 시작한 다음 실행을 다시 시작합니다.파이썬 3.4 concurrent.futures.Executor가 스레드를 일시 중지하고 다시 시작하는 컨트롤을 제공하지 않습니다.

서버가 다운되는 트리거는 특정 위치에서 파일을 사용할 수 있는지 확인한 후 실행을 일시 중지해야합니다.

그래서 concurrent.futures.Executor.shutdown()은 현재 보류중인 미래가 실행될 때 사용중인 자원을 해제해야한다는 것을 executor에게 알립니다.

하지만 executor의 shutdown() 메소드를 사용하면 스레드를 즉시 종료하지 않고 전체 실행을 마친 후 shutdown()을 호출합니다.

Infact 나는 일시 중지 및 concurren.future에서 재개를 찾을 수 없으므로 shutdown() 메서드를 호출합니다. 따라서 스레드가 실행을 완료하면 목록에서 URL을 제거합니다. 그래서 나는 나머지리스트를 전달하고 같은 방법을 상기 할 수있다. 당신은 취소 또는 일시 정지/파이썬에서 스레드를 다시 시작할 수 없습니다

import concurrent.futures 
import urllib.request 
import os.path 
import datetime 
import sys 
import pathlib 
from errno import ENOENT, EACCES, EPERM 
import time 
import threading 

listOfFilesFromDirectory = [] 
webroot = settings.configuration.WEBSERVER_WEBROOT 
WEBSERVER_PORT = settings.configuration.WEBSERVER_PORT 
shutdown = False 

def class myclass: 

#populating the list with the urls from a file 
def triggerMethod(path): 
    try: 
     for line in open(path): 
      listOfFilesFromDirectory.append(line) 
    except IOError as err: 
     if err.errno == ENOENT: 
      #logging.critical("document.txt file is missing") 
      print("document.txt file is missing") 
     elif err.errno in (EACCES, EPERM): 
      #logging.critical("You are not allowed to read document.txt") 
      print("You are not allowed to read document.txt") 
     else: 
      raise 

# calling this method to stop the threads and restart after a sleep of 100 secs, as the list will always have the urls that were not executed. 
def stopExecutor(executor): 
    filePath = "C:\logs\serverStopLog.txt" 
    while not shutdown: 
     time.sleep(5) 
     if os.path.isfile(filePath): 
      executor.shutdown() 
      time.sleep(100) 
      runRegressionInMultipleThreads() 
      break 

def load_url(url, timeout): 
    conn = urllib.request.urlopen('http://localhost:' + WEBSERVER_PORT + "/" + url, timeout = timeout) 
    return conn.info() 

def trigegerFunc(): 
    # We can use a with statement to ensure threads are cleaned up promptly 
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor: 
     # Start the load operations and mark each future with its URL 
     future_to_url = {executor.submit(load_url, url, 60): url for url in listOfFilesFromDirectory} 

     t = threading.Thread(target=stopExecutor, args=(executor)) 
     t.start() 
     for future in concurrent.futures.as_completed(future_to_url): 
      url = future_to_url[future] 
      try: 
       data = future.result() 
      except Exception as exc: 
       print('%r generated an exception: %s' % (url, exc)) 
       listOfFilesFromDirectory.remove(url) 
      else: 
       if data: 
        if "200" in data: 
         listOfFilesFromDirectory.remove(url) 
        else: 
         listOfFilesFromDirectory.remove(url) 
       else: 
        listOfFilesFromDirectory.remove(url) 
     shutdown = True 
     t.join()     


triggerMethod("C:\inetpub\wwwroot") 
trigegerFunc() 

답변

1

: 여기

는 코드입니다. executor.shutdown() 당신은 당신이 문서를 인용 때 수행했다 않습니다 정확히 :

신호는 현재 보류 선물이을 실행 완료되면이 를 사용하는 모든 리소스를 해제해야한다는 집행자.

굵게 표시된 부분 - 실행 프로그램은 현재 실행중인 모든 작업이 완료된 후에 만 ​​종료됩니다.

import time 
import os.path 
import threading 
import urllib.request 
import multiprocessing 
import concurrent.futures 
from multiprocessing import cpu_count 

shutdown = False 
should_cancel = False 

def stopTasks(): 
    global should_cancel 
    filePath = "C:\logs\serverStopLog.txt" 
    while not shutdown: 
     time.sleep(5) 
     if os.path.isfile(filePath): 
      should_cancel = True 
      break 

def _load_url(num, timeout, q): 
    conn = urllib.request.urlopen('http://localhost:' + WEBSERVER_PORT + 
            "/" + url, timeout=timeout) 
    q.put(conn.info()) 

def load_url(num, timeout): 
    q = multiprocessing.Queue() 
    p = multiprocessing.Process(target=_load_url, args=(num, timeout, q)) 
    p.start() 
    while p.is_alive(): 
     time.sleep(.5) 
     if should_cancel: 
      p.terminate() # This will actually kill the process, cancelling the operation 
      break # You could return something here that indicates it was cancelled, too. 
    else: 
     # We'll only enter this if we didn't `break` above. 
     out = q.get() 
     p.join() 
     return out 

def triggerFunc(): 
    global shutdown 
    with concurrent.futures.ThreadPoolExecutor(max_workers=cpu_count()) as executor: 
     # Start the load operations and mark each future with its URL 
     future_to_url = {executor.submit(load_url, url, 60): 
          url for url in listOfFilesFromDirectory} 
     t = threading.Thread(target=stopTasks) 
     t.start() 
     for future in concurrent.futures.as_completed(future_to_url): 
      info = future.result() 
      print("done: {}".format(info)) 
      # other stuff you do 
     shutdown = True 
     t.join() 

if __name__ == "__main__": 
    triggerFunc() 

우리가 실제로 서브를 죽일 수 있기 때문에 : 당신이 원하는 컨트롤의 종류를 얻으려면, 당신은 (이 스크립트의 단순화 된 버전)과 같이 별도의 프로세스에서 urllib 호출을 실행해야합니다 -process를 SIGTERM으로 보내면 여전히 진행 중일 때 urlopen 작업을 취소 할 수 있습니다.