일부 데이터를 내 클러스터에 보내는 앱을 만들고 있습니다. 이 데이터는 Spark Streaming 응용 프로그램이 실행중인 특정 HDFS 폴더에 저장됩니다.Spark Structured Streaming은 ElasticSearch와 통합합니다.
이 streamApp에서는 저렴한 데이터 공유를 위해 &을 사용합니다. 이후 AngularApp에 데이터를 제공하기 위해 ElasticSearch에 결과를 인덱싱해야합니다.
다,
를 잘 작동하지만 ... 인덱스 ES 내 결과가 없습니다. 사실은 입력으로 일부 Dataframe 스트림을 사용하기 때문에 내가 RDD 내 결과 DataFrame를 tranform 수 없다 ...입니다
여기내 의사 코드 :
val schema = StructType(
StructField("id", StringType, nullable = false) ::
StructField("code", StringType, nullable = false) :: Nil)
val lines = spark.readStream
.format("json")
.schema(schema)
.load(HDFSPath)
// Do some basics stuff here
import spark.implicits._
val linesRDD = lines.rdd.map(row =>
StreamingObj(row(0).toString,row(1).toString)) // RDD[StreamingObj]
linesRDD.saveToEs("stream/stream") // ES
val linesDF= linesRDD.toDF()
val queryNode = linesDF
.writeStream
.format("console")
.outputMode(OutputMode.Append)
.trigger(Trigger.ProcessingTime(4.seconds))
.start
그리고 그것은 내가하려고 할 때 실패 내 DataFrame을 RDD로 변환하십시오.
데이터를 색인 할 수 있으려면 RDD로 변환해야합니다.
lines.rdd.map에서이 문제가 발생했습니다.
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
ES 내에서 DataStreaming 스파크를 인덱싱 할 수 있습니까?
도움 주셔서 감사합니다.
EDIT :
더 간단한 경우를보십시오
val lines = spark.readStream
.format("json")
.schema(schema)
.load(HDFSPATH).as[StreamingObj]
lines.writeStream
.format("org.elasticsearch.spark.sql")
.outputMode("append")
.start("index/stream")
17/12/21 15시 37분 55초 정보 util.Version : Elasticsearch 하둡 v5.4.2 [a478aabe9e]에서 예외 "주" java.lang.UnsupportedOperationException가 스레드 : 데이터 소스 org.elasticsearch.spark.sql는 스트리밍 쓰기를 지원하지 않습니다
Cmon ... 난으로 같은 일을 문서 =>
또는 예 https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html#spark-sql-streaming : https://discuss.elastic.co/t/spark-structured-streaming-sink-in-append-mode/105664/4
또는이 하나
다음https://discuss.elastic.co/t/structured-streaming-failed-to-find-data-source-es/112144
내 메이븐 의존성 :
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-20_2.11</artifactId>
<version>5.4.2</version>
</dependency>
좋은가요? 형식 ('es')을 사용할 수 없으며 찾을 수 없습니다.
EDIT 2 :
이 ES에 그 점화 구조화 스트리밍 만 보인다> 6.0
https://www.elastic.co/blog/structured-streaming-elasticsearch-for-hadoop-6-0
참조 :(
방금 편집했습니다. OP Post – GreGGus