2017-12-07 18 views
1

나는 풀에서 비동기 적으로 파일을 NumPy와로드하려고 :파이썬 멀티 apply_async는 "왼쪽 주장> 0"AssertionError를

self.pool = Pool(2, maxtasksperchild = 1) 
... 
nextPackage = self.pool.apply_async(loadPackages, (...)) 
for fi in np.arange(len(files)): 
    packages = nextPackage.get(timeout=30) 
    # preload the next package asynchronously. It will be available 
    # by the time it is required. 
    nextPackage = self.pool.apply_async(loadPackages, (...)) 

방법 "loadPackages"

def loadPackages(... (2 strings & 2 ints) ...): 
    print("This isn't printed!') 
    packages = { 
     "TRUE": np.load(gzip.GzipFile(path1, "r")), 
     "FALSE": np.load(gzip.GzipFile(path2, "r")) 
    } 
    return packages 

전에라도 최초의 " 패키지 "가로드되면 다음 오류가 발생합니다.

스레드의 예외 스레드 8 : 추적 (가장 최근 호출 마지막) :
파일 "C : \ Users \ roman \ Anaconda3 \ envs \ tsc1 \ lib \ threading.py", 줄 914, in _bootstrap_inner self.run() 파일 "C : \ Users \ roman \ Anaconda3 \ envs \ tsc1 \ user \ roman \ Anaconda3 \ envs \ tsc1 \ lib \ multiprocessing \ pool 파일의 내용은 다음과 같습니다. .py ", line 463, _handle_results task = get() 파일"C : \ Users \ roman \ Anaconda3 \ envs \ tsc1 \ lib \ multiprocessing \ connection.py "파일, 라인 250, recv buf = self .recv_bytes() 파일 "C : \ Users \ roman \ Anaconda3 \ envs \ tsc1 \ lib \ multiprocessing \ connection.py" 줄 318, _recv_bytes에 반환 self._get_more_data (ov, maxsize) 파일 "C : \ Users \ roman \ Anaconda3 \ envs \ ts C1 \ lib 디렉토리 \ 왼쪽 _get_more_data 어설에서 \ connection.py " 라인 (337)을, 멀티 프로세싱> 0 AssertionError를

나는 밀접하게 리소스를 모니터링 : 메모리가 문제가되지 않는 오류가 발생했을 때 난 여전히 많은 왼쪽이 . 압축 해제 된 파일은 단순한 다차원 numpy 배열입니다. 개별적으로, 더 간단한 방법으로 풀을 사용하고 작동하는 것처럼 파일을로드합니다. 조합으로 만 실패합니다. (이 모든 것은 커스텀 생성기에서 발생합니다.이 기능이 도움이 될지는 모르지만 누가 알 수 있습니다.) Python 3.5.

이 문제의 원인은 무엇입니까? 이 오류를 어떻게 해석 할 수 있습니까?

도움 주셔서 감사합니다.

+0

왼쪽에는 많은 RAM이 있어도 같은 문제가 있습니다. 문제의 해결책을 찾았습니까? –

답변

1

작은 조각으로 데이터를 검색하여 해결 방법을 찾은 것 같습니다. 내 경우에는 목록의 목록이었습니다.

for i in range(0, NUMBER_OF_THREADS): 
    print('MAIN: Getting data from process ' + str(i) + ' proxy...') 
    X_train.extend(ListasX[i]._getvalue()) 
    Y_train.extend(ListasY[i]._getvalue()) 
    ListasX[i] = None 
    ListasY[i] = None 
    gc.collect() 

가 변경 :

내가했다

CHUNK_SIZE = 1024 
for i in range(0, NUMBER_OF_THREADS): 
    print('MAIN: Getting data from process ' + str(i) + ' proxy...') 
    for k in range(0, len(ListasX[i]), CHUNK_SIZE): 
     X_train.extend(ListasX[i][k:k+CHUNK_SIZE]) 
     Y_train.extend(ListasY[i][k:k+CHUNK_SIZE]) 
    ListasX[i] = None 
    ListasY[i] = None 
    gc.collect() 

그리고 지금은 아마도 한 번에 적은 양의 데이터를 직렬화하여 작동하는 것 같다. 데이터를 작은 부분으로 나누면 문제를 극복 할 수 있습니다. 행운을 빕니다!

+0

귀하의 답변을 주셔서 감사합니다, 시스코! 아니요, 불행히도 여기에서 정확히 무슨 일이 일어 났는지에 대한 설명이 여전히 부족합니다. 나는 또한 다른 아키텍처로 그 주위에서 작업했습니다. 0보다 왼쪽의 어설 션은 파이프의 내용을 버퍼로 복사 할 때 발생합니다. "함수가 성공하면 반환 값은 0이 아닙니다." 그리고 GetLastError를 사용하면 의미있는 오류 메시지를 얻을 수 있습니다. 그러나 당분간이 문제를 해결할 방법을 찾았습니다. 해결 방법 일 뿐이므로 대답을 대답으로 표시하지 못하게 해주세요. – Doidel

+0

@Doidel 물론 도움이되는 겸손한 시도였습니다.] –