2016-10-25 3 views
0

안녕하세요,Dask - 대용량 메모리 사용으로 인한 Rechunk 또는 배열 슬라이스?

저는 Dask 프로세싱 체인에서 과도한 (또는 가능하지 않은) 메모리 사용을 이해하는 데 도움을 청했습니다.

문제는 다음과 같은 기능의 실행으로부터 온다 :

def create_fft_arrays(master_array, fft_size, overlap): 

    input_shape = master_array.shape[0] 
    # Determine zero pad length 
    zero_len = fft_size - ((input_shape - fft_size) % ((1-overlap) * fft_size)) 

    zeros = da.zeros((zero_len, master_array.shape[1]), 
        dtype = master_array.dtype, 
        chunks = (zero_len, master_array.shape[1])) 
    # Create the reshaped array 
    reshape_array = da.concatenate((master_array, zeros), axis = 0) 
    # Create an index series to use to index the reshaped array for re-blocking. 
    fft_index = np.arange(0, reshape_array.shape[0] - (fft_size -1), fft_size * overlap) 
    # Break reshape_array into fft size chunks 
    fft_arrays = [reshape_array[x:x + fft_size] for x in fft_index] 

    # Returns list of dask arrays 
    return [array.rechunk(array.shape) for array in fft_arrays] 

어디 master_array 메모리 (703,이 경우에 포인트 57,600,001)에 저장하기에 너무 큰 Dask Array이다. 최소한의 예를 들어

는 다음 컨텍스트에서 코드를 넣으려면

import dask.array as da 
import numpy as np 

def create_fft_arrays(master_array, fft_size, overlap): 

    input_shape = master_array.shape[0] 
    # Determine zero pad length 
    zero_len = fft_size - ((input_shape - fft_size) % ((1-overlap) * fft_size)) 

    zeros = da.zeros((zero_len, master_array.shape[1]), 
        dtype = master_array.dtype, 
        chunks = (zero_len, master_array.shape[1])) 
    # Create the reshaped array 
    reshape_array = da.concatenate((master_array, zeros), axis = 0) 
    # Create an index series to use to index the reshaped array for re-blocking. 
    fft_index = np.arange(0, reshape_array.shape[0] - (fft_size -1), fft_size * overlap) 
    # Break reshape_array into fft size chunks 
    fft_arrays = [reshape_array[x:x + fft_size] for x in fft_index] 

    # Returns list of dask arrays 
    return [array.rechunk(array.shape) for array in fft_arrays] 

# Fabricate an input array of the same shape and size as the problematic dataset 
master_array = da.random.normal(10, 0.1, size = (703, 57600001), chunks = (703, 372)) 

# Execute the create_fft_arrays function 
fft_arrays = create_fft_arrays(master_array.T, 2**15, 0.5) 

아래의 전체 코드와 같은 메모리 사용을 원인, 다음 코드의 실행은 최대 내 RAM (20GB의)를 발생 마지막 줄 fft_arrays = create_fft_arrays(master_array.T, FFT_SIZE, 0.5) 실행 아웃 때이 후

import dask.array as da 

import h5py as h5 
import numpy as np 

import os 

FORMAT = '.h5' 
DSET_PATH = '/DAS/Data' 
TSET_PATH = '/DAS/Time' 

FFT_SIZE = 2**15 
OVERLAP = 0.5 

input_dir = r'D:\' 
file_paths = [] 

# Get list of all valid files in directory 
for dir_name, sub_dir, f_name in os.walk(input_dir): 
    for f in f_name: 
     if f[-1*len(FORMAT):] == FORMAT: 
      file_paths.append(os.path.join(dir_name, f)) 

#H5 object for each file 
file_handles = [h5.File(f_path, 'r') for f_path in file_paths] 

# Handle for dataset and timestamps from each file 
dset_handles = [f[DSET_PATH] for f in file_handles] 
tset_handles = [f[TSET_PATH] for f in file_handles] 

# Create a Dask Array object for each dataset and timestamp set 
dset_arrays = [da.from_array(dset, chunks = dset.chunks) for dset in dset_handles] 
tset_arrays = [da.from_array(tset, chunks = tset.chunks) for tset in tset_handles] 

# Concatenate all datasets along along the time axis 
master_array = da.concatenate(dset_arrays, axis = 1) 

def create_fft_arrays(master_array, fft_size, overlap): 

    input_shape = master_array.shape[0] 
    # Determine zero pad length 
    zero_len = fft_size - ((input_shape - fft_size) % ((1-overlap) * fft_size)) 

    zeros = da.zeros((zero_len, master_array.shape[1]), 
        dtype = master_array.dtype, 
        chunks = (zero_len, master_array.shape[1])) 
    # Create the reshaped array 
    reshape_array = da.concatenate((master_array, zeros), axis = 0) 
    # Create an index series to use to index the reshaped array for re-blocking. 
    fft_index = np.arange(0, reshape_array.shape[0] - (fft_size -1), fft_size * overlap) 
    # Break reshape_array into fft size chunks 
    fft_arrays = [reshape_array[x:x + fft_size] for x in fft_index] 

    # Returns list of dask arrays 
    return [array.rechunk(array.shape) for array in fft_arrays] 

# Break master_array into FFT sized arrays with a single chunk in each 
fft_arrays = create_fft_arrays(master_array.T, FFT_SIZE, 0.5) 

을, 나는이 FFT 배열의 각각의 주파수 응답을 계산하는 da.fft.fft 방법을 사용에 갈 것입니다. 이 좋은 그래서

어떤 도움이나 권고 사항을 크게 감상 할 수

,

조지

+0

[mcve] (http://stackoverflow.com/help/mcve)를 제작할 수 있다면 더 빨리 응답 할 수 있습니다. – MRocklin

+0

수정되었으므로 아마도 조금 더 명확해질 수 있습니다. –

+0

줄 단위로 살펴보면, 이것은 많은 양의 메모리를 사용하는 줄처럼 보입니다. fft_arrays = [reshape_array [x : x + FFT_SIZE] for x in fft_index]. 조각 작업이 원인입니까? –

답변

0

마스터 배열은 매우 많은 덩어리

>>> master_array = da.random.normal(10, 0.1, size = (703, 57600001), chunks = (703, 372)) 
>>> master_array.npartitions 
154839 

각 청크에 대한 일부 관리 오버 헤드가있다 숫자를 이보다 다소 작게 유지하십시오. 이 section on chunksdask.array documentation

이 배열을 수천 번 슬라이싱하려고하면 병목 현상이 발생합니다.

청크 크기를 늘리면 문제가 다소 해결 될 수 있습니다. 위에 링크 된 문서는 몇 가지 권장 사항을 제공합니다.

+0

조언 해 주셔서 감사합니다. –

+0

조언 해 주셔서 감사합니다. 나는 그 청크 크기를 h5 청크 크기로 시작했습니다. 궁극적으로 디스크의 데이터 IO가 병목 현상이 될 수 있지만이 워크 플로 확장을 EC2 인스턴스에서 실행하도록 Dask를 실험하고 싶습니다. 'master_array'의 덩어리 크기를 올리면 적어도 그래프를 만들 수 있습니다. 도와 주셔서 감사합니다. –