2012-02-13 3 views
7

그냥 실험하고 배우고, 여러 proceses를 사용하여 액세스 할 수있는 공유 사전을 만드는 방법을 알고 있지만 사전을 동기화하는 방법을 모르겠습니다. 은 내가 가진 문제를 설명합니다.다중 처리와 함께 defaultdict 사용 하시겠습니까?

from collections import defaultdict 
from multiprocessing import Pool, Manager, Process 

#test without multiprocessing 
s = 'mississippi' 
d = defaultdict(int) 
for k in s: 
    d[k] += 1 

print d.items() # Success! result: [('i', 4), ('p', 2), ('s', 4), ('m', 1)] 
print '*'*10, ' with multiprocessing ', '*'*10 

def test(k, multi_dict): 
    multi_dict[k] += 1 

if __name__ == '__main__': 
    pool = Pool(processes=4) 
    mgr = Manager() 
    multi_d = mgr.dict() 
    for k in s: 
     pool.apply_async(test, (k, multi_d)) 

    # Mark pool as closed -- no more tasks can be added. 
    pool.close() 

    # Wait for tasks to exit 
    pool.join() 

    # Output results 
    print multi_d.items() #FAIL 

print '*'*10, ' with multiprocessing and process module like on python site example', '*'*10 
def test2(k, multi_dict2): 
    multi_dict2[k] += 1 


if __name__ == '__main__': 
    manager = Manager() 

    multi_d2 = manager.dict() 
    for k in s: 
     p = Process(target=test2, args=(k, multi_d2)) 
    p.start() 
    p.join() 

    print multi_d2 #FAIL 

(그것 multiprocessing를 사용하지 않는 때문에),하지만 난 그것을 점점 문제가 multiprocessing와 함께 작동하는 데 첫 번째 결과가 작동합니다. 나는 그것을 해결하는 방법을 모르겠지만 동기화되지 않아서 (나중에 결과에 합류) 또는 어쩌면 multiprocessing 내에서 사전에 defaultdict(int)을 설정하는 방법을 알 수 없기 때문일 수 있습니다.

이 기능을 사용하는 방법에 대한 도움이나 제안 사항은 훌륭합니다!

답변

10

당신은 BaseManager를 서브 클래스 및 공유를위한 추가 유형을 등록 할 수 있습니다. 기본 AutoProxy 생성 유형이 작동하지 않는 경우 적합한 프록시 유형을 제공해야합니다. defaultdict의 경우 이미 dict에있는 속성에만 액세스해야한다면 DictProxy을 사용할 수 있습니다.

from multiprocessing import Pool 
from multiprocessing.managers import BaseManager, DictProxy 
from collections import defaultdict 

class MyManager(BaseManager): 
    pass 

MyManager.register('defaultdict', defaultdict, DictProxy) 

def test(k, multi_dict): 
    multi_dict[k] += 1 

if __name__ == '__main__': 
    pool = Pool(processes=4) 
    mgr = MyManager() 
    mgr.start() 
    multi_d = mgr.defaultdict(int) 
    for k in 'mississippi': 
     pool.apply_async(test, (k, multi_d)) 
    pool.close() 
    pool.join() 
    print multi_d.items() 
+1

와우, 감사합니다. 나는 당신의 수정을 정말로 이해하지 못한다. MyManager (BaseManager) 클래스의 목적은 무엇인가? – Lostsoul

+0

@Lostsoul Manager가 지원하는 것보다 다른 유형의 공유에 대한 지원을 추가하는 것은 [문서화 된 방법] (http://docs.python.org/library/multiprocessing.html#customized-managers)입니다. –

+0

정말 고마워, 내가 공부할거야! – Lostsoul

2

글쎄, Manager 클래스는 프로세스간에 공유 될 수있는 미리 정의 된 데이터 구조의 고정 된 수만을 제공하는 것으로 보이며 defaultdict은 그 중 하나가 아닙니다. 당신이 정말로 하나의 defaultdict는, 가장 쉬운 해결책은 자신의 디폴트 동작을 구현하는 것이 필요한 경우 :

def test(k, multi_dict): 
    if k not in multi_dict: 
     multi_dict[k] = 0 
    multi_dict[k] += 1