2017-09-15 7 views
1

다음 코드 단편을 가지고 있습니다.데이터 프레임 필터 작업에 대한 매개 변수로 RDD 목록 사용

from pyspark import SparkContext 
from pyspark.sql import SparkSession 
from pyspark.sql.types import * 

sc = SparkContext() 
spark = SparkSession.builder.appName("test").getOrCreate() 

schema = StructType([                   
     StructField("name", StringType(), True), 
     StructField("a", StringType(), True), 
     StructField("b", StringType(), True), 
     StructField("c", StringType(), True), 
     StructField("d", StringType(), True), 
     StructField("e", StringType(), True), 
     StructField("f", StringType(), True)]) 

arr = [("Alice", "1", "2", None, "red", None, None), \ 
     ("Bob", "1", None, None, None, None, "apple"), \ 
     ("Charlie", "2", "3", None, None, None, "orange")] 

df = spark.createDataFrame(arr, schema) 
df.show() 

#+-------+---+----+----+----+----+------+ 
#| name| a| b| c| d| e|  f| 
#+-------+---+----+----+----+----+------+ 
#| Alice| 1| 2|null| red|null| null| 
#| Bob| 1|null|null|null|null| apple| 
#|Charlie| 2| 3|null|null|null|orange| 
#+-------+---+----+----+----+----+------+ 

는 지금, 나는이 같은입니다 RDD :

{'c,d,e': ['Bob', 'Charlie'], 'f': ['Alice']} 
: 위의 예에서

lrdd = sc.parallelize([['a', 'b'], ['c', 'd', 'e'], ['f']]) 

내 목표는 속성의 빈 부분 집합이 이름을 찾는 것입니다, 즉,

이제는 목록을 수집 한 다음 데이터 프레임을 쿼리하는 하위 집합을 순환하는 다소 모호한 솔루션을 얻었습니다.

def build_filter_condition(l): 
    return ' AND '.join(["({} is NULL)".format(x) for x in l]) 

res = {} 
for alist in lrdd.collect(): 
    cond = build_filter_condition(alist) 
    p = df.select("name").where(cond) 
    if p and p.count() > 0: 
     res[','.join(alist)] = p.rdd.map(lambda x: x[0]).collect() 

print(res) 

매우 효과적이지만 매우 비효율적입니다. 대상 속성 스키마가 10000 속성과 비슷하기 때문에 lrdd에 600 개 이상의 분리 된 목록이 생성됩니다.

그래서, 내 질문은 : 효율적으로 SQL 데이터 프레임 쿼리에 대한 매개 변수로 분산 된 컬렉션의 내용을 사용하는 방법? 힌트를 보내 주시면 감사하겠습니다.

대단히 감사합니다.

답변

0

이 방법을 시도해 볼 수 있습니다.

우선 crossjoin 둘 dataframes

from pyspark.sql.types import * 
    lrdd = sc.parallelize([['a', 'b'], ['c', 'd', 'e'], ['f']]). 
         map(lambda x: ("key", x)) 

    schema = StructType([StructField("K", StringType()), 
         StructField("X", ArrayType(StringType()))]) 

    df2 = spark.createDataFrame(lrdd, schema).select("X") 
    df3 = df.crossJoin(df2) 

결과 crossjoin의

지금
+-------+---+----+----+----+----+------+---------+ 
| name| a| b| c| d| e|  f|  X| 
+-------+---+----+----+----+----+------+---------+ 
| Alice| 1| 2|null| red|null| null| [a, b]| 
| Alice| 1| 2|null| red|null| null|[c, d, e]| 
| Alice| 1| 2|null| red|null| null|  [f]| 
| Bob| 1|null|null|null|null| apple| [a, b]| 
|Charlie| 2| 3|null|null|null|orange| [a, b]| 
| Bob| 1|null|null|null|null| apple|[c, d, e]| 
| Bob| 1|null|null|null|null| apple|  [f]| 
|Charlie| 2| 3|null|null|null|orange|[c, d, e]| 
|Charlie| 2| 3|null|null|null|orange|  [f]| 
+-------+---+----+----+----+----+------+---------+ 

원하는 출력을 얻을

from pyspark.sql.functions import udf, struct, collect_list 

def foo(data): 

    d = list(filter(lambda x: data[x], data['X'])) 
    print(d) 
    if len(d)>0: 
     return(False) 
    else: 
     return(True) 

udf_foo = udf(foo, BooleanType()) 

df4 = df3.filter(udf_foo(struct([df3[x] for x in df3.columns]))).select("name", 'X') 



