4

concurrent.futures는 여러 프로세스 (또는 스레드)에서 실행되도록 인수를 산세에 의존한다는 것을 이해했습니다. 논쟁의 복사본을 만들지 않아야합니까? 리눅스에서는 그렇게하지 않는 것 같습니다. 즉 명시 적으로 사본을 전달해야합니다. 지구에왜 concurrent.futures가 인수 사본을 작성하지 않습니까?

from __future__ import print_function 
import time 
import random 
try: 
    from concurrent import futures 
except ImportError: 
    import futures 


def work_with_rands(i, rands): 
    print('in function', i, rands) 


def main(): 
    random.seed(1) 
    rands = [random.randrange(100) for _ in range(10)] 

    # sequence 1 and sequence 2 should give the same results but they don't 
    # only difference is that one uses a copy of rands (i.e., rands.copy()) 
    # sequence 1 
    with futures.ProcessPoolExecutor() as ex: 
     for i in range(4): 
      print("<{}> rands before submission: {}".format(i, rands)) 
      ex.submit(work_with_rands, i, rands) 
      random.shuffle(rands) 

    print('-' * 30) 
    random.seed(1) 
    rands = [random.randrange(100) for _ in range(10)] 
    # sequence 2 
    print("initial sequence: ", rands) 
    with futures.ProcessPoolExecutor() as ex: 
     for i in range(4): 
      print("<{}> rands before submission: {}".format(i, rands)) 
      ex.submit(work_with_rands, i, rands[:]) 
      random.shuffle(rands) 

if __name__ == "__main__": 
    main() 

에서 오는 [97, 32, 17, 15, 57, 97, 63, 72, 60, 8]입니다 :

<0> rands before submission: [17, 72, 97, 8, 32, 15, 63, 97, 57, 60] 
<1> rands before submission: [97, 15, 97, 32, 60, 17, 57, 72, 8, 63] 
<2> rands before submission: [15, 57, 63, 17, 97, 97, 8, 32, 60, 72] 
<3> rands before submission: [32, 97, 63, 72, 17, 57, 97, 8, 15, 60] 
in function 0 [97, 15, 97, 32, 60, 17, 57, 72, 8, 63] 
in function 1 [97, 32, 17, 15, 57, 97, 63, 72, 60, 8] 
in function 2 [97, 32, 17, 15, 57, 97, 63, 72, 60, 8] 
in function 3 [97, 32, 17, 15, 57, 97, 63, 72, 60, 8] 

여기에 코드입니다 :

나는 다음과 같은 결과를 이해하기 위해 노력하고있어? submit에 전달 된 시퀀스 중 하나도 아닙니다.

결과는 모든 스레드와의 돌연변이에 같은 목록을 공유 파이썬 2

답변

1

을 기본적으로, ProcessPoolExecutor. 다른 스레드 (_queue_management_worker)와 공유되는 일부 "작업 항목"dict (절임 없음)에 함수 및 해당 인수를 넣고 해당 스레드가 해당 dict의 WorkItems를 실제 작업자 프로세스. http://hg.python.org/cpython/file/16207b8495bf/Lib/concurrent/futures/process.py#l6

이 전화를 제출 사이에 새 항목에 대한 알림을받을 수 _queue_management_worker 충분한 시간이없는 것으로 밝혀 :

동시 모듈 아키텍처를 설명하는 소스 코드의 주석이 있습니다.

