
Confluent Kafka Connect: generating the Elasticsearch document ID

org.apache.kafka.connect.errors.DataException: STRUCT is not supported as the document id. 
    at io.confluent.connect.elasticsearch.DataConverter.convertKey(DataConverter.java:75) 
    at io.confluent.connect.elasticsearch.DataConverter.convertRecord(DataConverter.java:84) 
    at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:210) 
    at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:119) 
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:429) 
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250) 
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179) 
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148) 
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139) 
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
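The exception comes from the sink's key conversion: because the record key is not ignored, it is used as the Elasticsearch document _id, and only integer or string keys can be converted; a Struct key cannot. A minimal, hypothetical sketch of that kind of check (an illustration only, not the actual Confluent DataConverter source) is:

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;
    import org.apache.kafka.connect.data.Struct;
    import org.apache.kafka.connect.errors.DataException;

    public class DocumentIdCheckSketch {

        // Hypothetical stand-in for the sink's key-to-_id conversion:
        // only integer and string keys can become a document id.
        static String convertKey(Schema keySchema, Object key) {
            switch (keySchema.type()) {
                case INT8:
                case INT16:
                case INT32:
                case INT64:
                case STRING:
                    return String.valueOf(key);
                default:
                    // A key built by ValueToKey alone is a STRUCT, so it ends up here.
                    throw new DataException(keySchema.type().name()
                            + " is not supported as the document id.");
            }
        }

        public static void main(String[] args) {
            Schema structKeySchema = SchemaBuilder.struct()
                    .field("employee_master_id", Schema.INT32_SCHEMA)
                    .build();
            Struct structKey = new Struct(structKeySchema).put("employee_master_id", 1);
            // Throws DataException: STRUCT is not supported as the document id.
            convertKey(structKeySchema, structKey);
        }
    }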

My Kafka Connect JDBC configuration is as follows:

name=task-view-list-stage 
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector 
tasks.max=10 
connection.url=jdbc:postgresql://localhost:5432/postgres?user=postgres&password=test 
table.types=TABLE 
query=select * from employee_master 
mode=timestamp+incrementing 
incrementing.column.name=employee_master_id 
timestamp.column.name=modified_date 
validate.non.null=false 
topic.prefix=my-id-app 

And my Kafka Connect Elasticsearch configuration is as follows:

name=es-id-view 
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector 
tasks.max=1 
topics=my-id-app 
topics.key.ignore=false 
transforms=InsertKey 
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey 
transforms.InsertKey.fields=employee_master_id 
connection.url=http://localhost:9200 
type.name=type_id 

My table structure is as follows:

employee_master_id | emp_name | modified_date
-------------------+----------+------------------------------------
                 1 | Bala     | "2017-05-18 11:51:46.721182+05:30"
                 2 | murugan  | "2017-05-21 15:59:11.443901+05:30"

Please help me resolve this issue.


Convert the key from an object into a plain field: see -p4/discussion –

Answer


For anyone else who comes across this and needs help resolving it: ValueToKey alone is not enough; the following is needed as well. See the related explanation at https://groups.google.com/d/topic/confluent-platform/2jaRg-oT :

transforms=InsertKey,ExtractId 
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey 
transforms.InsertKey.fields=employee_master_id  
transforms.ExtractId.type=org.apache.kafka.connect.transforms.ExtractField$Key 
transforms.ExtractId.field=employee_master_id 
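To see why ExtractField$Key is needed in addition to ValueToKey, here is a minimal, self-contained Java sketch (assuming the Kafka Connect data and transforms APIs are on the classpath; the record layout mirrors an employee_master row from the question):

    import java.util.Collections;

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;
    import org.apache.kafka.connect.data.Struct;
    import org.apache.kafka.connect.source.SourceRecord;
    import org.apache.kafka.connect.transforms.ExtractField;
    import org.apache.kafka.connect.transforms.ValueToKey;

    public class SmtKeyDemo {
        public static void main(String[] args) {
            // Value roughly matching a row produced by the JDBC source connector.
            Schema valueSchema = SchemaBuilder.struct()
                    .field("employee_master_id", Schema.INT32_SCHEMA)
                    .field("emp_name", Schema.STRING_SCHEMA)
                    .build();
            Struct value = new Struct(valueSchema)
                    .put("employee_master_id", 1)
                    .put("emp_name", "Bala");
            SourceRecord record =
                    new SourceRecord(null, null, "my-id-app", valueSchema, value);

            ValueToKey<SourceRecord> insertKey = new ValueToKey<>();
            insertKey.configure(Collections.singletonMap("fields", "employee_master_id"));

            ExtractField.Key<SourceRecord> extractId = new ExtractField.Key<>();
            extractId.configure(Collections.singletonMap("field", "employee_master_id"));

            // After ValueToKey the key is still a Struct -> the sink throws the error above.
            SourceRecord afterInsertKey = insertKey.apply(record);
            System.out.println(afterInsertKey.key());   // Struct{employee_master_id=1}

            // ExtractField$Key unwraps the single field, leaving a primitive key
            // that the Elasticsearch sink can use as the document _id.
            SourceRecord afterExtract = extractId.apply(afterInsertKey);
            System.out.println(afterExtract.key());     // 1
        }
    }

With the full InsertKey,ExtractId chain above, each row is indexed with an _id equal to its employee_master_id instead of failing on a Struct key.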

I don't think the OP accepted this, but it helped me! –