df4.show() 
+-------+---------+ 
| name|  X| 
+-------+---------+ 
| Alice|  [f]| 
| Bob|[c, d, e]| 
|Charlie|[c, d, e]| 
+-------+---------+ 

그럼 GROUPBY 및 collect_list를 사용하여 UDF를 사용하여 행 걸러

df4.groupby("X").agg(collect_list("name").alias("name")).show() 
+--------------+---------+ 
| name  |  X| 
+--------------+---------+ 
| [ Alice]  |  [f]| 
|[Bob, Charlie]|[c, d, e]| 
+--------------+---------+ 
+0

와우. 이것은 굉장한 것처럼 보인다. 나는 crossJoin을 생각하지 않았다. 감사. 불행히도 가장 큰 시나리오에서는'org.apache.spark.sql.catalyst.expressions.GeneratedClass $ SpecificUnsafeRowJoiner가 64KB를 넘어서서 커질 것 '입니다. [자세한 내용] (https://issues.apache.org/jira/browse/SPARK-16845). 나는 당신의 접근 방식을 더 조사 할 것이다. 다시 한번 감사드립니다. –

1

데이터 형식을 다시 고려해야합니다.

import pyspark.sql.functions as psf 
df = df.select(
    "name", 
    psf.explode(
     psf.array(
      *[psf.struct(
       psf.lit(c).alias("feature_name"), 
       df[c].alias("feature_value") 
      ) for c in df.columns if c != "name"] 
     ) 
    ).alias("feature") 
).select("name", "feature.*") 

    +-------+------------+-------------+ 
    | name|feature_name|feature_value| 
    +-------+------------+-------------+ 
    | Alice|   a|   1| 
    | Alice|   b|   2| 
    | Alice|   c|   null| 
    | Alice|   d|   red| 
    | Alice|   e|   null| 
    | Alice|   f|   null| 
    | Bob|   a|   1| 
    | Bob|   b|   null| 
    | Bob|   c|   null| 
    | Bob|   d|   null| 
    | Bob|   e|   null| 
    | Bob|   f|  apple| 
    |Charlie|   a|   2| 
    |Charlie|   b|   3| 
    |Charlie|   c|   null| 
    |Charlie|   d|   null| 
    |Charlie|   e|   null| 
    |Charlie|   f|  orange| 
    +-------+------------+-------------+ 

우리는 lrdd와 동일한 기능을 수행 할 수 있습니다하지만 우리는 먼저 그것을 약간 수정 한 것입니다 : 대신 너무 많은 열을 가진 당신은 explode는 분산 계산을 할 수 있도록 더 라인을 얻을 수 있어야

subsets = spark\ 
    .createDataFrame(lrdd.map(lambda l: [l]), ["feature_set"])\ 
    .withColumn("feature_name", psf.explode("feature_set")) 

    +-----------+------------+ 
    |feature_set|feature_name| 
    +-----------+------------+ 
    |  [a, b]|   a| 
    |  [a, b]|   b| 
    | [c, d, e]|   c| 
    | [c, d, e]|   d| 
    | [c, d, e]|   e| 
    |  [f]|   f| 
    +-----------+------------+ 

이제이 필터를 feature_name에 가입시키고 feature_setname에있는 필터 중 feature_value이 유일하게 null입니다. lrdd 테이블이 너무 큰되지 않은 경우는 broadcast한다 그것은

df_join = df.join(psf.broadcast(subsets), "feature_name") 
res = df_join.groupBy("feature_set", "name").agg(
    psf.count("*").alias("count"), 
    psf.sum(psf.isnull("feature_value").cast("int")).alias("nb_null") 
).filter("nb_null = count") 

    +-----------+-------+-----+-------+ 
    |feature_set| name|count|nb_null| 
    +-----------+-------+-----+-------+ 
    | [c, d, e]|Charlie| 3|  3| 
    |  [f]| Alice| 1|  1| 
    | [c, d, e]| Bob| 3|  3| 
    +-----------+-------+-----+-------+ 

할 수 있습니다 항상 groupByfeature_set 이후

+0

당신은 내 하루를 보냈습니다! 나는 이것을 "받아 들인 답"으로 표시하고있다. 물론 필자는 입력 데이터의 크기 때문에'groupBy'를 피해야하는데, 드라이버 오버로드가 발생합니다. 어쨌든, 이것은 확실히 시작하기에 합당한 접근법입니다. 감사. –

+0

내가 도와 줘서 다행이다 :) – MaFF