2016-06-30 5 views
3

나는 사용자 로그 파일에 대한 탐색 데이터 분석을 위해 Spark을 사용하고 있습니다. 내가하고있는 분석 중 하나는 호스트 당 일일 평균 요청입니다. 따라서 평균을 계산하려면 DataFrame의 총 요청 열을 DataFrame의 고유 한 요청 열로 나누어야합니다.다른 데이터 프레임의 두 열을 나누기

total_req_per_day_df = logs_df.select('host',dayofmonth('time').alias('day')).groupby('day').count() 

avg_daily_req_per_host_df = total_req_per_day_df.select("day",(total_req_per_day_df["count"]/daily_hosts_df["count"]).alias("count")) 

이것은 평균을 결정하기 위해 PySpark를 사용하여 작성한 것입니다. daily_hosts_df 및 logs_df이 메모리에 캐시 : 그리고 여기에 내가

AnalysisException: u'resolved attribute(s) count#1993L missing from day#3628,count#3629L in operator !Project [day#3628,(cast(count#3629L as double)/cast(count#1993L as double)) AS count#3630]; 

주를 얻을 오류의 로그입니다. 두 데이터 프레임의 카운트 열을 어떻게 나누나요?

+1

누가 다운 투표 한 사람에게 : 만약 당신이 상관 없으면 이유를 적어주세요. 왜냐하면 나는 그 질문의 중복을 보지 못했기 때문이다. 그리고 비록 그것이 어리석은 실수이거나 적어도 나를 인도 할지라도, 나는 무엇을 놓치고 있습니까? – StackPointer

답변

15

다른 표에서 열을 참조 할 수 없습니다.

from pyspark.sql.functions import col 

(total_req_per_day_df.alias("total") 
    .join(daily_hosts_df.alias("host"), ["day"]) 
    .select(col("day"), (col("total.count")/col("host.count")).alias("count"))) 
+0

여기에 많이 바뀔 수있는 것은 없습니다. 윈도우 함수를 실험해볼 수는 있지만 큰 영향을 미칠 것입니다. – zero323

0

이 열 하루에 두 개의 데이터 프레임에 참여하고 카운트 열 하루 비율을 선택하면 데이터를 결합하려면 먼저이 비슷한을 사용 join해야합니다.

total_req_per_day_df = logs_df.select(dayofmonth('time') 
             .alias('day') 
            ).groupBy('day').count() 

avg_daily_req_per_host_df = (
    total_req_per_day_df.join(daily_hosts_df, 
          total_req_per_day_df.day == daily_hosts_df.day 
          ) 
    .select(daily_hosts_df['day'], 
      (total_req_per_day_df['count']/daily_hosts_df['count']) 
      .alias('avg_reqs_per_host_per_day') 
     ) 
    .cache() 
) 
1

edX 스파크 코스 지정에서 질문입니다. 해결책이 공개 된 이래로 나는 다른, 더 느린 것을 공유하고 그 성능이 향상 될 수 있는지 또는 완전히 스파크가 아닌지 묻는 기회를 갖습니다.

daily_hosts_list = (daily_hosts_df.map(lambda r: (r[0], r[1])).take(30)) 
days_with_hosts, hosts = zip(*daily_hosts_list) 
requests = (total_req_per_day_df.map(lambda r: (r[1])).take(30)) 
average_requests = [(days_with_hosts[n], float(l)) for n, l in enumerate(list(np.array(requests, dtype=float)/np.array(hosts)))] 
avg_daily_req_per_host_df = sqlContext.createDataFrame(average_requests, ('day', 'avg_reqs_per_host_per_day')) 
+1

과제의 핵심은 sparkSQL을 사용하는 것이었고 귀하의 솔루션은이를 피했습니다. 여기에 제시된 해결책은 약간의 데이터 양은 충분하지만 500GB의 데이터로이 작업을 수행했다면 상상할 수 있습니다. 이 경우 솔루션이 실패합니다. 또한이 솔루션은 스파크의 요점 인 노드 클러스터 전반에 걸쳐 병렬이되지 않습니다. 그래서 더 나은 해결책을보기 위해 내가 받아 들인 대답과 내가 그 대답에 대해 한 의견을 살펴보십시오. SparkSQL에서는 2 가지 다른 데이터 프레임을 나눌 수 없으므로 이것이 가장 효율적인 솔루션입니다. – StackPointer

0

해결책은 0323 대답을 기준으로하지만 OUTER 조인으로 올바르게 작동합니다.

avg_daily_req_per_host_df = (
    total_req_per_day_df.join(
     daily_hosts_df, daily_hosts_df['day'] == total_req_per_day_df['day'], 'outer' 
).select(
     total_req_per_day_df['day'], 
     (total_req_per_day_df['count']/daily_hosts_df['count']).alias('avg_reqs_per_host_per_day') 
) 
).cache() 

'외부'매개 변수가 없으면 데이터 프레임 중 하나에서 날짜가 누락 된 데이터가 손실되었습니다. 두 데이터 프레임에 같은 날짜가 포함되어 있기 때문에 PySpark Lab2 작업에는 중요하지 않습니다. 하지만 다른 작업에 약간의 통증을 만들 수 있습니다 :)

+0

당신이 계산을 현명하다고 말한 것에 대해서 궁금해 하셨을뿐입니다. 주어진 가치가없는 하루가 있다고 가정하십시오. 어떻게 그 평균을 계산할 것입니까? 기술적으로 Lab2에서는 dataframe1에 특정 날짜에 대한 데이터가 없으면 dataframe2에 데이터가 없으므로 중요하지 않습니다. 신중하게 생각하면 dataframe1에 데이터가 없습니다. 그래서 우리는 여기서 외부 조인을 필요로하지 않지만 팁을 주셔서 감사합니다. 앞으로도 명심하십시오. – StackPointer

+0

또한 데이터를 산술 연산 할 때 'null'/ 'NaN'값을 제거해야한다고 생각합니다. 따라서 외부 조인은 외부 조인과 같은 상황에서 프로그램을 중단시키는 대신 다시 고정시킵니다. 'NaN'데이터를 제거/대체하는 방법을 살펴볼 수 있습니다. 몇 가지 기술을 제거하거나 랜덤 포레스트를 사용하여 채우는 기술은 거의 없습니다. 그러나 더 자세히 읽으시기 바랍니다. 'Nan'/ null 값을 처리하지 않고 여기에서 외부 조인을 사용했습니다. 프로그램에 심각한 버그가 있습니다. 그러나 실험실 2에서는 괜찮습니다. – StackPointer

+0

의견을 보내 주셔서 감사합니다. 이것은 NaN/zero가있는 실제 버그입니다. –