2017-12-28 24 views
1

다소 큰 (~ 20 GB) 분할 데이터 세트가 쪽모 세공 형식으로 있습니다. pyarrow을 사용하여 데이터 세트의 특정 파티션을 읽으 려합니다. pyarrow.parquet.ParquetDataset으로이 작업을 수행 할 수 있다고 생각했지만 그럴 것 같지 않습니다. 다음은 내가 원하는 것을 설명하기위한 작은 예입니다. pyarrow를 사용하여 분할 된 쪽모 세공 데이터 세트에서 특정 파티션 읽기

는 임의의 데이터 집합을 만들려면 :

from collections import OrderedDict 
from itertools import product, chain 
from uuid import uuid4 
import os 
from glob import glob 

import numpy as np 
import pandas as pd 
import pyarrow as pa 
from pyarrow.parquet import ParquetWriter, ParquetDataset 


def get_partitions(basepath, partitions): 
    """Generate directory hierarchy for a paritioned dataset 

    data 
    ├── part1=foo 
    │ └── part2=True 
    ├── part1=foo 
    │ └── part2=False 
    ├── part1=bar 
    │ └── part2=True 
    └── part1=bar 
     └── part2=False 

    """ 
    path_tmpl = '/'.join(['{}={}'] * len(partitions)) # part=value 
    path_tmpl = '{}/{}'.format(basepath, path_tmpl) # part1=val/part2=val 

    parts = [product([part], vals) for part, vals in partitions.items()] 
    parts = [i for i in product(*parts)] 
    return [path_tmpl.format(*tuple(chain.from_iterable(i))) for i in parts] 


partitions = OrderedDict(part1=['foo', 'bar'], part2=[True, False]) 
parts = get_partitions('data', partitions) 
for part in parts: 
    # 3 columns, 5 rows 
    data = [pa.array(np.random.rand(5)) for i in range(3)] 
    table = pa.Table.from_arrays(data, ['a', 'b', 'c']) 
    os.makedirs(part, exist_ok=True) 
    out = ParquetWriter('{}/{}.parquet'.format(part, uuid4()), 
         table.schema, flavor='spark') 
    out.write_table(table) 
    out.close() 

내가 파티션 하나를 위해 모든 값을 읽을 수 및 pandas.read_parquet와 파티션 2 만 사실, 그것이 가능하지, 난 항상 전체 열을 읽을 수 있습니다.

parts2 = OrderedDict(part1=['foo', 'bar'], part2=[True]) 
parts2 = get_partitions('data', parts2) 
files = [glob('{}/*'.format(dirpath)) for dirpath in parts2] 
files = [i for i in chain.from_iterable(files)] 
df2 = ParquetDataset(files).read().to_pandas() 

중 하나가 작동하지 않습니다 :

>>> df2.columns 
Index(['a', 'b', 'c'], dtype='object') 

내가 이렇게 pyspark 쉽게이 작업을 수행 할 수 있습니다

def get_spark_session_ctx(appName): 
    """Get or create a Spark Session, and the underlying Context.""" 
    from pyspark.sql import SparkSession 
    spark = SparkSession.builder.appName(appName).getOrCreate() 
    sc = spark.sparkContext 
    return (spark, sc) 


spark, sc = get_spark_session_ctx('test') 
spark_df = spark.read.option('basePath', 'data').parquet(*parts2) 
df3 = spark_df.toPandas() 

아래에서 볼 수 있듯이 나는 pyarrow와 함께 다음과 같은 시도 :

>>> df3.columns 
Index(['a', 'b', 'c', 'part1', 'part2'], dtype='object') 

pyarrow 또는 pandas으로이를 수행 할 수 있습니까? 아니면 맞춤 구현이 필요합니까?

업데이트 : 웨스 (Wes)의 요청에 따라이 내용은 JIRA입니다.

답변

1

질문 : pyarrow를 사용하여 분할 된 쪽모 세공 데이터 세트에서 특정 파티션을 읽으려면 어떻게해야합니까?

답변 : 지금 당장은 할 수 없습니다.

https://issues.apache.org/jira에이 기능을 요청하는 Apache Arrow JIRA를 생성 할 수 있습니까?

이것은 우리가 pyarrow API에서 지원할 수 있어야하지만 누군가 구현해야합니다. 감사합니다.

+0

감사합니다. 레드와 글쓰기 모두에 대한 기능 요청을 만들어야한다고 생각합니다. (놓치지 않았다면). 요즘 자유 시간이 많아요. 누군가 나를 안내하면 구현 작업도 할 수 있습니다. – suvayu