0

내가 FLINK 1.1.2 사용하고 어떠한 Elasticsearch 노드에 연결되지 않고 메이븐에 ElesticSearch 의존성을 추가 한 내가 homebrew를 사용하여 ElesticSearch를 설치 한 후클라이언트는 FLINK

,691 아래와 같이이 elesticsearch 명령을 사용하기 시작했습니다

public class ReadFromKafka { 


    public static void main(String[] args) throws Exception { 
     // create execution environment 
     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 

     Properties properties = new Properties(); 
     properties.setProperty("bootstrap.servers", "localhost:9092"); 
     properties.setProperty("zookeeper.connect", "localhost:2181"); 
     properties.setProperty("group.id", "test"); 


     DataStream<JoinedStreamEvent> message = env.addSource(new FlinkKafkaConsumer09<JoinedStreamEvent>("test", 
       new JoinSchema(), properties)); 

     System.out.println("reading form kafka "); 

     message.print(); 


     Map<String, String> config = new HashMap<>(); 
     config.put("bulk.flush.max.actions", "1"); // flush inserts after every event 
     config.put("cluster.name", "elasticsearch_amar"); // default cluster name 

     List<InetSocketAddress> transports = new ArrayList<>(); 
// set default connection details 
     transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300)); 

     message.addSink(new ElasticsearchSink<>(config,transports,new ElasticInserter())); 

     env.execute(); 


    } //main 

    public static class ElasticInserter implements ElasticsearchSinkFunction<JoinedStreamEvent>{ 


     @Override 
     public void process(JoinedStreamEvent record, RuntimeContext runtimeContext, RequestIndexer requestIndexer) { 

      Map<String, Integer> json = new HashMap<>(); 

      json.put("Time", record.getPatient_id()); 
      json.put("heart Rate ", record.getHeartRate()); 
      json.put("resp rete", record.getRespirationRate()); 


      IndexRequest rqst = Requests.indexRequest() 
        .index("nyc-places")   // index name 
        .type("popular-locations")  // mapping name 
        .source(json); 

      requestIndexer.add(rqst); 

     } //process 


    } //ElasticInserter 

} //ReadFromKafka 

탄성 검색 내 프로그램을 시작할 때

enter image description here

그러나, 나는 50 아래에있는 내 평판은 말씀 드릴 수 없습니다 오류 enter image description here

답변

0

다음 얻었다. ES가 있는지 여부를

  • 먼저 확인, 이 Can't Connect to Elasticsearch (through Curl)를 참조하십시오

    나는 제안의 비트.

  • 도커 컨테이너를 사용하여 ES를 시작하는 것이 좋습니다. 고정 표시기 실행 -d --name ES -p 9200 : 9200 elasticsearch : 2 -Des.network.host BTW = 0.0.0.0
  • , 당신은 시도 할 수 있습니다 :
: ES의 설정 elasticsearch.yml0.0.0.0es.network.host 값을 수정