2017-05-17 3 views
0

3 개의 데이터 세트가 있는데, 합쳐진 데이터가 포함 된 CSV를 얻으려면이 데이터 세트에 가입하고 그룹화하려고합니다.CSV 출력에 hadoop 쪽 마루를 사용하여 빅 데이터 처리

데이터는 마루 파일로 하둡에 저장되며 Zeppelin을 사용하여 데이터 처리를 위해 Apache Spark + Scala를 실행합니다.

내 데이터 세트는 다음과 같다 :

user_actions.show(10) 
user_clicks.show(10) 
user_options.show(10) 

+--------------------+--------------------+ 
|     id|    keyword| 
+--------------------+--------------------+ 
|00000000000000000001|    aaaa1| 
|00000000000000000002|    aaaa1| 
|00000000000000000003|    aaaa2| 
|00000000000000000004|    aaaa2| 
|00000000000000000005|    aaaa0| 
|00000000000000000006|    aaaa4| 
|00000000000000000007|    aaaa1| 
|00000000000000000008|    aaaa2| 
|00000000000000000009|    aaaa1| 
|00000000000000000010|    aaaa1| 
+--------------------+--------------------+ 
+--------------------+-------------------+ 
|   search_id| selected_user_id| 
+--------------------+-------------------+ 
|00000000000000000001|    1234| 
|00000000000000000002|    1234| 
|00000000000000000003|    1234| 
|00000000000000000004|    1234| 
+--------------------+-------------------+ 

+--------------------+----------+----------+ 
|   search_id| user_id| position| 
+--------------------+----------+----------+ 
|00000000000000000001|  1230|   1| 
|00000000000000000001|  1234|   3| 
|00000000000000000001|  1232|   2| 
|00000000000000000002|  1231|   1| 
|00000000000000000002|  1232|   2| 
|00000000000000000002|  1233|   3| 
|00000000000000000002|  1234|   4| 
|00000000000000000003|  1234|   1| 
|00000000000000000004|  1230|   1| 
|00000000000000000004|  1234|   2| 
+--------------------+----------+----------+ 

무엇을 달성하려고하는 것은 내가 MySQL을 수입하고 PK로 USER_ID 한 그들을 필요로하기 때문에 각 사용자 ID에 대한 키워드와 JSON을 얻는 것입니다. JSON은 상자 밖으로없는 경우에, 나는 튜플 또는 문자열

user_id,keywords 
1234,"{\"aaaa1\":3.5,\"aaaa2\":0.5}" 

사용할 수 있습니다 :이 같은 작동하지만

val user_actions_data = user_actions 
           .join(user_options, user_options("search_id") === user_actions("id")) 

val user_actions_full_data = user_actions_data 
            .join(
              user_clicks, 
              user_clicks("search_id") === user_actions_data("search_id") && user_clicks("selected_user_id") === user_actions_data("user_id"), 
              "left_outer" 
             ) 

val user_actions_data_groupped = user_actions_full_data 
             .groupBy("user_id", "search") 
             .agg("search" -> "count", "selected_user_id" -> "count", "position" -> "avg") 


def udfScoreForUser = ((position: Double, searches: Long) => (position/searches)) 

val search_log_keywords = user_actions_data_groupped.rdd.map({row => row(0) -> (row(1) -> udfScoreForUser(row.getDouble(4), row.getLong(2)))}).groupByKey() 


val search_log_keywords_array = search_log_keywords.collect.map(r => (r._1.asInstanceOf[Long], r._2.mkString(", "))) 

val search_log_keywords_df = sc.parallelize(search_log_keywords_array).toDF("user_id","keywords") 
    .coalesce(1) 
    .write.format("csv") 
    .option("header", "true") 
    .mode("overwrite") 
    .save("hdfs:///Search_log_testing_keywords/") 

: 내가 지금까지했던 무엇

user_id,keywords 
1234,"(aaaa1,0.58333),(aaaa2,1.5)" 

작은 데이터 세트 및 출력 CSV 파일로 인해 예상되는 값은 다음과 같습니다.

200 + GB의 데이터를 실행할 때 문제가 있습니다.

저는 Spark & 스칼라에 상당히 익숙합니다.하지만 뭔가 빠졌다고 생각합니다. 그리고 나는 ddd를 rdd로 사용하지 않아야하고, 배열에 매핑하고, DF로 다시 병렬 처리하여 CSV로 내보낼 수 없습니다.

요약하면 모든 키워드에 점수를 적용하고 사용자 ID별로 그룹화하여 CSV에 저장하려고합니다. 내가 지금까지 한 것은 작은 데이터 세트로 작동하지만 200GB 이상의 데이터에 적용하면 아파치 스파크가 실패합니다.

답변

1

예, 뭔가를 디버깅하지 않는 한 Spark에서 collect에 의존하는 것은 일반적으로 잘못되었습니다. collect로 전화하면 모든 데이터가 드라이버에서 어레이로 수집되므로, 대부분의 빅 데이터 세트의 경우 이것은 옵션이 아닙니다. 운전자가 OOM을 던져 죽을 것입니다.

내가 이해할 수없는 것은 왜 처음에 수집하는 것입니까? 왜 단순히 분산 데이터 세트에 매핑하지 않습니까?

search_log_keywords 
    .map(r => (r._1.asInstanceOf[Long], r._2.mkString(", "))) 
    .toDF("user_id","keywords") 
    .coalesce(1) 
    .write.format("csv") 
    .option("header", "true") 
    .mode("overwrite") 
    .save("hdfs:///Search_log_testing_keywords/") 

그런 식으로 모든 것이 병렬로 수행됩니다.

dataframesrdds 사이를 전환하는 것과 관련하여 나는 지금 당장 그것에 대해 너무 걱정하지 않을 것입니다. 나는 커뮤니티가 대부분 dataframes을 사용하는 것을 주장하지만, Spark의 버전과 유스 케이스에 따라 rdds이 더 나은 선택 일 수 있습니다.

+0

오, 아니, 고쳐 준다고 생각합니다. 오류는 보이지 않지만 전체 데이터를 실행하는 데 시간이 걸립니다. 튜플 문자열 대신 JSON 결과를 얻을 수있는 방법에 대한 단서가 있습니까? – mvpasarel

+0

'write.json (your/target/path)'에 대해 어떻습니까? –

+0

그래. 참으로 도움이되었습니다. 'collect'는 나를 위해 일을 망쳤다. 그러나, 나는 그것을 완전히 실행할 수 없었다.이 오류가 있습니다 : '원인 : java.lang.RuntimeException : java.io.FileNotFoundException :/yarn/nm/usercache/hdfs/appcache/application_1493881537049_0724/blockmgr-933c832c-636a-4748-9103-eff656f2456c/25/shuffle_92_0_0 .index (그런 파일이나 디렉토리가 없음)' 나는 아직도 내 서버가 처리 할 수있는 것보다 많은 데이터를 가지고 실행한다고 생각한다. : – mvpasarel

0

HDFS의 주요 목표는 파일을 청크로 분할하여 중복 저장하는 것입니다. HDFS에 분할 된 데이터를 저장하는 것이 더 좋습니다. 단 하나의 큰 파일이 반드시 필요한 경우는 제외합니다.