2

열쇠로 두 개의 RDD를 결합하는 방법을 찾고 있습니다.PySpark를 사용하여 두 개의 RDD에 대한 완전한 외부 조인을 올바르게 수행하는 방법은 무엇입니까?

을 감안할 때 :

x = sc.parallelize([('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', 'FR', '75001'), 
       ('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', 'TN', '8160'), 
       ] 
      ) 
y = sc.parallelize([('_guid_oX6Lu2xxHtA_T93sK6igyW5RaHH1tAsWcF0RpNx_kUQ=', 'JmJCFu3N'), 
       ('_guid_hG88Yt5EUsqT8a06Cy380ga3XHPwaFylNyuvvqDslCw=', 'KNPQLQth'), 
       ('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', 'KlGZj08d'), 
       ] 
      ) 

나는 해결책을 발견! 그럼에도 불구하고,이 솔루션은 내가하고 싶은 일에 대해 완전히 만족스럽지 않습니다. 제공

def get_keys(rdd): 

    new_x = rdd.map(lambda item: (item[0], (item[1], item[2]))) 
    return new_x 

new_x = get_keys(x) 

: 다음

[('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', ('FR', '75001')), 
('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', ('TN', '8160'))] 

:

new_x.union(y).map(lambda (x, y): (x, [y])).reduceByKey(lambda p, q : p + q).collect() 

나는 "X"라는 내 RDD에 적용됩니다 내 키를 지정하기 위해 함수를 생성 결과 :

[('_guid_oX6Lu2xxHtA_T93sK6igyW5RaHH1tAsWcF0RpNx_kUQ=', ['JmJCFu3N']), 
('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', [('FR', '75001'), 'KlGZj08d']), 
('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', [('TN', '8160')]), 
('_guid_hG88Yt5EUsqT8a06Cy380ga3XHPwaFylNyuvvqDslCw=', ['KNPQLQth'])] 

내가 갖고 싶은 것은 :

[('_guid_oX6Lu2xxHtA_T93sK6igyW5RaHH1tAsWcF0RpNx_kUQ=', (None, None, 'JmJCFu3N')), 
('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', ('FR', '75001', 'KlGZj08d')), 
('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', ('TN', '8160', None)), 
('_guid_hG88Yt5EUsqT8a06Cy380ga3XHPwaFylNyuvvqDslCw=', (None, None, 'KNPQLQth'))] 

Help?

답변

2

왜 안 되니?

>>> new_x.fullOuterJoin(y) 

또는

>>> x.toDF().join(y.toDF(), ["_1"], "fullouter").rdd 
+0

LostInOverflow :.. x.toDF()에 가입 (y.toDF(), "_1", "fullouter")를 rdd.collect()이 제공 : 행 (_1 = u'_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE = ', _2 = u'FR', _3 = u'75001 ', _2 = u'KlGZj08d')]. new_x.fullOuterJoin (y)가 더 좋습니다. – DataAddicted

+0

[('_guid_oX6Lu2xxHtA_T93sK6igyW5RaHH1tAsWcF0RpNx_kUQ ='(없음 'JmJCFu3N')), ('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE =' >>> new_x.fullOuterJoin (Y) ======> (("FR" '75001') 'KlGZj08d')), ('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4 =' (('TN', '8160'), 없음)) ('_guid_hG88Yt5EUsqT8a06Cy380ga3XHPwaFylNyuvvqDslCw ='(없음 'KNPQLQth'))] – DataAddicted

+0

(('FR', '75001'), 'KlGZj08d')에 'FR'과 '75001'을 구분하는 방법이 있습니까? – DataAddicted