2017-11-01 4 views
2

로컬 드라이브에 CSV 파일로 데이터 프레임을 저장하려고합니다. 그러나 그렇게 할 때 생성 된 폴더가 생성되고 그 파티션 내에 파일이 작성됩니다. 이것을 극복하기위한 제안이 있습니까?Spark에서 단일 (일반) csv 파일로 데이터를 쓰는 방법은 무엇입니까?

내 요구 사항 : 코드에 제공된 실제 이름을 가진 일반 csv 파일을 가져 오려면.

코드 조각 : dataframe.coalesce(1).write.mode("overwrite").format("com.databricks.spark.csv").option("header", "true").csv("E:/dataframe.csv")

답변

1

TL : DR 당신은 배포 enviornment에서 현재의 핵심 개념, 연속을 적용하려고합니다. 그것은 잘 끝낼 수 없습니다.

스파크는 이와 같은 유틸리티를 제공하지 않습니다. 반 분산 방식으로 하나를 만들려면 멀티 스텝, 소스 종속 프로토콜을 구현해야합니다.

  • 헤더를 작성합니다.
  • 각 파티션에 데이터 파일을 작성합니다.
  • 파일을 병합하고 새 이름을 지정하십시오.

제한된 응용 프로그램을 가지고 있기 때문에 작은 파일에만 유용하며 일부 소스 (객체 저장소와 같은)에서는 Spark에서 구현되는 것과 같이 매우 비쌉니다.

물론 데이터를 수집하고 표준 CSV 파서 (Univoicity, Apache Commons)를 사용하여 원하는 저장소에 저장할 수 있습니다. 이는 순차적이며 여러 데이터 전송이 필요합니다.

0

자동으로 수행 할 방법이 없습니다. 로컬 디렉토리가 모든 실행에 설치하는 경우에는 나는 두 가지 솔루션

  • 를 참조하십시오 디렉토리를 사용할 수없는 경우 원하는 이름
  • part-*csv 파일의 이름을 변경 또는 이동/다음처럼 파일을 작성하지만, 모든 집행에 :

그러나 두 솔루션이 종류의 병렬 처리와 불꽃의 따라서 목표를 파괴 일반 스칼라를 사용하여 파일을 생성 한 후 드라이버에 dataframe를 수집합니다.

0

이 불가능하지만,이 같은 일도 수행 할 수 있습니다

dataframe.coalesce(1).write.mode("overwrite").format("com.databricks.spark.csv").option("header", "true").csv("E:/data/") 

import org.apache.hadoop.fs._ 
val fs = FileSystem.get(sc.hadoopConfiguration) 
val filePath = "E:/data/" 
val fileName = fs.globStatus(new Path(filePath+"part*"))(0).getPath.getName 
fs.rename(new Path(filePath+fileName), new Path(filePath+"dataframe.csv"))