2017-10-13 9 views
0

텍스트 파일을 읽고 날짜를 기준으로 평균을 계산하고 요약을 MySQL 데이터베이스에 저장하는 작은 시나리오가 있습니다.데이터 집합 데이터가 MySQL 데이터베이스에 삽입 된 후 업데이트됩니다.

다음은 그런 저장 명령 실행 STEP 1

+----------+------------------+-----+-----+ 
|  date|    flo| hz|count| 
+----------+------------------+-----+-----+ 
|2017-10-05|52.887049194476745|10.27| 5.0| 
|2017-10-04| 55.4188048943416|10.27| 5.0| 
|2017-10-03| 54.1529270444092|10.27| 10.0| 
+----------+------------------+-----+-----+ 

결과 다음 repo_sum의 dataframe 평균을 계산 한 후 코드

val repo_sum = joined_data.map(SensorReport.generateReport) 
      repo_sum.show() --- STEP 1 
      repo_sum.write.mode(SaveMode.Overwrite).jdbc(url, "sensor_report", prop) 
      repo_sum.show() --- STEP 2 

되어 있고 2 단계에서의 데이터 세트 값

이고
+----------+-----------------+------------------+-----+ 
|  date|    flo|    hz|count| 
+----------+-----------------+------------------+-----+ 
|2017-10-05|52.88704919447673|31.578524597238367| 10.0| 
|2017-10-04| 55.4188048943416| 32.84440244717079| 10.0| 
+----------+-----------------+------------------+-----+ 

다음은 코드

입니다.
class StreamRead extends Serializable { 
    org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this); 
    def main(args: Array[String]) { 
    val conf = new SparkConf().setAppName("Application").setMaster("local[2]") 
    val ssc = new StreamingContext(conf, Seconds(2)) 
    val sqlContext = new SQLContext(ssc.sparkContext) 
    import sqlContext.implicits._ 
    val sensorDStream = ssc.textFileStream("file:///C:/Users/M1026352/Desktop/Spark/StreamData").map(Sensor.parseSensor) 
    val url = "jdbc:mysql://localhost:3306/streamdata" 
    val prop = new java.util.Properties 
    prop.setProperty("user", "root") 
    prop.setProperty("password", "root") 
    val tweets = sensorDStream.foreachRDD { 
     rdd => 
     if (rdd.count() != 0) { 
      val databaseVal = sqlContext.read.jdbc("jdbc:mysql://localhost:3306/streamdata", "sensor_report", prop) 
      val rdd_group = rdd.groupBy { x => x.date } 
      val repo_data = rdd_group.map { x => 
      val sum_flo = x._2.map { x => x.flo }.reduce(_ + _) 
      val sum_hz = x._2.map { x => x.hz }.reduce(_ + _) 
      val sum_flo_count = x._2.size 
      print(sum_flo_count) 
      SensorReport(x._1, sum_flo, sum_hz, sum_flo_count) 
      } 
      val df = repo_data.toDF() 
      val joined_data = df.join(databaseVal, Seq("date"), "fullouter") 
      joined_data.show() 
      val repo_sum = joined_data.map(SensorReport.generateReport) 
      repo_sum.show() 
      repo_sum.write.mode(SaveMode.Overwrite).jdbc(url, "sensor_report", prop) 
      repo_sum.show() 
     } 
    } 

    ssc.start() 
    WorkerAndTaskExample.main(args) 
    ssc.awaitTermination() 
    } 
    case class Sensor(resid: String, date: String, time: String, hz: Double, disp: Double, flo: Double, sedPPM: Double, psi: Double, chlPPM: Double) 

    object Sensor extends Serializable { 
    def parseSensor(str: String): Sensor = { 
     val p = str.split(",") 
     Sensor(p(0), p(1), p(2), p(3).toDouble, p(4).toDouble, p(5).toDouble, p(6).toDouble, p(7).toDouble, p(8).toDouble) 
    } 
    } 
    case class SensorReport(date: String, flo: Double, hz: Double, count: Double) 
    object SensorReport extends Serializable { 
    def generateReport(row: Row): SensorReport = { 
     print(row) 
     if (row.get(4) == null) { 
     SensorReport(row.getString(0), row.getDouble(1)/row.getDouble(3), row.getDouble(2)/row.getDouble(3), row.getDouble(3)) 
     } else if (row.get(2) == null) { 
     SensorReport(row.getString(0), row.getDouble(4), row.getDouble(5), row.getDouble(6)) 
     } else { 
     val count = row.getDouble(3) + row.getDouble(6) 
     val flow_avg_update = (row.getDouble(6) * row.getDouble(4) + row.getDouble(1))/count 
     val flow_flo_update = (row.getDouble(6) * row.getDouble(5) + row.getDouble(1))/count 
     print(count + " : " + flow_avg_update + " : " + flow_flo_update) 
     SensorReport(row.getString(0), flow_avg_update, flow_flo_update, count) 
     } 
    } 
    } 

지금까지 나는 모든 프로세스가 다시 실행되는 스파크에서 저장 명령이 실행될 때 이해합니다. 내 이해가 정확하다는 사실을 알려주십시오.

+0

시도 : 두 가지 작업이 예기치 않은 동작의 원인이되는 dataframe에서 수행되기 때문에, dataframe를 캐싱하는 문제를 해결할 것이 경우


, 여전히 발생합니다 :'val repo_sum = joined_data.map (SensorReport.generateReport) .cache()'. – Shaido

+0

예 val repo_sum = joined_data.map (SensorReport.generateReport) .cache()가 잘 작동합니다. –

+0

질문에 대한 완전한 대답을 추가했습니다. – Shaido

답변

1

모든 변환이 지연되면 action이 호출 될 때까지 아무런 반응이 없습니다. 동시에 동일한 RDD 또는 데이터 프레임에서 여러 동작이 호출되는 경우 모든 계산이 여러 번 수행된다는 것을 의미합니다. 여기에는 데이터 및 모든 변환로드가 포함됩니다.

이 문제를 방지하려면 cache() 또는 persist()을 사용하십시오 (cache()은 다른 유형의 저장소를 지정할 수 있습니다. 기본값은 RAM 메모리 만 해당). cache()은 처음 동작이 사용 된 후 RDD/데이터 프레임을 메모리에 유지합니다. 따라서 같은 변환을 여러 번 실행하지 않아도됩니다. 은`RDD`를 캐시하고 있는지 확인하기 위해

val repo_sum = joined_data.map(SensorReport.generateReport).cache()