2016-11-09 3 views
0

여기 스트리밍 디렉토리에서 데이터를 스트리밍하고 출력 위치에 기록합니다. 또한 hdfs 파일을 입력 폴더에서 스트리밍 디렉터리로 이동하는 프로세스를 구현하려고합니다. 이 이동은 스트리밍 컨텍스트가 시작되기 전에 한 번 발생합니다. 그러나이 작업은 Dstream의 각 Batch에 대해 매번 실행되도록하고 싶습니다. 그게 가능한가?내 스파크 스트리밍 응용 프로그램 내에서 파일을 이동하는 방법

val streamed_rdd = ssc.fileStream[LongWritable, Text, TextInputFormat](streaming_directory, (t:Path)=> true , true).map { case (x, y) => (y.toString) } 
    streamed_rdd.foreachRDD(rdd => { 
     rdd.map(x =>x.split("\t")).map(x => x(3)).foreachPartition { partitionOfRecords => 
     val connection: Connection = connectionFactory.createConnection() 
     connection.setClientID("Email_send_module_client_id") 
     println("connection started with active mq") 
     val session: Session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE) 
     println("created session") 
     val dest = session.createQueue("dwEmailsQueue2") 
     println("destination queue name = dwEmailsQueue2") 
     val prod_queue = session.createProducer(dest) 
     connection.start() 
     partitionOfRecords.foreach { record => 
      val rec_to_send: TextMessage = session.createTextMessage(record) 
      println("started creating a text message") 
      prod_queue.send(rec_to_send) 
      println("sent the record") 
     } 
     connection.close() 
     } 
    } 
    ) 
    **val LIST = scala.collection.mutable.MutableList[String]() 
    val files_to_move = scala.collection.mutable.MutableList[String]() 
    val cmd = "hdfs dfs -ls -d "+load_directory+"/*" 
    println(cmd) 
    val system_time = System.currentTimeMillis 
    println(system_time) 
    val output = cmd.!! 
    output.split("\n").foreach(x => x.split(" ").foreach(x => if (x.startsWith("/user/hdpprod/")) LIST += x)) 
    LIST.foreach(x => if (x.toString.split("/").last.split("_").last.toLong < system_time) files_to_move += x) 
    println("files to move" +files_to_move) 
    var mv_cmd :String = "hdfs dfs -mv " 
    for (file <- files_to_move){ 
     mv_cmd += file+" " 
    } 
    mv_cmd += streaming_directory 
    println(mv_cmd) 
    val mv_output = mv_cmd.!! 
    println("moved the data to the folder")** 
    if (streamed_rdd.count().toString == "0") { 
     println("no data in the streamed list") 
    } else { 
     println("saving the Dstream at "+System.currentTimeMillis()) 
     streamed_rdd.transform(rdd => {rdd.map(x => (check_time_to_send+"\t"+check_time_to_send_utc+"\t"+x))}).saveAsTextFiles("/user/hdpprod/temp/spark_streaming_output_sent/sent") 
    } 
    ssc.start() 
    ssc.awaitTermination() 
    } 
} 

답변

0

다음과 같이 자바 구현시 동일한 작업을 시도했습니다. foreachPartion on rdd에서이 메서드를 호출 할 수 있습니다.

public static void moveFiles(final String moveFilePath, 
      final JavaRDD rdd) { 
      for (final Partition partition : rdd.partitions()) { 
       final UnionPartition unionPartition = (UnionPartition) partition; 
       final NewHadoopPartition newHadoopPartition = (NewHadoopPartition) 
        unionPartition.parentPartition(); 
       final String fPath = newHadoopPartition.serializableHadoopSplit() 
        .value().toString(); 
       final String[] filespaths = fPath.split(":"); 

       if ((filespaths != null) && (filespaths.length > 0)) { 
        for (final String filepath : filespaths) { 
         if ((filepath != null) && filepath.contains("/")) { 
          final File file = new File(filepath); 

          if (file.exists() && file.isFile()) { 
           try { 
            File destFile = new File(moveFilePath + "/" + 
              file.getName()); 

            if (destFile.exists()) { 
             destFile = new File(moveFilePath + "/" + 
               file.getName() + "_"); 
            } 

            java.nio.file.Files.move((file 
              .toPath()), destFile.toPath(), 
             StandardCopyOption.REPLACE_EXISTING); 


           } catch (Exception e) { 
            logger.error(
             "Exception while moving file", 
             e); 
           } 
          } 
         } 
        } 
       } 
      } 

     }