이렇게 스레드는 항상 여기에서 대기합니다. (http://hg.python.org/cpython/file/16207b8495bf/Lib/concurrent/futures/process.py#l226) ProcessPoolExecutor.shutdown (ProcessPoolExecutor 컨텍스트에서 끝낼 때)에서만 깨어납니다.그런

첫 번째 순서에 약간의 지연을 넣을 경우 :

with futures.ProcessPoolExecutor() as ex: 
    for i in range(4): 
     print("<{}> rands before submission: {}".format(i, rands)) 
     ex.submit(work_with_rands, i, rands) 
     random.shuffle(rands) 
     time.sleep(0.01) 

당신이 볼, _queue_management_worker이 일어나 작업자 프로세스에 대한 호출을 전달하고 work_with_rands 다른 값을 인쇄 할 것이다.

2

에 따라 약간 다릅니다. 인쇄를 추가 할 때 다르게 작동하기 때문에 디버그하기가 어렵습니다. 이 [97, 32, 17, 15, 57, 97, 63, 72, 60, 8]shuffle 내부의 상태 여야합니다. shuffle은 목록 (모든 스레드에있는 동일한 목록)을 보유하고 두 번 이상 변경합니다. 스레드가 호출 될 때 상태는 [97, 32, 17, 15, 57, 97, 63, 72, 60, 8]입니다. 값은 내재적으로 복사되지 않고 다른 스레드에 복사되므로 언제 복사할지 보증 할 수 없습니다.

셔플이 완료되기 전에 생산 무엇 셔플의 예는 다음과 같습니다

[31, 64, 88, 7, 68, 85, 69, 3, 15, 47] # initial value (rands) 
# ex.submit() is called here 
# shuffle() is called here 
# shuffle starts changing rand to: 
[31, 64, 88, 47, 68, 85, 69, 3, 15, 7] 
[31, 64, 15, 47, 68, 85, 69, 3, 88, 7] 
[31, 64, 15, 47, 68, 85, 69, 3, 88, 7] 
[31, 64, 69, 47, 68, 85, 15, 3, 88, 7] 
[31, 64, 85, 47, 68, 69, 15, 3, 88, 7] # threads may be called here 
[31, 64, 85, 47, 68, 69, 15, 3, 88, 7] # or here 
[31, 64, 85, 47, 68, 69, 15, 3, 88, 7] # or here 
[31, 85, 64, 47, 68, 69, 15, 3, 88, 7] 
[85, 31, 64, 47, 68, 69, 15, 3, 88, 7] # value when the shuffle has finished 

셔플 소스 코드 :

그래서
def shuffle(self, x, random=None): 
    if random is None: 
     randbelow = self._randbelow 
     for i in reversed(range(1, len(x))): 
      # pick an element in x[:i+1] with which to exchange x[i] 
      j = randbelow(i+1) 
      x[i], x[j] = x[j], x[i] 
      # added this print here. that's what prints the output above 
      # your threads are probably being called when this is still pending 
      print(x) 
    ... other staff here 

귀하의 의견은 [17, 72, 97, 8, 32, 15, 63, 97, 57, 60]하고 당신의 출력은 셔플은 "단계가 [97, 15, 97, 32, 60, 17, 57, 72, 8, 63] 경우 그 중간에. " 당신의 스레드는 정말 열심히하기 때문에 바로 그걸 얻기 위해 스레드간에 데이터 공유를 방지하기 위해 일반적으로 시도에서, 돌연변이없이

예로은 "중간 단계"에서 호출되는 :

def work_with_rands(i, rands): 
    print('in function', i, rands) 


def foo(a): 
    random.seed(random.randrange(999912)/9) 
    x = [None]*len(a) 
    for i in a: 
     _rand = random.randrange(len(a)) 

     while x[_rand] is not None: 
      _rand = random.randrange(len(a)) 

     x[_rand] = i 
    return x 

def main(): 
    rands = [random.randrange(100) for _ in range(10)] 
    with futures.ProcessPoolExecutor() as ex: 
     for i in range(4): 
      new_rands = foo(rands) 
      print("<{}> rands before submission: {}".format(i, new_rands)) 
      ex.submit(work_with_rands, i, new_rands) 


<0> rands before submission: [84, 12, 93, 47, 40, 53, 74, 38, 52, 62] 
<1> rands before submission: [74, 53, 93, 12, 38, 47, 52, 40, 84, 62] 
<2> rands before submission: [84, 12, 93, 38, 62, 52, 53, 74, 47, 40] 
<3> rands before submission: [53, 62, 52, 12, 84, 47, 93, 40, 74, 38] 
in function 0 [84, 12, 93, 47, 40, 53, 74, 38, 52, 62] 
in function 1 [74, 53, 93, 12, 38, 47, 52, 40, 84, 62] 
in function 2 [84, 12, 93, 38, 62, 52, 53, 74, 47, 40] 
in function 3 [53, 62, 52, 12, 84, 47, 93, 40, 74, 38] 
+0

솔루션 (명시 적 사본)이 있습니다. 나는 그것이 (1) 피클 때 피클을 만들지 않는지, 피클 링인지 그리고 (2) 어디에서''[97, 32, 17, 15, 57, 97, 63, 72, 60, 8]''오고있다. – ariddell

+0

shuffle에서 온다 shuffle이 실제로 완료 될 때까지 shuffle이 여러 번 목록을 변경시킨다. 나는 혼란 스러울 수도 있다고 설명하기 위해 대답을 업데이트했다. –

+0

내 독서는 공유 된 데이터가 없다는 것이다. 절인하고 따로 보내. 그게 내 혼란의 원천이며 그 질문을 던지는 이유입니다. – ariddell