2017-11-23 11 views
0

저는 CSV에서 일부 처리를 수행하기 위해 스케줄러와 4 개의 작업자 노드를 설정했습니다. csv의 크기는 300MB입니다.나는 dask 데이터 프레임에서 지속되는 결과 인 미래의 컬렉션을 가지고 있습니다. 지연된 작업을 수행하는 방법은 무엇입니까?

distributed.worker - WARNING - Compute Failed 
Function: create_sep_futures 
args:  ('PHG',  symbol col_3 col_2 \ 
0    A   1.451261e+09    23.512857 
1    A   1.451866e+09    23.886857 
2    A   1.452470e+09    25.080429 

kwargs: {} 
Exception: KeyError(False,) 

내 가정은 근로자가에서 전체 dataframe 및 쿼리를 얻을해야한다는 것입니다 :

df = dd.read_csv('/Downloads/tmpcrnin5ta',assume_missing=True) 

df = df.groupby(['col_1','col_2']).agg('mean').reset_index() 
df = client.persist(df) 



def create_sep_futures(symbol,df):  

    symbol_df = copy.deepcopy(df[df['symbol' == symbol]]) 

    return symbol_df 
lazy_values = [delayed(create_sep_futures)(symbol, df) for symbol in st] 

future = client.compute(lazy_values) 
result = client.gather(future) 

일 목록 내가이 오류가 1000 개 요소

내가 이렇게

가 포함되어 있습니다. 그러나 나는 그것이 단지 블록을 얻고 그것을하려고 생각합니다.

어떤 해결 방법이 있습니까? 데이터 프레임 청크는 이미 작업자 메모리에 있기 때문에 나는 각 작업자에게 데이터 프레임을 옮기고 싶지 않다.

답변

0

데이터 프레임 구문 및 API를 사용하여 데이터 프레임을 조작하는 것은 기본적으로 지연 (지연)되므로 더 이상 수행하지 않아도됩니다.

첫 번째 문제 : 구문이 잘못되었습니다 df[df['symbol' == symbol]] =>df[df['symbol'] == symbol]입니다. 이것이 False 키의 근원입니다.

그래서 당신은 아마 찾고있는 솔루션 : 당신이을 경우

future = client.compute(df[df['symbol'] == symbol]) 

당신이 정상적인 기능을 사용하고 돌봐 df.map_partitions로 볼 수 별도로 덩어리에 작업 할 데이터 또는 지연/선물 또는 df.to_delayed을 전달하면 지연된 기능과 함께 사용할 수있는 일련의 지연된 개체가 제공됩니다.