다소 큰 (~ 20 GB) 분할 데이터 세트가 쪽모 세공 형식으로 있습니다. pyarrow
을 사용하여 데이터 세트의 특정 파티션을 읽으 려합니다. pyarrow.parquet.ParquetDataset
으로이 작업을 수행 할 수 있다고 생각했지만 그럴 것 같지 않습니다. 다음은 내가 원하는 것을 설명하기위한 작은 예입니다. pyarrow를 사용하여 분할 된 쪽모 세공 데이터 세트에서 특정 파티션 읽기
from collections import OrderedDict
from itertools import product, chain
from uuid import uuid4
import os
from glob import glob
import numpy as np
import pandas as pd
import pyarrow as pa
from pyarrow.parquet import ParquetWriter, ParquetDataset
def get_partitions(basepath, partitions):
"""Generate directory hierarchy for a paritioned dataset
data
├── part1=foo
│ └── part2=True
├── part1=foo
│ └── part2=False
├── part1=bar
│ └── part2=True
└── part1=bar
└── part2=False
"""
path_tmpl = '/'.join(['{}={}'] * len(partitions)) # part=value
path_tmpl = '{}/{}'.format(basepath, path_tmpl) # part1=val/part2=val
parts = [product([part], vals) for part, vals in partitions.items()]
parts = [i for i in product(*parts)]
return [path_tmpl.format(*tuple(chain.from_iterable(i))) for i in parts]
partitions = OrderedDict(part1=['foo', 'bar'], part2=[True, False])
parts = get_partitions('data', partitions)
for part in parts:
# 3 columns, 5 rows
data = [pa.array(np.random.rand(5)) for i in range(3)]
table = pa.Table.from_arrays(data, ['a', 'b', 'c'])
os.makedirs(part, exist_ok=True)
out = ParquetWriter('{}/{}.parquet'.format(part, uuid4()),
table.schema, flavor='spark')
out.write_table(table)
out.close()
내가 파티션 하나를 위해 모든 값을 읽을 수 및 pandas.read_parquet
와 파티션 2 만 사실, 그것이 가능하지, 난 항상 전체 열을 읽을 수 있습니다.
parts2 = OrderedDict(part1=['foo', 'bar'], part2=[True])
parts2 = get_partitions('data', parts2)
files = [glob('{}/*'.format(dirpath)) for dirpath in parts2]
files = [i for i in chain.from_iterable(files)]
df2 = ParquetDataset(files).read().to_pandas()
중 하나가 작동하지 않습니다 :
>>> df2.columns
Index(['a', 'b', 'c'], dtype='object')
내가 이렇게 pyspark
쉽게이 작업을 수행 할 수 있습니다
def get_spark_session_ctx(appName):
"""Get or create a Spark Session, and the underlying Context."""
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName(appName).getOrCreate()
sc = spark.sparkContext
return (spark, sc)
spark, sc = get_spark_session_ctx('test')
spark_df = spark.read.option('basePath', 'data').parquet(*parts2)
df3 = spark_df.toPandas()
아래에서 볼 수 있듯이 나는 pyarrow
와 함께 다음과 같은 시도 :
>>> df3.columns
Index(['a', 'b', 'c', 'part1', 'part2'], dtype='object')
pyarrow
또는 pandas
으로이를 수행 할 수 있습니까? 아니면 맞춤 구현이 필요합니까?
업데이트 : 웨스 (Wes)의 요청에 따라이 내용은 JIRA입니다.
감사합니다. 레드와 글쓰기 모두에 대한 기능 요청을 만들어야한다고 생각합니다. (놓치지 않았다면). 요즘 자유 시간이 많아요. 누군가 나를 안내하면 구현 작업도 할 수 있습니다. – suvayu