2017-12-26 51 views
0

클러스터의 슬레이브 노드와 통신하기 위해 마스터 노드를 사용하는 컴퓨터 클러스터가 있습니다.어떻게 코어에 대한 연결을 닫지 않고 실행중인 작업을 종료 할 수 있습니까? (현재 execnet 사용)

내가 직면 한 주된 문제는 execnet을 사용하여 실행중인 특정 작업을 종료 한 다음 다른 작업이 종료 된 동일한 코어에서 대기열을 다시 대기열로 추가 할 수 있다는 것입니다. (모든 코어 주어진 시간에 슬레이브 노드의 수).

지금까지는 execnet을 사용하여 실행중인 작업을 종료 할 수있는 방법이 없으므로 bash 스크립트를 통해 작업을 수동으로 종료 할 수 있다고 생각하면 sudo kill 12345라고 말합니다. 여기서 12345는 작업의 PID입니다 job은 execnet에 의해 지원되지 않는 또 다른 것입니다. 그러나 그것은 또 다른 주제입니다). 그런 다음 작업을 종료하고 방금 종료 된 동일한 코어에서 다른 작업을 재구성합니다. 작업을 올바르게 종료하지만 모든 채널 작업이 완료 될 때까지 해당 채널 (코어, 마스터 노드는 각 코어에 개별적으로 통신)에 대한 연결을 닫은 다음 해당 코어를 더 이상 사용하지 않습니다. 코어에 대한 연결을 종료하지 않고 실행중인 작업을 종료하는 방법이 있습니까? 여기

일자리

import execnet, os, sys 
import re 
import socket 
import numpy as np 
import pickle, cPickle 
from copy import deepcopy 
import time 
import job 


def main(): 
    print 'execnet source files are located at:\n {}/\n'.format(
      os.path.join(os.path.dirname(execnet.__file__)) 
     ) 

# Generate a group of gateways. 
work_dir = '/home/mpiuser/pn2/' 
f = 'cluster_core_info.txt' 
n_start, n_end = 250000, 250008 

ci = get_cluster_info(f) 
group, g_labels = make_gateway_group(ci, work_dir) 


mch = group.remote_exec(job) 

args = range(n_start, n_end+1) # List of parameters to compute factorial. 
manage_jobs(group, mch, queue, g_labels, args) 

# Close the group of gateways. 
group.terminate() 

def get_cluster_info(f): 
    nodes, ncores = [], [] 
    with open(f, 'r') as fid: 
     while True: 
      line = fid.readline() 
      if not line: 
       fid.close() 
       break 
      line = line.strip('\n').split() 
      nodes.append(line[0]) 
      ncores.append(int(line[1])) 
    return dict(zip(nodes, ncores)) 

def make_gateway_group(cluster_info, work_dir): 
    ''' Generate gateways on all cores in remote nodes. ''' 
    print 'Gateways generated:\n' 
    group = execnet.Group() 
    g_labels = [] 
    nodes = list(cluster_info.keys()) 
    for node in nodes: 
     for i in range(cluster_info[node]): 
      group.makegateway(
       "ssh={0}//id={0}_{1}//chdir={2}".format(
       node, i, work_dir 
       )) 
      sys.stdout.write(' ') 
      sys.stdout.flush() 
      print list(group)[-1] 
      # Generate a string 'node-id_core-id'. 
      g_labels.append('{}_{}'.format(re.findall(r'\d+',node)[0], i)) 
    print '' 
    return group, g_labels 

def get_mch_id(g_labels, string): 
    ids = [x for x in re.findall(r'\d+', string)] 
    ids = '{}_{}'.format(*ids) 
    return g_labels.index(ids) 

def manage_jobs(group, mch, queue, g_labels, args): 
    args_ref = deepcopy(args) 
    terminated_channels = 0 
    active_jobs, active_args = [], [] 
