
Python asyncio run_forever and inter-process communication

What is the most efficient way to achieve the following: the main process collects events and distributes them to sub-processes (an event loop running run_forever), while those sub-processes stay alive and either gather signals from the outside world or consume those signals and perform CPU-bound work. I feel that the above is not elegant/efficient enough and that I am missing something.

import os 
import time 
import signal 
import asyncio 
from asyncio import PriorityQueue 
from multiprocessing import Process, Pipe 

class Event(object):
    __slots__ = ['type', 'priority', 'payload', 'timestamp']

    def __init__(self, _type=None, _priority=None, _payload=None):
        self.type, self.priority, self.payload, self.timestamp = _type, _priority, _payload, time.time()

    def __str__(self):
        return "%s(%s,%s,%s)" % (self.__class__.__name__, self.type, self.priority, self.payload)

    def __lt__(self, other):
        return (self.priority, self.timestamp) > (other.priority, other.timestamp)


class EventQueue(PriorityQueue):
    def _put(self, event):
        super()._put((event.priority, event))

@asyncio.coroutine
def consumeAnyEvent(eq, acon_write_p=None):  ## more args with write_conn pipes
    while True:
        priority, event = yield from eq.get()
        print("consumed", event)
        if event.type == 'sig_a':
            acon_write_p.send(event)
        if event.type == 'sig_b':
            pass
        if event.type == 'sig_c':
            pass
        ## and so on - broadcast events to relevant sub-processes
        yield from asyncio.sleep(0)

@asyncio.coroutine
def produceSignalA(eq, read_p):
    while True:
        yield from asyncio.sleep(0)
        row = read_p.recv()  # note: this recv() blocks the event loop
        if row:
            yield from eq.put(Event('sig_a', _payload=row))

class someSource(object):
    """db, http, file watch or other io"""
    def fetch(self):
        pass


def someSlowMethod(a=None):
    """cpu-bound operations"""
    pass

def signalAPublisher(pipe):
    read_p, write_p = pipe
    read_p.close()
    s = someSource()
    while True:
        result = s.fetch()
        if result:
            write_p.send(result)


def signalAConsumer(pipe):
    read_p, write_p = pipe
    while True:
        inp = read_p.recv()
        if inp:
            result = someSlowMethod(inp)
            write_p.send(result)

def main():
    ## main process is responsible for handling events:
    ## collecting from all signal publisher subprocesses
    ## and broadcasting to all interested consumer subprocesses
    eventQueue = EventQueue()
    apub_read_p, apub_write_p = Pipe()
    acon_read_p, acon_write_p = Pipe()
    ## more pipes for Signal B, ... Signal Z
    signalAPublisher_p = Process(target=signalAPublisher, args=((apub_read_p, apub_write_p),))
    signalAConsumer_p = Process(target=signalAConsumer, args=((acon_read_p, acon_write_p),))
    signalAPublisher_p.start()
    signalAConsumer_p.start()
    ## and so on for Signal B, Signal C, ... Signal Z
    loop = asyncio.get_event_loop()
    try:
        tasks = asyncio.gather(
            loop.create_task(produceSignalA(eventQueue, apub_read_p)),
            loop.create_task(consumeAnyEvent(eventQueue, acon_write_p))
        )
        loop.run_forever()
    except KeyboardInterrupt:
        print("Caught keyboard interrupt. Canceling tasks...")
        tasks.cancel()
    finally:
        loop.close()
        os.kill(signalAPublisher_p.pid, signal.SIGTERM)
        os.kill(signalAConsumer_p.pid, signal.SIGTERM)
        ## kill for Signal B, ... Signal Z


if __name__ == '__main__':
    main()

This is what I have come up with so far. Any ideas or suggestions?


http://zeromq.org/ – PadraicCunningham


Thanks for the insight @PadraicCunningham - off to study it! How painful is it to integrate with Python? Ideally I would prefer not to go beyond the standard Python library. – Nicholas
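(For reference, wiring ZeroMQ into Python through the third-party pyzmq package is fairly compact. A minimal PUB/SUB sketch might look like the following; the address and topic prefix are illustrative, not taken from the question, and it does mean stepping outside the standard library.)

import zmq

def publisher():
    ctx = zmq.Context()
    pub = ctx.socket(zmq.PUB)
    pub.bind("tcp://127.0.0.1:5556")       # illustrative address, not from the question
    pub.send_string("sig_a some-payload")  # topic prefix followed by the payload

def subscriber():
    ctx = zmq.Context()
    sub = ctx.socket(zmq.SUB)
    sub.connect("tcp://127.0.0.1:5556")
    sub.setsockopt_string(zmq.SUBSCRIBE, "sig_a")  # only receive messages with this prefix
    print(sub.recv_string())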

Answers


As a starting point, try run_in_executor() with a ProcessPoolExecutor to farm out the CPU-bound procedures, and otherwise just use plain async def/async for/await in asyncio, without the queues.

import asyncio 
import time 
from concurrent.futures.process import ProcessPoolExecutor 

import random 


async def coro_a(n):
    print("> a", n)
    await asyncio.sleep(random.uniform(0.1, 1))
    result = await asyncio.gather(coro_b(n),
                                  loop.run_in_executor(None, slow_method_c, n))
    print("< a", n, result)


async def coro_b(n): 
    print("> b", n) 
    await asyncio.sleep(random.uniform(0.1, 1)) 
    result = await loop.run_in_executor(None, slow_method_d, n) 
    print("< b", n, result) 
    return ("B", result) 


def slow_method_c(n): 
    print("> c", n) 
    time.sleep(random.uniform(0.5, 5)) 
    print("< c", n) 
    return ("C", n) 


def slow_method_d(n): 
    print("> d", n) 
    time.sleep(random.uniform(0.5, 5)) 
    print("< d", n) 
    return ("D", n) 


async def main_producer():
    tasks = []
    for i in range(10):
        tasks.append(asyncio.ensure_future(coro_a(i + 1)))
        await asyncio.sleep(1)
    await asyncio.wait(tasks)


loop = asyncio.get_event_loop() 
loop.set_default_executor(ProcessPoolExecutor()) 
loop.run_until_complete(main_producer()) 
loop.close()
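Applied back to the question's code, the same idea removes the busy-wait in produceSignalA: the blocking Pipe.recv() can be handed to an executor so the event loop never blocks. A rough sketch of one possible variant, assuming a separate ThreadPoolExecutor for the blocking pipe reads (so the ProcessPoolExecutor stays reserved for CPU-bound work); Event, EventQueue and read_p are the names from the question:

import asyncio
from concurrent.futures import ThreadPoolExecutor

# not from the question: a small thread pool just for blocking pipe I/O
pipe_executor = ThreadPoolExecutor()

async def produceSignalA(eq, read_p):
    loop = asyncio.get_event_loop()
    while True:
        # run the blocking recv() in a worker thread instead of the event loop
        row = await loop.run_in_executor(pipe_executor, read_p.recv)
        if row:
            await eq.put(Event('sig_a', _payload=row))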