2017-11-22 10 views
0
//download file csv 
ByteArrayOutputStream downloadedFile = downloadFile(); 

//save file in temp folder csv (
java.io.File tmpCsvFile = save(downloadedFile); 

//reading 
Dataset<Row> ds = session 
     .read() 
     .option("header", "true") 
     .csv(tmpCsvFile.getAbsolutePath()) 

tmpCsvFile는 다음 경로에 저장 로 사용AWS EMR 스파크 - CSV를 얻을 그리고 SparkSql API를

을/mnt/실/usercache/하둡/앱 캐시/application_1511379756333_0001/container_1511379756333_0001_02_000001/tmp를/1OkYaovxMsmR7iPoPnb8mx45MWvwr6k1y9xIdh8g7K0Q3118887242212394029 읽기에 .CSV

예외 :

org.apache.spark.sql.AnalysisException : 경로가 존재하지 않습니다 HDFS를 : //ip-33-33-33-33.ec2.internal : 8020을/mnt/실/usercache/하둡/앱 캐시 /application_1511379756333_0001/container_1511379756333_0001_02_000001/tmp/1OKYaovxMsmR7iPoPnb8mx45MWvwr6k1y9xIdh8g7K0Q3118887242212394029.csv;

내가 생각하기에 문제는 파일이 로컬에 저장되어 있고 spark-sql API를 통해 읽으려고하면 파일을 찾을 수 없다는 것입니다. sparkContext.addFile()을 사용하여 이미 시도했지만 작동하지 않습니다.

모든 솔루션?

감사합니다.

+1

파일을/mnt/실/usercache/하둡/앱 캐시/application_1511379756333_0001/.. seemns 로컬 파일을합니다. file : /// 옵션을 사용하여 읽기를 시도 했습니까? 이 데이터 세트를 시도하십시오 ds = session.read(). option ("header", "true") .csv (file : ///tmpCsvFile.getAbsolutePath()) –

답변

2

스파크는 읽기 및 쓰기 용으로 파일 시스템을 많이 지원합니다.

  • 지역/일반 (파일 : //)
  • S3 (S3 : //)
  • HDFS (HDFS : //) 표준 행동으로

에는 URI가 지정되지 않은 경우 spark-sql은 hdfs : // driver_address : port/path를 사용합니다.

경로에 file : ///을 추가하는 해결책은 클라이언트 모드 (내 경우 (클러스터))에서만 작동 할 수 있습니다. 드라이버가 파일 읽기 작업을 생성하면 파일이없는 노드 중 하나에 실행 프로그램으로 전달됩니다.

우리가 할 수있는 것? Hadoop에 파일을 작성하십시오.

Configuration conf = new Configuration(); 
    ByteArrayOutputStream downloadedFile = downloadFile(); 
    //convert outputstream in inputstream 
    InputStream is=Functions.FROM_BAOS_TO_IS.apply(fileOutputStream); 
    String myfile="miofile.csv"; 
    //acquiring the filesystem 
    FileSystem fs = FileSystem.get(URI.create(dest),conf); 
    //openoutputstream to hadoop 
    OutputStream outf = fs.create(new Path(dest)); 
    //write file 
    IOUtils.copyBytes(tmpIS, outf, 4096, true); 
    //commit the read task 
    Dataset<Row> ds = session 
    .read() 
    .option("header", "true") 
    .csv(myfile) 

덕분에, 더 나은 솔루션을 환영합니다