while True: 
    channel, item = queue.get() 

    if item == 'terminate_channel': 
     terminated_channels += 1 
     print " Gateway closed: {}".format(channel.gateway.id) 
     if terminated_channels == len(mch): 
      print "\nAll jobs done.\n" 
      break 
     continue 

    if item != "ready": 
     mch_id_completed = get_mch_id(g_labels, channel.gateway.id) 
     depopulate_list(active_jobs, mch_id_completed, active_args) 
     print " Gateway {} channel id {} returned:".format(
       channel.gateway.id, mch_id_completed) 
     print " {}".format(item) 

    if not args: 
     print "\nNo more jobs to submit, sending termination request...\n" 
     mch.send_each(None) 
     args = 'terminate_channel' 

    if args and \ 
     args != 'terminate_channel': 
     arg = args.pop(0) 
     idx = args_ref.index(arg) 
     channel.send(arg) # arg is copied by value to the remote side of 
          # channel to be executed. Maybe blocked if the 
          # sender queue is full. 

     # Get the id of current channel used to submit a job, 
     # this id can be used to refer mch[id] to terminate a job later. 
     mch_id_active = get_mch_id(g_labels, channel.gateway.id) 
     print "Job {}: {}! submitted to gateway {}, channel id {}".format(
       idx, arg, channel.gateway.id, mch_id_active) 
     populate_list(active_jobs, mch_id_active, 
         active_args, arg) 


def populate_list(jobs, job_active, args, arg_active): 
    jobs.append(job_active) 
    args.append(arg_active) 

def depopulate_list(jobs, job_completed, args): 
    i = jobs.index(job_completed) 
    jobs.pop(i) 
    args.pop(i) 


if __name__ == '__main__': 
    main() 

제출 스크립트이며, 여기 내 job.py 스크립트입니다 :

#!/usr/bin/env python 
import os, sys 
import socket 
import time 
import numpy as np 
import pickle, cPickle 
import random 
import job 


def hostname(): 
    return socket.gethostname() 

def working_dir(): 
    return os.getcwd() 

def listdir(path): 
    return os.listdir(path) 

def fac(arg): 
    return np.math.factorial(arg) 

def dump(arg): 
    path = working_dir() + '/out' 
    if not os.path.exists(path): 
     os.mkdir(path) 
    f_path = path + '/fac_{}.txt'.format(arg) 
    t_0 = time.time() 
    num = fac(arg)         # Main operation 
    t_1 = time.time() 
    cPickle.dump(num, open(f_path, "w"), protocol=2) # Main operation 
    t_2 = time.time() 
    duration_0 = "{:.4f}".format(t_1 - t_0) 
    duration_1 = "{:.4f}".format(t_2 - t_1) 
    #num2 = cPickle.load(open(f_path, "rb")) 
    return '--Calculation: {} s, dumping: {} s'.format(
      duration_0, duration_1) 


if __name__ == '__channelexec__': 
    channel.send("ready") 

    for arg in channel: 
     if arg is None: 
      break 
     elif str(arg).isdigit(): 
      channel.send((
        str(arg)+'!', 
        job.hostname(), 
        job.dump(arg) 
       )) 
     else: 
      print 'Warnning! arg sent should be number | None' 

답변

0

예, 당신이 바로 그 트랙에 있습니다. psutil 라이브러리를 사용하여 프로세스를 관리하고 자신의 pid 등을 찾습니다. 그리고 그들을 죽이십시오. 어디서나 bash를 사용할 필요가 없습니다. 파이썬이이 모든 것을 다룹니다.

또는 마스터가 그렇게 말할 때 스크립트를 종료하도록 프로그래밍하십시오. 이렇게하는 것이 일반적입니다. 원할 경우/끝내기 전에 다른 스크립트를 시작하도록 만들 수도 있습니다. 또는 다른 프로세스에서 수행 할 작업이 동일하면 현재 작업을 중지하고 스크립트를 종료하지 않고 새 작업을 시작하십시오.

그리고 제안을 할 수 있다면. 파일을 한 줄씩 읽지 않고 전체 파일을 읽은 다음 * .splitlines()를 사용하십시오. 작은 파일을 청크로 읽으 려하면 IO를 고문합니다. * .strip()도 필요하지 않습니다. 그리고 사용하지 않은 가져 오기도 제거해야합니다.