0

나는 카프카 스트림에서와 Elasticsearch 문서 upsert 읽고 파이썬에서 의사가 (카운터 view를 증가.Spark에서 ElasticSearch의 스크립트 문서를 업데이트하는 방법은 무엇입니까? 문서가 이미 존재하는 경우

for message in consumer: 

    msg = json.loads(message.value) 
    print(msg) 
    index = INDEX_NAME 
    es_id = msg["id"] 
    script = {"script":"ctx._source.view+=1","upsert" : msg} 
    es.update(index=index, doc_type="test", id=es_id, body=script) 

을 내가 분산 환경에서 사용하기 원하기 때문에, 내가 사용 KafkaStream로부터 판독 스칼라 스파크 구조화 스트리밍

df.writeStream \ 
.format("org.elasticsearch.spark.sql")\ 
.queryName("ESquery")\ 
.option("es.resource","credentials/url") \ 
.option("checkpointLocation", "checkpoint").start() 

또는 SparkStreaming :

// Initializing Spark Streaming Context and kafka stream 
sparkConf.setMaster("local[2]") 
val ssc = new StreamingContext(sparkConf, Seconds(10)) 
[...] 
val messages = KafkaUtils.createDirectStream[String, String](
     ssc, 
     PreferConsistent, 
     Subscribe[String, String](topicsSet, kafkaParams) 
    ) 

[...] 
val urls = messages.map(record => JsonParser.parse(record.value()).values.asInstanceOf[Map[String, Any]]) 
urls.saveToEs("credentials/credential") 

.saveToEs(...)elastic-hadoop.jar의 API이며 here입니다. 불행히도 this repo은 실제로 문서화되지 않았습니다. 그래서 나는 어디에서 스크립트 명령을 넣을 수 있는지 이해할 수 없다.

나를 도와 줄 사람이 있습니까? 미리 감사드립니다.

답변

1

쓰기 모드를 "업데이트"(또는 업서 트)로 설정하고 스크립트를 "스크립트"(ES 버전에 따라 다름)로 전달하여 수행 할 수 있어야합니다.

EsSpark.saveToEs(rdd, "spark/docs", Map("es.mapping.id" -> "id", "es.write.operation" -> "update","es.update.script.inline" -> "your script" ,)) 

은 아마 당신이 "upsert"사용하려는

같은 라이브러리에서 좋은 unit tests in cascading integration있다; 이 설정은 모두 동일한 작성자를 사용하기 때문에 좋을 것입니다.

ES 버전에 맞는 올바른 설정을 선택하기 위해 단위 테스트를 읽는 것이 좋습니다.