3 개의 데이터 세트가 있는데, 합쳐진 데이터가 포함 된 CSV를 얻으려면이 데이터 세트에 가입하고 그룹화하려고합니다.CSV 출력에 hadoop 쪽 마루를 사용하여 빅 데이터 처리
데이터는 마루 파일로 하둡에 저장되며 Zeppelin을 사용하여 데이터 처리를 위해 Apache Spark + Scala를 실행합니다.
내 데이터 세트는 다음과 같다 :
user_actions.show(10)
user_clicks.show(10)
user_options.show(10)
+--------------------+--------------------+
| id| keyword|
+--------------------+--------------------+
|00000000000000000001| aaaa1|
|00000000000000000002| aaaa1|
|00000000000000000003| aaaa2|
|00000000000000000004| aaaa2|
|00000000000000000005| aaaa0|
|00000000000000000006| aaaa4|
|00000000000000000007| aaaa1|
|00000000000000000008| aaaa2|
|00000000000000000009| aaaa1|
|00000000000000000010| aaaa1|
+--------------------+--------------------+
+--------------------+-------------------+
| search_id| selected_user_id|
+--------------------+-------------------+
|00000000000000000001| 1234|
|00000000000000000002| 1234|
|00000000000000000003| 1234|
|00000000000000000004| 1234|
+--------------------+-------------------+
+--------------------+----------+----------+
| search_id| user_id| position|
+--------------------+----------+----------+
|00000000000000000001| 1230| 1|
|00000000000000000001| 1234| 3|
|00000000000000000001| 1232| 2|
|00000000000000000002| 1231| 1|
|00000000000000000002| 1232| 2|
|00000000000000000002| 1233| 3|
|00000000000000000002| 1234| 4|
|00000000000000000003| 1234| 1|
|00000000000000000004| 1230| 1|
|00000000000000000004| 1234| 2|
+--------------------+----------+----------+
무엇을 달성하려고하는 것은 내가 MySQL을 수입하고 PK로 USER_ID 한 그들을 필요로하기 때문에 각 사용자 ID에 대한 키워드와 JSON을 얻는 것입니다. JSON은 상자 밖으로없는 경우에, 나는 튜플 또는 문자열
user_id,keywords
1234,"{\"aaaa1\":3.5,\"aaaa2\":0.5}"
사용할 수 있습니다 :이 같은 작동하지만
val user_actions_data = user_actions
.join(user_options, user_options("search_id") === user_actions("id"))
val user_actions_full_data = user_actions_data
.join(
user_clicks,
user_clicks("search_id") === user_actions_data("search_id") && user_clicks("selected_user_id") === user_actions_data("user_id"),
"left_outer"
)
val user_actions_data_groupped = user_actions_full_data
.groupBy("user_id", "search")
.agg("search" -> "count", "selected_user_id" -> "count", "position" -> "avg")
def udfScoreForUser = ((position: Double, searches: Long) => (position/searches))
val search_log_keywords = user_actions_data_groupped.rdd.map({row => row(0) -> (row(1) -> udfScoreForUser(row.getDouble(4), row.getLong(2)))}).groupByKey()
val search_log_keywords_array = search_log_keywords.collect.map(r => (r._1.asInstanceOf[Long], r._2.mkString(", ")))
val search_log_keywords_df = sc.parallelize(search_log_keywords_array).toDF("user_id","keywords")
.coalesce(1)
.write.format("csv")
.option("header", "true")
.mode("overwrite")
.save("hdfs:///Search_log_testing_keywords/")
: 내가 지금까지했던 무엇
user_id,keywords
1234,"(aaaa1,0.58333),(aaaa2,1.5)"
작은 데이터 세트 및 출력 CSV 파일로 인해 예상되는 값은 다음과 같습니다.
200 + GB의 데이터를 실행할 때 문제가 있습니다.
저는 Spark & 스칼라에 상당히 익숙합니다.하지만 뭔가 빠졌다고 생각합니다. 그리고 나는 ddd를 rdd로 사용하지 않아야하고, 배열에 매핑하고, DF로 다시 병렬 처리하여 CSV로 내보낼 수 없습니다.
요약하면 모든 키워드에 점수를 적용하고 사용자 ID별로 그룹화하여 CSV에 저장하려고합니다. 내가 지금까지 한 것은 작은 데이터 세트로 작동하지만 200GB 이상의 데이터에 적용하면 아파치 스파크가 실패합니다.
오, 아니, 고쳐 준다고 생각합니다. 오류는 보이지 않지만 전체 데이터를 실행하는 데 시간이 걸립니다. 튜플 문자열 대신 JSON 결과를 얻을 수있는 방법에 대한 단서가 있습니까? – mvpasarel
'write.json (your/target/path)'에 대해 어떻습니까? –
그래. 참으로 도움이되었습니다. 'collect'는 나를 위해 일을 망쳤다. 그러나, 나는 그것을 완전히 실행할 수 없었다.이 오류가 있습니다 : '원인 : java.lang.RuntimeException : java.io.FileNotFoundException :/yarn/nm/usercache/hdfs/appcache/application_1493881537049_0724/blockmgr-933c832c-636a-4748-9103-eff656f2456c/25/shuffle_92_0_0 .index (그런 파일이나 디렉토리가 없음)' 나는 아직도 내 서버가 처리 할 수있는 것보다 많은 데이터를 가지고 실행한다고 생각한다. : – mvpasarel