나는 카프카 스트림에서와 Elasticsearch 문서 upsert 읽고 파이썬에서 의사가 (카운터 view
를 증가.Spark에서 ElasticSearch의 스크립트 문서를 업데이트하는 방법은 무엇입니까? 문서가 이미 존재하는 경우
for message in consumer:
msg = json.loads(message.value)
print(msg)
index = INDEX_NAME
es_id = msg["id"]
script = {"script":"ctx._source.view+=1","upsert" : msg}
es.update(index=index, doc_type="test", id=es_id, body=script)
을 내가 분산 환경에서 사용하기 원하기 때문에, 내가 사용 KafkaStream로부터 판독 스칼라 스파크 구조화 스트리밍
df.writeStream \
.format("org.elasticsearch.spark.sql")\
.queryName("ESquery")\
.option("es.resource","credentials/url") \
.option("checkpointLocation", "checkpoint").start()
또는 SparkStreaming :
// Initializing Spark Streaming Context and kafka stream
sparkConf.setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(10))
[...]
val messages = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topicsSet, kafkaParams)
)
[...]
val urls = messages.map(record => JsonParser.parse(record.value()).values.asInstanceOf[Map[String, Any]])
urls.saveToEs("credentials/credential")
.saveToEs(...)
은 elastic-hadoop.jar
의 API이며 here입니다. 불행히도 this repo은 실제로 문서화되지 않았습니다. 그래서 나는 어디에서 스크립트 명령을 넣을 수 있는지 이해할 수 없다.
나를 도와 줄 사람이 있습니까? 미리 감사드립니다.