외부 프로세스가 있으며 실행하는 데 다소 시간이 걸리고 결국 일부 출력이 표준 출력으로 인쇄됩니다.동일한 프로세스를 N 번 실행하고 X 초 내에 완료하지 않은 경우 종료하십시오.
외부 프로세스 N 번 실행하는 python 프로세스를 작성해야합니다. X 초 후에, 이미 종료 된 각 프로세스에 대해 출력을 수집해야하며, 종료하지 않은 각 프로세스에 대해 종료해야합니다.
어떻게 하시겠습니까?
외부 프로세스가 있으며 실행하는 데 다소 시간이 걸리고 결국 일부 출력이 표준 출력으로 인쇄됩니다.동일한 프로세스를 N 번 실행하고 X 초 내에 완료하지 않은 경우 종료하십시오.
외부 프로세스 N 번 실행하는 python 프로세스를 작성해야합니다. X 초 후에, 이미 종료 된 각 프로세스에 대해 출력을 수집해야하며, 종료하지 않은 각 프로세스에 대해 종료해야합니다.
어떻게 하시겠습니까?
당신이 사용할 수있는 내장 된 API :
http://strftime.org/
https://docs.python.org/3.5/library/time.html
start_time = time.time()
# your code
elapsed_time = time.time() - start_time
time.strftime("%H:%M:%S", time.gmtime(elapsed_time))
#'00:00:02'
는 나도 몰라, 프로세스를 종료하는 방법이를 처리 :
당신을 수 있습니다 사용 :
import subprocess, signal
p = subprocess.Popen(['ps', '-A'], stdout=subprocess.PIPE)
out, err = p.communicate()
for line in out.splitlines():
if 'iChat' in line:
pid = int(line.split(None, 1)[0])
os.kill(pid, signal.SIGKILL) # -9 (equivalent.)
apscheduler 모듈을 사용할 수 있습니다. A (빠르고 더러운) 예 :
from apscheduler.schedulers.background import BlockingScheduler
count = 0
N = 10 # Change this per your requirement
def run_external_process():
global count, scheduler
# Execute the job N times
count = count + 1
if count == 10: # Shutdown the scheduler and exit
scheduler.shutdown(wait=False)
sys.exit(1)
#Run your code here
# When each job is executed check for its output
def listen_for_output():
if event.retval: # Check the return value of your external process
# Collect output
else:
# Call script to terminate process
try:
scheduler = BlockingScheduler() # Initiate your scheduler
except ImportError:
raise ImportError('Failed to import module')
sys.exit(1)
scheduler.add_job(run_external_process, 'interval', seconds=1, id='my_job_id')
# Add a listener to check the output of your external process
scheduler.add_listener(listen_for_output, EVENT_JOB_EXECUTED|EVENT_JOB_ERROR)
scheduler.start()