다음 코드 단편을 가지고 있습니다.데이터 프레임 필터 작업에 대한 매개 변수로 RDD 목록 사용
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
sc = SparkContext()
spark = SparkSession.builder.appName("test").getOrCreate()
schema = StructType([
StructField("name", StringType(), True),
StructField("a", StringType(), True),
StructField("b", StringType(), True),
StructField("c", StringType(), True),
StructField("d", StringType(), True),
StructField("e", StringType(), True),
StructField("f", StringType(), True)])
arr = [("Alice", "1", "2", None, "red", None, None), \
("Bob", "1", None, None, None, None, "apple"), \
("Charlie", "2", "3", None, None, None, "orange")]
df = spark.createDataFrame(arr, schema)
df.show()
#+-------+---+----+----+----+----+------+
#| name| a| b| c| d| e| f|
#+-------+---+----+----+----+----+------+
#| Alice| 1| 2|null| red|null| null|
#| Bob| 1|null|null|null|null| apple|
#|Charlie| 2| 3|null|null|null|orange|
#+-------+---+----+----+----+----+------+
는 지금, 나는이 같은입니다 RDD :
{'c,d,e': ['Bob', 'Charlie'], 'f': ['Alice']}
: 위의 예에서
lrdd = sc.parallelize([['a', 'b'], ['c', 'd', 'e'], ['f']])
내 목표는 속성의 빈 부분 집합이 이름을 찾는 것입니다, 즉,
이제는 목록을 수집 한 다음 데이터 프레임을 쿼리하는 하위 집합을 순환하는 다소 모호한 솔루션을 얻었습니다.
def build_filter_condition(l):
return ' AND '.join(["({} is NULL)".format(x) for x in l])
res = {}
for alist in lrdd.collect():
cond = build_filter_condition(alist)
p = df.select("name").where(cond)
if p and p.count() > 0:
res[','.join(alist)] = p.rdd.map(lambda x: x[0]).collect()
print(res)
매우 효과적이지만 매우 비효율적입니다. 대상 속성 스키마가 10000 속성과 비슷하기 때문에 lrdd에 600 개 이상의 분리 된 목록이 생성됩니다.
그래서, 내 질문은 : 효율적으로 SQL 데이터 프레임 쿼리에 대한 매개 변수로 분산 된 컬렉션의 내용을 사용하는 방법? 힌트를 보내 주시면 감사하겠습니다.
대단히 감사합니다.
와우. 이것은 굉장한 것처럼 보인다. 나는 crossJoin을 생각하지 않았다. 감사. 불행히도 가장 큰 시나리오에서는'org.apache.spark.sql.catalyst.expressions.GeneratedClass $ SpecificUnsafeRowJoiner가 64KB를 넘어서서 커질 것 '입니다. [자세한 내용] (https://issues.apache.org/jira/browse/SPARK-16845). 나는 당신의 접근 방식을 더 조사 할 것이다. 다시 한번 감사드립니다. –