2013-11-25 5 views
0

간단한 multiprocessing 작업 관리 시스템을 구성하기 시작했으며 대기열 (아래 표시된 대기열 communications)을 사용하는 프로세스간에 사용자 정의 개체 (아래 표시된 Message)의 사용자 정의 개체를 보내는 데 어려움이 있습니다. 대기열을 사용하는 프로세스간에이 객체 (또는 유사한 객체)가 전송되도록 올바른 방향으로 나를 안내 할 수 있습니까? 코드에 표시된 모든 접근법에 대해 비판적인 의견을 제시해주십시오.파이썬 다중 처리에서 대기열을 사용하는 프로세스간에 사용자 정의 객체를 전달하는 방법은 무엇입니까?

#!/usr/bin/env python 

import multiprocessing 
from multiprocessing import Queue 
import os 
import signal 
import time 

class Message: 
    """ 
    This class acts as a container for process information. The conventions for 
    the various member variables are as follows: 

    - messageType (specification of message type) 
     - "status" 
     - "result" 
     - "error" 
    - sender (name of sender process) 
     - str(os.getpid()) 
     - "janus" 
    - recipient (name of recipient process) 
     - str(os.getpid()) 
     - "all" 
     - "janus" 
    - senderStatus (specification of sender status) 
     - "running" 
     - "halt" 
     - "complete" 
     - "waiting" 
    - controlMessage (control message for recipient process) 
     - "start" 
     - "terminate" 
     - "report" 
     - "pause" 
    - resultObject (some object containing results) 
     - example: tuple 
    - naturalLanguageMessage (natural language message, e.g. human-readable) 
     - human readable string 
    - timeStamp (message timestamp) 
     - time.time() 
    """ 
    messageType="status" 
    sender="unknown" 
    recipient="all" 
    senderStatus="running" 
    controlMessage="null" 
    resultObject="null" 
    naturalLanguageMessage="This is natural language text." 
    timeStamp=time.time() 
    def set(
     self, 
     messageType, 
     sender, 
     recipient, 
     senderStatus, 
     controlMessage, 
     resultObject, 
     naturalLanguageMessage, 
     timeStamp=time.time() 
     ): 
     """ 
     This function sets the values of the member variables all at once. 
     """ 
     self.messageType=messageType 
     self.sender=sender 
     self.recipient=recipient 
     self.senderStatus=senderStatus 
     self.controlMessage=controlMessage 
     self.resultObject=resultObject 
     self.naturalLanguageMessage=naturalLanguageMessage 
     def timeStamp(self): 
      # Update the timestamp in the timestamp member variable. 
      self.timeStamp=time.time() 
     def printout(self): 
      # Print a dictionary of all member variables. 
      #print('-'*80) 
      #print("message content:") 
      printHR(vars(self)) 
      #print('-'*80) 
def printHR(object): 
    """ 
    This function prints a specified object in a human readable way. 
    """ 
    # dictionary 
    if isinstance(object, dict): 
     for key, value in sorted(object.items()): 
      print u'{0}: {1}'.format(key, value) 
    # list or tuple 
    elif isinstance(object, list) or isinstance(object, tuple): 
     for element in object: 
      print element 
    # other 
    else: 
     print object 

def initialise_process(): 
    signal.signal(signal.SIGINT, signal.SIG_IGN) 

def work1(): 
    processID=os.getpid() 
    time.sleep(3) 
    print " work function: work run by process number %d" % (os.getpid()) 
    # prepare message 
    message=Message() 
    message.set(
     "status", 
     str(processID), 
     "janus", 
     "running", 
     "null", 
     "null", 
     "work running" 
    ) 
    # send message 
    communications.put(message) 

def workFunctionTest(testString): 
    processID=os.getpid() 
    print("test string:") 
    print(testString) 
    # prepare message 
    message=Message() 
    message.set(
      "status", 
      str(processID), 
      "janus", 
      "running", 
      "null", 
      "null", 
      "work running") 
    # send message 
    communications.put(message) 
    # do work 
    time.sleep(3) 

def janus(
    workFunction=workFunctionTest, 
    numberOfJobs=1, 
    numberOfProcesses=4 
    ): 
    # printout of multiprocessing specifications 
    print("\nJANUS MULTIPROCESSING JOB SYSTEM\n") 
    print(" multiprocessing specifications:") 
    print(" number of jobs: %s" % (str(numberOfJobs))) 
    print(" number of processes: %s" % (str(numberOfProcesses))) 
    print(" work to complete: %s" % (str(workFunction))) 
    #print(" arguments for work function: " %s (str(workFunctionArguments))) 

    # create process pool 
    print(" initialising process pool...") 
    pool1 = multiprocessing.Pool(numberOfProcesses, initialise_process) 
    print(" pool created: %s" % (str(pool1))) 

    # create message queue for interprocess communications 
    print(" initialising interprocess communications queue...") 
    communications=Queue() 
    print(" queue created: %s" % (str(communications))) 

    # send work to pool 
    print(" applying work to pool...") 
    print(" applying each of %s jobs," % (str(numberOfJobs))) 
    for jobIndex in range(numberOfJobs): 
     print("  applying work function %s as job %s..." 
      % (str(workFunction), jobIndex)) 
     pool1.apply_async(workFunction) 

    # monitor processes 

    # check messages 
    while True: 
     time.sleep(3) 
     if communications.empty() == True: 
      print(" checking for messages... no messages") 
     elif communications.empty() == False: 
      buffer=communications.get() 
     print('-'*80) 
      print("new message:") 
      print buffer 
     print('-'*80) 
      break 
     else: 
      print(" fail") 
    # monitor 
    try: 
     print " jobs running..." 
     time.sleep(10) 
    except KeyboardInterrupt: 
     print " termination command received\nterminating processes..." 
     pool1.terminate() 
     pool1.join() 
    else: 
     print " jobs complete\nterminating..." 
     pool1.close() 
     pool1.join() 

def main(): 
    print('-'*80) 
    janus(work1, 5) 
    print '-'*80 

if __name__ == "__main__": 
    main() 
+0

"어려움"이란 정확히 무엇을 의미합니까? 너무 막연하다. 예외가 발생합니까? 손상된 개체를 만드시겠습니까? 다른 것? –

+0

'''janus''' 함수에서 메시지를 검사하는 지점이 있습니다. 즉, "통신"이 비어 있지 않은지 여부를 확인한다. 작업 함수'''work1'''에 의해 대기열에 정보를 저장하려고 시도 했음에도 불구하고 대기열이 계속 비어있는 것처럼 보입니다. 왜 이런 일이 일어나고 있는지 모르겠습니다. – d3pd

답변

0

체크 아웃 파이썬 celery project

셀러리는 분산 메시지 전달을 기반 비동기 작업 큐/작업 큐입니다.