2017-09-22 6 views
2

동일한 스키마를 가진 여러 쪽 parquet 파일을 단일 데이터 프레임으로로드하려면 dask를 사용해야합니다. 이것은 동일한 디렉토리에있을 때 모두 작동하지만 별도의 디렉토리에있을 때는 작동하지 않습니다.dask/fastparquet을 사용하여 여러 디렉토리에서 동일한 마루 파일을 읽는 방법

예를 들어

:

import fastparquet 
pfile = fastparquet.ParquetFile(['data/data1.parq', 'data/data2.parq']) 

작품을 잘하지만 다른 디렉토리에 data2.parq를 복사 할 경우, 다음이 작동하지 않습니다

pfile = fastparquet.ParquetFile(['data/data1.parq', 'data2/data2.parq']) 

내가 얻는 추적은 다음과 :

--------------------------------------------------------------------------- 
ValueError        Traceback (most recent call last) 
<ipython-input-11-b3d381f14edc> in <module>() 
----> 1 pfile = fastparquet.ParquetFile(['data/data1.parq', 'data2/data2.parq']) 

~/anaconda/envs/hv/lib/python3.6/site-packages/fastparquet/api.py in __init__(self, fn, verify, open_with, sep) 
    82   if isinstance(fn, (tuple, list)): 
    83    basepath, fmd = metadata_from_many(fn, verify_schema=verify, 
---> 84            open_with=open_with) 
    85    self.fn = sep.join([basepath, '_metadata']) # effective file 
    86    self.fmd = fmd 

~/anaconda/envs/hv/lib/python3.6/site-packages/fastparquet/util.py in metadata_from_many(file_list, verify_schema, open_with) 
    164  else: 
    165   raise ValueError("Merge requires all PaquetFile instances or none") 
--> 166  basepath, file_list = analyse_paths(file_list, sep) 
    167 
    168  if verify_schema: 

~/anaconda/envs/hv/lib/python3.6/site-packages/fastparquet/util.py in analyse_paths(file_list, sep) 
    221  if len({tuple([p.split('=')[0] for p in parts[l:-1]]) 
    222    for parts in path_parts_list}) > 1: 
--> 223   raise ValueError('Partitioning directories do not agree') 
    224  for path_parts in path_parts_list: 
    225   for path_part in path_parts[l:-1]: 

ValueError: Partitioning directories do not agree 

dask.dataframe.read_parquet을 사용할 때도 같은 오류가 발생합니다. 동일한 ParquetFile 개체를 사용한다고 가정합니다.

다른 디렉토리에서 여러 파일을로드하려면 어떻게해야합니까? 동일한 디렉토리에로드해야하는 모든 파일을 두는 것은 옵션이 아닙니다.

답변

3

이 절대 경로 또는 명시 적 상대 경로를 사용하는 경우, 마스터 fastparquet에서 작업을 수행합니다.

2

해결 방법은 각 청크를 별도로 읽고 dask.dataframe.from_delayed으로 전달하는 것입니다. 이는 read_parquet (정확히 'index' 이하 여야 함)과 동일한 메타 데이터 처리를 수행하지는 않지만 그렇지 않으면 작동해야합니다. 주요 ./에 대한

pfile = fastparquet.ParquetFile(['./data/data1.parq', './data2/data2.parq']) 

의 필요성 버그 고려되어야한다 - 문제를 참조하십시오

import dask.dataframe as dd  
from dask import delayed  
from fastparquet import ParquetFile 

@delayed 
def load_chunk(pth): 
    return ParquetFile(pth).to_pandas() 

files = ['temp/part.0.parquet', 'temp2/part.1.parquet'] 
df = dd.from_delayed([load_chunk(f) for f in files]) 

df.compute() 
Out[38]: 
    index a 
0  0 1 
1  1 2 
0  2 3 
1  3 4 
+0

github 문제 - https://github.com/dask/fastparquet/issues/217 – chrisb