저는 CSV에서 일부 처리를 수행하기 위해 스케줄러와 4 개의 작업자 노드를 설정했습니다. csv의 크기는 300MB입니다.나는 dask 데이터 프레임에서 지속되는 결과 인 미래의 컬렉션을 가지고 있습니다. 지연된 작업을 수행하는 방법은 무엇입니까?
distributed.worker - WARNING - Compute Failed
Function: create_sep_futures
args: ('PHG', symbol col_3 col_2 \
0 A 1.451261e+09 23.512857
1 A 1.451866e+09 23.886857
2 A 1.452470e+09 25.080429
kwargs: {}
Exception: KeyError(False,)
내 가정은 근로자가에서 전체 dataframe 및 쿼리를 얻을해야한다는 것입니다 :
df = dd.read_csv('/Downloads/tmpcrnin5ta',assume_missing=True)
df = df.groupby(['col_1','col_2']).agg('mean').reset_index()
df = client.persist(df)
def create_sep_futures(symbol,df):
symbol_df = copy.deepcopy(df[df['symbol' == symbol]])
return symbol_df
lazy_values = [delayed(create_sep_futures)(symbol, df) for symbol in st]
future = client.compute(lazy_values)
result = client.gather(future)
일 목록 내가이 오류가 1000 개 요소
내가 이렇게
가 포함되어 있습니다. 그러나 나는 그것이 단지 블록을 얻고 그것을하려고 생각합니다.어떤 해결 방법이 있습니까? 데이터 프레임 청크는 이미 작업자 메모리에 있기 때문에 나는 각 작업자에게 데이터 프레임을 옮기고 싶지 않다.