2017-03-16 11 views
1

나는 차단하지 않고 이전에 how to run several autobahn.ApplicationSession instances from within the same python process에게 질문했습니다.신호를 사용하여 Autobahn ApplicationRunner(). run()을 정상적으로 종료합니다 .SIGINT

문제가 해결되었지만 새로운 문제가 발생했습니다.

이러한 mp.Process 인스턴스를 종료하는 것은 어렵습니다. ApplicationRunner.run()의 코드가 KeyboardInterrupt에 존재한다는 것을 알고 있지만 올바르게 트리거 할 수 없었습니다.

샘플 코드 :

class PoloniexSession(ApplicationSession): 

    @coroutine 
    def onJoin(self, *args, **kwargs): 
     channel = self.config.extra['channel'] 

     def onTicker(*args, **kwargs): 
      self.config.extra['queue'].put((channel, (args, kwargs, time.time()))) 

     try: 
      yield from self.subscribe(onTicker, self.config.extra['channel']) 

     except Exception as e: 
      raise 


class PlnxEndpoint(mp.Process): 
    def __init__(self, endpoint, q, **kwargs): 
     super(PlnxEndpoint, self).__init__(name='%s Endpoint Process' % 
               endpoint, **kwargs) 
     self.endpoint = endpoint 
     self.q = q 

    def run(self): 
     self.runner = ApplicationRunner("wss://api.poloniex.com:443", 'realm1', 
            extra={'channel': self.endpoint, 
              'queue': self.q}) 
     self.runner.run(PoloniexSession) 

    def join(self, *args, **kwargs): 
     def sig_handler(x, y): 
      pass 
     signal.signal(signal.SIGINT, sig_handler) 
     super(PlnxEndpoint, self).join(*args, **kwargs) 


class PoloniexWSS(WSSAPI): 
    def __init__(self, endpoints=None): 
     super(PoloniexWSS, self).__init__(None, 'Poloniex') 
     self.data_q = mp.Queue() 
     self.connections = {} 
     if endpoints: 
      self.endpoints = endpoints 
     else: 
      r = requests.get('https://poloniex.com/public?command=returnTicker') 
      self.endpoints = list(r.json().keys()) 
      self.endpoints.append('ticker') 

     for endpoint in self.endpoints: 
      self.connections[endpoint] = PlnxEndpoint(endpoint, self.data_q) 

    def start(self): 
     super(PoloniexWSS, self).start() 
     for conn in self.connections: 
      self.connections[conn].start() 

    def stop(self): 
     for conn in self.connections: 
      self.connections[conn].join() 
     super(PoloniexWSS, self).stop() 

이 적절 self.q을 채우는 동안, 나는 여전히 내 서브 프로세스가 중지 오류가 나타납니다

RuntimeError: Event loop stopped before Future completed. 
    Traceback (most recent call last): 
    File "/home/nils/anaconda3/lib/python3.5/multiprocessing /process.py", line 254, in _bootstrap 
    self.run() 
    File "/home/nils/git/tools/bitexwss/bitexws//api/poloniex.py", line 46, in run 
    self.runner.run(PoloniexSession) 
    File "/home/nils/anaconda3/lib/python3.5/site-packages/autobahn-0.14.1-py3.5.egg/autobahn/asyncio/wamp.py", line 172, in run 
    loop.run_until_complete(protocol._session.leave()) 
    File "/home/nils/anaconda3/lib/python3.5/asyncio/base_events.py", line 335, in run_until_complete 
    raise RuntimeError('Event loop stopped before Future completed.') 

signal.SIGINT 어디 트리거되지 않습니다 내 있는게 틀림 없어 나는 그것을 원한다.

According to the source code of ApplicationRunner.run(), SIGINT/KeyboardInterruptserve_forever() 메소드를 정상적으로 종료해야합니다.

수동으로뿐만 아니라 위의 오류에 asyncio.event_loop 결과를 폐쇄 :

class PlnxEndpoint(mp.Process): 
#... 
    def join(self, *args, **kwargs): 
     loop = get_event_loop() 
     loop.stop() 
     super(PlnxEndpoint, self).join(*args, **kwargs) 
#... 
+0

이전 질문에서이 문제를 처리해야합니다.이 질문을 닫으십시오. – stovfl

+0

흠, 나는 동의하지 않는다. 그것은 결국 별개의 문제입니다. '2 개의 ApplicationRunner(). run() 메소드를 별도의 프로세스에서 실행하기'v''별도의 프로세스에서 ApplicationRunner 닫기하기 ''. 그들은 의심 할 여지없이 관련이 있지만 여전히 두 가지 문제가 있습니다. – nlsdfnbch

답변

0
손보는 주위의 약간은 오히려 간단한 솔루션 굴복 Afterall는,

: multiprocessing.Event()를, 내가 정상적으로 종료 할 수 있었다 사용

을 내 과정.

class PoloniexSession(ApplicationSession): 

    @coroutine 
    def onJoin(self, *args, **kwargs): 
     channel = self.config.extra['channel'] 

     def onTicker(*args, **kwargs): 
      self.config.extra['queue'].put((channel, (args, kwargs, time.time()))) 

     if self.config.extra['is_killed'].is_set(): 
      raise KeyboardInterrupt() 
     try: 
      yield from self.subscribe(onTicker, self.config.extra['channel']) 

     except Exception as e: 
      raise 


class PlnxEndpoint(mp.Process): 
    def __init__(self, endpoint, q, **kwargs): 
     super(PlnxEndpoint, self).__init__(name='%s Endpoint Process' % 
               endpoint, **kwargs) 
     self.endpoint = endpoint 
     self.q = q 
     self.is_killed = mp.Event() 

    def run(self): 
     self.runner = ApplicationRunner("wss://api.poloniex.com:443", 'realm1', 
            extra={'channel': self.endpoint, 
              'queue': self.q, 
              'is_killed': self.is_killed}) 
     self.runner.run(PoloniexSession) 

    def join(self, *args, **kwargs): 
     self.is_killed.set() 
     super(PlnxEndpoint, self).join(*args, **kwargs) 
+0

'def join()'은 이제 권장대로 다시 사용됩니다. 그러나 왜 당신이'join()'안에서'process'를 멈추려는지 아직 알 수 없습니다. – stovfl