클러스터의 슬레이브 노드와 통신하기 위해 마스터 노드를 사용하는 컴퓨터 클러스터가 있습니다.어떻게 코어에 대한 연결을 닫지 않고 실행중인 작업을 종료 할 수 있습니까? (현재 execnet 사용)
내가 직면 한 주된 문제는 execnet을 사용하여 실행중인 특정 작업을 종료 한 다음 다른 작업이 종료 된 동일한 코어에서 대기열을 다시 대기열로 추가 할 수 있다는 것입니다. (모든 코어 주어진 시간에 슬레이브 노드의 수).
지금까지는 execnet을 사용하여 실행중인 작업을 종료 할 수있는 방법이 없으므로 bash 스크립트를 통해 작업을 수동으로 종료 할 수 있다고 생각하면 sudo kill 12345
라고 말합니다. 여기서 12345는 작업의 PID입니다 job은 execnet에 의해 지원되지 않는 또 다른 것입니다. 그러나 그것은 또 다른 주제입니다). 그런 다음 작업을 종료하고 방금 종료 된 동일한 코어에서 다른 작업을 재구성합니다. 작업을 올바르게 종료하지만 모든 채널 작업이 완료 될 때까지 해당 채널 (코어, 마스터 노드는 각 코어에 개별적으로 통신)에 대한 연결을 닫은 다음 해당 코어를 더 이상 사용하지 않습니다. 코어에 대한 연결을 종료하지 않고 실행중인 작업을 종료하는 방법이 있습니까? 여기
일자리
import execnet, os, sys
import re
import socket
import numpy as np
import pickle, cPickle
from copy import deepcopy
import time
import job
def main():
print 'execnet source files are located at:\n {}/\n'.format(
os.path.join(os.path.dirname(execnet.__file__))
)
# Generate a group of gateways.
work_dir = '/home/mpiuser/pn2/'
f = 'cluster_core_info.txt'
n_start, n_end = 250000, 250008
ci = get_cluster_info(f)
group, g_labels = make_gateway_group(ci, work_dir)
mch = group.remote_exec(job)
args = range(n_start, n_end+1) # List of parameters to compute factorial.
manage_jobs(group, mch, queue, g_labels, args)
# Close the group of gateways.
group.terminate()
def get_cluster_info(f):
nodes, ncores = [], []
with open(f, 'r') as fid:
while True:
line = fid.readline()
if not line:
fid.close()
break
line = line.strip('\n').split()
nodes.append(line[0])
ncores.append(int(line[1]))
return dict(zip(nodes, ncores))
def make_gateway_group(cluster_info, work_dir):
''' Generate gateways on all cores in remote nodes. '''
print 'Gateways generated:\n'
group = execnet.Group()
g_labels = []
nodes = list(cluster_info.keys())
for node in nodes:
for i in range(cluster_info[node]):
group.makegateway(
"ssh={0}//id={0}_{1}//chdir={2}".format(
node, i, work_dir
))
sys.stdout.write(' ')
sys.stdout.flush()
print list(group)[-1]
# Generate a string 'node-id_core-id'.
g_labels.append('{}_{}'.format(re.findall(r'\d+',node)[0], i))
print ''
return group, g_labels
def get_mch_id(g_labels, string):
ids = [x for x in re.findall(r'\d+', string)]
ids = '{}_{}'.format(*ids)
return g_labels.index(ids)
def manage_jobs(group, mch, queue, g_labels, args):
args_ref = deepcopy(args)
terminated_channels = 0
active_jobs, active_args = [], []
while True:
channel, item = queue.get()
if item == 'terminate_channel':
terminated_channels += 1
print " Gateway closed: {}".format(channel.gateway.id)
if terminated_channels == len(mch):
print "\nAll jobs done.\n"
break
continue
if item != "ready":
mch_id_completed = get_mch_id(g_labels, channel.gateway.id)
depopulate_list(active_jobs, mch_id_completed, active_args)
print " Gateway {} channel id {} returned:".format(
channel.gateway.id, mch_id_completed)
print " {}".format(item)
if not args:
print "\nNo more jobs to submit, sending termination request...\n"
mch.send_each(None)
args = 'terminate_channel'
if args and \
args != 'terminate_channel':
arg = args.pop(0)
idx = args_ref.index(arg)
channel.send(arg) # arg is copied by value to the remote side of
# channel to be executed. Maybe blocked if the
# sender queue is full.
# Get the id of current channel used to submit a job,
# this id can be used to refer mch[id] to terminate a job later.
mch_id_active = get_mch_id(g_labels, channel.gateway.id)
print "Job {}: {}! submitted to gateway {}, channel id {}".format(
idx, arg, channel.gateway.id, mch_id_active)
populate_list(active_jobs, mch_id_active,
active_args, arg)
def populate_list(jobs, job_active, args, arg_active):
jobs.append(job_active)
args.append(arg_active)
def depopulate_list(jobs, job_completed, args):
i = jobs.index(job_completed)
jobs.pop(i)
args.pop(i)
if __name__ == '__main__':
main()
제출 스크립트이며, 여기 내 job.py 스크립트입니다 :
#!/usr/bin/env python
import os, sys
import socket
import time
import numpy as np
import pickle, cPickle
import random
import job
def hostname():
return socket.gethostname()
def working_dir():
return os.getcwd()
def listdir(path):
return os.listdir(path)
def fac(arg):
return np.math.factorial(arg)
def dump(arg):
path = working_dir() + '/out'
if not os.path.exists(path):
os.mkdir(path)
f_path = path + '/fac_{}.txt'.format(arg)
t_0 = time.time()
num = fac(arg) # Main operation
t_1 = time.time()
cPickle.dump(num, open(f_path, "w"), protocol=2) # Main operation
t_2 = time.time()
duration_0 = "{:.4f}".format(t_1 - t_0)
duration_1 = "{:.4f}".format(t_2 - t_1)
#num2 = cPickle.load(open(f_path, "rb"))
return '--Calculation: {} s, dumping: {} s'.format(
duration_0, duration_1)
if __name__ == '__channelexec__':
channel.send("ready")
for arg in channel:
if arg is None:
break
elif str(arg).isdigit():
channel.send((
str(arg)+'!',
job.hostname(),
job.dump(arg)
))
else:
print 'Warnning! arg sent should be number | None'