0
각 날짜의 최신 데이터 프레임이 있습니다. 매일 새 qte와 새 ca를 이전 날짜에 추가하고 날짜를 업데이트해야합니다. 그래서 이미 존재하는 것을 업데이트하고 새로운 것을 추가해야합니다. 여기Spark : 조인 작업을 기반으로 Dataframe 업데이트
내가 마지막에하고 싶은 것을 예 :
val histocaisse = spark.read
.format("csv")
.option("header", "true") //reading the headers
.load("C:/Users/MHT/Desktop/histocaisse_dte1.csv")
val hist = histocaisse
.withColumn("pos_id", 'pos_id.cast(LongType))
.withColumn("article_id", 'pos_id.cast(LongType))
.withColumn("date", 'date.cast(DateType))
.withColumn("qte", 'qte.cast(DoubleType))
.withColumn("ca", 'ca.cast(DoubleType))
val histocaisse2 = spark.read
.format("csv")
.option("header", "true") //reading the headers
.load("C:/Users/MHT/Desktop/histocaisse_dte2.csv")
val hist2 = histocaisse2.withColumn("pos_id", 'pos_id.cast(LongType))
.withColumn("article_id", 'pos_id.cast(LongType))
.withColumn("date", 'date.cast(DateType))
.withColumn("qte", 'qte.cast(DoubleType))
.withColumn("ca", 'ca.cast(DoubleType))
hist2.show(false)
+------+----------+----------+----+----+
|pos_id|article_id|date |qte |ca |
+------+----------+----------+----+----+
|1 |1 |2000-01-07|2.5 |3.5 |
|2 |2 |2000-01-07|14.7|12.0|
|3 |3 |2000-01-07|3.5 |1.2 |
+------+----------+----------+----+----+
+------+----------+----------+----+----+
|pos_id|article_id|date |qte |ca |
+------+----------+----------+----+----+
|1 |1 |2000-01-08|2.5 |3.5 |
|2 |2 |2000-01-08|14.7|12.0|
|3 |3 |2000-01-08|3.5 |1.2 |
|4 |4 |2000-01-08|3.5 |1.2 |
|5 |5 |2000-01-08|14.5|1.2 |
|6 |6 |2000-01-08|2.0 |1.25|
+------+----------+----------+----+----+
+------+----------+----------+----+----+
|pos_id|article_id|date |qte |ca |
+------+----------+----------+----+----+
|1 |1 |2000-01-08|5.0 |7.0 |
|2 |2 |2000-01-08|39.4|24.0|
|3 |3 |2000-01-08|7.0 |2.4 |
|4 |4 |2000-01-08|3.5 |1.2 |
|5 |5 |2000-01-08|14.5|1.2 |
|6 |6 |2000-01-08|2.0 |1.25|
+------+----------+----------+----+----+
내가이
val df = hist2.join(hist1, Seq("article_id", "pos_id"), "left")
.select($"pos_id", $"article_id",
coalesce(hist2("date"), hist1("date")).alias("date"),
(coalesce(hist2("qte"), lit(0)) + coalesce(hist1("qte"), lit(0))).alias("qte"),
(coalesce(hist2("ca"), lit(0)) + coalesce(hist1("ca"), lit(0))).alias("ca"))
.orderBy("pos_id", "article_id")
// df.show()
|pos_id|article_id| date| qte| ca|
+------+----------+----------+----+----+
| 1| 1|2000-01-08| 5.0| 7.0|
| 2| 2|2000-01-08|29.4|24.0|
| 3| 3|2000-01-08| 7.0| 2.4|
| 4| 4|2000-01-08| 3.5| 1.2|
| 5| 5|2000-01-08|14.5| 1.2|
| 6| 6|2000-01-08| 2.0|1.25|
+------+----------+----------+----+----+
목표를했다 이렇게하려면이 존재하고 추가 할 경우 정보를 정기적으로 업데이트하는 것입니다 새로운 것들.하지만 내가 사건을 시도하면 빈은 다음과 같은 문제가 발생했습니다
Exception in thread "main" java.lang.UnsupportedOperationException: empty collection
at org.apache.spark.rdd.RDD$$anonfun$first$1.apply(RDD.scala:1321)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
취해야 할 조치 첫 번째 테이블이 비어있는 경우조차도 고려하십시오.
'histocaisse_dte1.csv'에 헤더 행이 있습니까? – MaxU