2017-12-30 22 views
0

우분투에서 jupyter를 사용하고 있습니다. 그래서 내가 다음 문제가있어다중 키와 단일 값에 대해 pyspark에서 reduceByKey를 사용하는 방법

이 내 코드입니다 :

from pyspark import SparkContext 
sc = SparkContext.getOrCreate() 
ut = sc.textFile("hdfs://localhost:54310/hduser/firstnames") 
rows= ut.map(lambda line: line.split(";")) 
res = rows.filter(lamda row: row[2] >= "2000" and row[2] <= "2004") 
res = res.map(lambda row: ({row[1],row[2]},int(row[3]))) 

출력 :

[({'2001', 'Brussel'}, 113), 
({'2001', 'Vlaanderen'}, 16), 
({'2002', 'Brussel'}, 12)] 

I :

[({'2001', 'Brussel'}, 9), 
({'2001', 'Brussel'}, 104), 
({'2001', 'Vlaanderen'}, 16), 
({'2002', 'Brussel'}, 12), ...] 

내가처럼 내 출력이 필요합니다 전에 reduceByKey를 사용하여 몇 가지 작업을 시도했으며 은 reduceByKey, bu에 대해 많은 질문을 보았습니다. 그것을 알 수 없었다. 미리 감사드립니다.

답변

0

A list as a key for PySpark's reduceByKey에 의해 zero323에서 설명한 바와 같이, 키는 해시 방법을 구현해야합니다. 당신은 tuples를 사용할 수 있습니다

>>> from operator import add 
... 
... sc.parallelize([ 
...  (('2001', 'Brussel'), 9), (('2001', 'Brussel'), 104), 
...  (('2001', 'Vlaanderen'), 16), (('2002', 'Brussel'), 12) 
... ]).reduceByKey(add).take(2) 
... 
[(('2002', 'Brussel'), 12), (('2001', 'Brussel'), 113)] 

교체 :

res.map(lambda row: ({row[1],row[2]},int(row[3]))) 

res.map(lambda row: ((row[1], row[2]), int(row[3]))) 

또는 set frozenset로 교체 :

>>> sc.parallelize([ 
...  (frozenset(['2001', 'Brussel']), 9), (frozenset(['2001', 'Brussel']), 104), 
...  (frozenset(['2001', 'Vlaanderen']), 16), (frozenset(['2002', 'Brussel']), 12) 
... ]).reduceByKey(add).take(2) 

[(frozenset({'2002', 'Brussel'}), 12), (frozenset({'2001', 'Brussel'}), 113)] 
+0

감사합니다! 이제 잘 작동합니다! –