나는 차단하지 않고 이전에 how to run several autobahn.ApplicationSession
instances from within the same python process에게 질문했습니다.신호를 사용하여 Autobahn ApplicationRunner(). run()을 정상적으로 종료합니다 .SIGINT
문제가 해결되었지만 새로운 문제가 발생했습니다.
이러한 mp.Process
인스턴스를 종료하는 것은 어렵습니다. ApplicationRunner.run()
의 코드가 KeyboardInterrupt
에 존재한다는 것을 알고 있지만 올바르게 트리거 할 수 없었습니다.
샘플 코드 :
class PoloniexSession(ApplicationSession):
@coroutine
def onJoin(self, *args, **kwargs):
channel = self.config.extra['channel']
def onTicker(*args, **kwargs):
self.config.extra['queue'].put((channel, (args, kwargs, time.time())))
try:
yield from self.subscribe(onTicker, self.config.extra['channel'])
except Exception as e:
raise
class PlnxEndpoint(mp.Process):
def __init__(self, endpoint, q, **kwargs):
super(PlnxEndpoint, self).__init__(name='%s Endpoint Process' %
endpoint, **kwargs)
self.endpoint = endpoint
self.q = q
def run(self):
self.runner = ApplicationRunner("wss://api.poloniex.com:443", 'realm1',
extra={'channel': self.endpoint,
'queue': self.q})
self.runner.run(PoloniexSession)
def join(self, *args, **kwargs):
def sig_handler(x, y):
pass
signal.signal(signal.SIGINT, sig_handler)
super(PlnxEndpoint, self).join(*args, **kwargs)
class PoloniexWSS(WSSAPI):
def __init__(self, endpoints=None):
super(PoloniexWSS, self).__init__(None, 'Poloniex')
self.data_q = mp.Queue()
self.connections = {}
if endpoints:
self.endpoints = endpoints
else:
r = requests.get('https://poloniex.com/public?command=returnTicker')
self.endpoints = list(r.json().keys())
self.endpoints.append('ticker')
for endpoint in self.endpoints:
self.connections[endpoint] = PlnxEndpoint(endpoint, self.data_q)
def start(self):
super(PoloniexWSS, self).start()
for conn in self.connections:
self.connections[conn].start()
def stop(self):
for conn in self.connections:
self.connections[conn].join()
super(PoloniexWSS, self).stop()
이 적절 self.q
을 채우는 동안, 나는 여전히 내 서브 프로세스가 중지 오류가 나타납니다
RuntimeError: Event loop stopped before Future completed.
Traceback (most recent call last):
File "/home/nils/anaconda3/lib/python3.5/multiprocessing /process.py", line 254, in _bootstrap
self.run()
File "/home/nils/git/tools/bitexwss/bitexws//api/poloniex.py", line 46, in run
self.runner.run(PoloniexSession)
File "/home/nils/anaconda3/lib/python3.5/site-packages/autobahn-0.14.1-py3.5.egg/autobahn/asyncio/wamp.py", line 172, in run
loop.run_until_complete(protocol._session.leave())
File "/home/nils/anaconda3/lib/python3.5/asyncio/base_events.py", line 335, in run_until_complete
raise RuntimeError('Event loop stopped before Future completed.')
signal.SIGINT
어디 트리거되지 않습니다 내 있는게 틀림 없어 나는 그것을 원한다.
According to the source code of ApplicationRunner.run()
, SIGINT
/KeyboardInterrupt
은 serve_forever()
메소드를 정상적으로 종료해야합니다.
asyncio.event_loop
결과를 폐쇄 :
class PlnxEndpoint(mp.Process):
#...
def join(self, *args, **kwargs):
loop = get_event_loop()
loop.stop()
super(PlnxEndpoint, self).join(*args, **kwargs)
#...
이전 질문에서이 문제를 처리해야합니다.이 질문을 닫으십시오. – stovfl
흠, 나는 동의하지 않는다. 그것은 결국 별개의 문제입니다. '2 개의 ApplicationRunner(). run() 메소드를 별도의 프로세스에서 실행하기'v''별도의 프로세스에서 ApplicationRunner 닫기하기 ''. 그들은 의심 할 여지없이 관련이 있지만 여전히 두 가지 문제가 있습니다. – nlsdfnbch