0

나는 하위 프로세스를 생성하고, Future를 사용하여 결과를 받아야하며, 필요할 때 그 중 일부를 죽여야한다.다중 처리. 프로세스를 concurrent.future._base.Future와 통합하기

이 때문에 나는 다중 처리. 서브 클래스를 처리하고 start() 메소드에서 Future 객체를 리턴한다.

문제는 내가 결코 호출되지 않기 때문에 cb() 함수에서 결과를 수신 할 수 없다는 것입니다.

이 방법이 현재 구현에서 누락되었거나 다른 방법으로 수행 될 수 있다면 도와주십시오.

을 수행하면 내 현재 접근하여 시작 방법에

from multiprocessing import Process, Queue 
from concurrent.futures import _base 
import threading 
from time import sleep 


def foo(x,q): 
    print('result {}'.format(x*x)) 
    result = x*x 
    sleep(5) 
    q.put(result) 


class MyProcess(Process): 

    def __init__(self, target, args): 
     super().__init__() 
     self.target = target 
     self.args = args 
     self.f = _base.Future() 

    def run(self): 
     q = Queue() 
     worker_thread = threading.Thread(target=self.target, args=(self.args+ (q,))) 
     worker_thread.start() 
     r = q.get(block=True) 
     print('setting result {}'.format(r)) 
     self.f.set_result(result=r) 
     print('done setting result') 

    def start(self): 
     f = _base.Future() 
     run_thread = threading.Thread(target=self.run) 
     run_thread.start() 
     return f 


def cb(future): 
    print('received result in callback {}'.format(future)) 


def main(): 
    p1 = MyProcess(target=foo, args=(2,)) 
    f = p1.start() 
    f.add_done_callback(fn=cb) 

    sleep(10) 


if __name__ == '__main__': 

    main() 

    print('Main thread dying') 

답변

1

당신은 당신이 다음 돌아가 새로운 미래를 만들 수 있습니다. 이것은 다른 미래이며 결과를 설정 한 미래입니다.이 미래는 전혀 사용되지 않습니다. 시도 :

def start(self): 
    run_thread = threading.Thread(target=self.run) 
    run_thread.start() 
    return self.f 

그러나 코드에 더 많은 문제가 있습니다. 프로세스의 start 메소드를 대체하여 작업자 스레드의 실행으로 대체하므로 실제로 다중 처리를 우회합니다. 또한 _base 모듈을 가져 오지 않아야합니다. 즉 맨 앞에있는 밑줄에서 볼 수있는 구현 세부 사항입니다. concurrent.futures.Future (동일한 클래스이지만 공개 API를 통해 가져 오기)해야합니다.

이 정말 다중 사용

from multiprocessing import Process, Queue 
from concurrent.futures import Future 
import threading 
from time import sleep 


def foo(x,q): 
    print('result {}'.format(x*x)) 
    result = x*x 
    sleep(5) 
    q.put(result) 

class MyProcess(Process): 

    def __init__(self, target, args): 
     super().__init__() 
     self.target = target 
     self.args = args 
     self.f = Future() 

    def run(self): 
     q = Queue() 
     worker_thread = threading.Thread(target=self.target, args=(self.args+ (q,))) 
     worker_thread.start() 
     r = q.get(block=True) 
     print('setting result {}'.format(r)) 
     self.f.set_result(result=r) 
     print('done setting result') 

def cb(future): 
    print('received result in callback {}: {}'.format(future, future.result())) 

def main(): 
    p1 = MyProcess(target=foo, args=(2,)) 
    p1.f.add_done_callback(fn=cb) 
    p1.start() 
    p1.join() 
    sleep(10) 

if __name__ == '__main__': 
    main() 
    print('Main thread dying') 

그리고 당신이 정말로 필요는 없습니다 대상 기능을 실행하기 위해 작업자 스레드를 산란, 이제 새로운 과정에서 이미있어, 당신은 당신의 목표 기능을 실행할 수 있습니다 직접 대신. 대상 함수가 알 수없는 예외를 발생시키는 경우 콜백은 성공시에만 호출됩니다. 당신이 것을 해결하는 경우, 당신은 왼쪽하고 그래서 :

from multiprocessing import Process 
from concurrent.futures import Future 
import threading 
from time import sleep 


def foo(x): 
    print('result {}'.format(x*x)) 
    result = x*x 
    sleep(5) 
    return result 

class MyProcess(Process): 

    def __init__(self, target, args): 
     super().__init__() 
     self.target = target 
     self.args = args 
     self.f = Future() 

    def run(self): 
     try: 
      r = self.target(*self.args) 
      print('setting result {}'.format(r)) 
      self.f.set_result(result=r) 
      print('done setting result') 
     except Exception as ex: 
      self.f.set_exception(ex) 

def cb(future): 
    print('received result in callback {}: {}'.format(future, future.result())) 

def main(): 
    p1 = MyProcess(target=foo, args=(2,)) 
    p1.f.add_done_callback(fn=cb) 
    p1.start() 
    p1.join() 
    sleep(10) 

if __name__ == '__main__': 
    main() 
    print('Main thread dying') 

이것은 ProcessPoolExecutor이 무엇을 기본적으로.

+0

답장을 보내 주셔서 감사합니다. 이것은 정말 바보 같은 실수였습니다. 주의 해 주셔서 감사합니다! :) –