2017-12-21 8 views
1

일부 데이터를 내 클러스터에 보내는 앱을 만들고 있습니다. 이 데이터는 Spark Streaming 응용 프로그램이 실행중인 특정 HDFS 폴더에 저장됩니다.Spark Structured Streaming은 ElasticSearch와 통합합니다.

이 streamApp에서는 저렴한 데이터 공유를 위해 &을 사용합니다. 이후 AngularApp에 데이터를 제공하기 위해 ElasticSearch에 결과를 인덱싱해야합니다.

다,

를 잘 작동하지만 ... 인덱스 ES 내 결과가 없습니다. 사실은 입력으로 일부 Dataframe 스트림을 사용하기 때문에 내가 RDD 내 결과 DataFrame를 tranform 수 없다 ...입니다

여기

내 의사 코드 :

val schema = StructType(
    StructField("id", StringType, nullable = false) :: 
    StructField("code", StringType, nullable = false) :: Nil) 


val lines = spark.readStream 
    .format("json") 
    .schema(schema) 
    .load(HDFSPath) 

// Do some basics stuff here 

import spark.implicits._ 

val linesRDD = lines.rdd.map(row => 
     StreamingObj(row(0).toString,row(1).toString)) // RDD[StreamingObj] 
linesRDD.saveToEs("stream/stream")   // ES 
val linesDF= linesRDD.toDF() 

val queryNode = linesDF 
    .writeStream 
    .format("console") 
    .outputMode(OutputMode.Append) 
    .trigger(Trigger.ProcessingTime(4.seconds)) 
    .start 

그리고 그것은 내가하려고 할 때 실패 내 DataFrame을 RDD로 변환하십시오.

데이터를 색인 할 수 있으려면 RDD로 변환해야합니다.

lines.rdd.map에서이 문제가 발생했습니다.

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();; 

ES 내에서 DataStreaming 스파크를 인덱싱 할 수 있습니까?

도움 주셔서 감사합니다.

EDIT :

더 간단한 경우를보십시오

val lines = spark.readStream 
     .format("json") 
     .schema(schema) 
     .load(HDFSPATH).as[StreamingObj] 

    lines.writeStream 
     .format("org.elasticsearch.spark.sql") 
     .outputMode("append") 
     .start("index/stream") 

17/12/21 15시 37분 55초 정보 util.Version : Elasticsearch 하둡 v5.4.2 [a478aabe9e]에서 예외 "주" java.lang.UnsupportedOperationException가 스레드 : 데이터 소스 org.elasticsearch.spark.sql는 스트리밍 쓰기를 지원하지 않습니다

Cmon ... 난으로 같은 일을 문서 =>

또는 예 https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html#spark-sql-streaming : https://discuss.elastic.co/t/spark-structured-streaming-sink-in-append-mode/105664/4

또는이 하나

다음

https://discuss.elastic.co/t/structured-streaming-failed-to-find-data-source-es/112144

내 메이븐 의존성 :

<dependency> 
    <groupId>org.elasticsearch</groupId> 
    <artifactId>elasticsearch-spark-20_2.11</artifactId> 
    <version>5.4.2</version> 
</dependency> 

좋은가요? 형식 ('es')을 사용할 수 없으며 찾을 수 없습니다.

EDIT 2 :

이 ES에 그 점화 구조화 스트리밍 만 보인다> 6.0

https://www.elastic.co/blog/structured-streaming-elasticsearch-for-hadoop-6-0

참조 :(

답변

2

을 문제는 RDD

val linesRDD = lines.rdd.map(row => 
     StreamingObj(row(0).toString,row(1).toString)) // RDD[StreamingObj] 

변환이 구조화 된 스트리밍 쿼리에서 허용되지 않습니다 여기에 직접 작성하려고 할 수 있습니다 :.

lines 
    .writeStream 
    .format("org.elasticsearch.spark.sql") 
    ... 

또는 ForeachWriter :

lines.writeStream.foreach(...) 
+0

방금 ​​편집했습니다. OP Post – GreGGus