1

플 링크에서 탄력적 인 제작자를 사용하고 싶지만 인증에 약간의 문제가 있습니다. Nginx를 신축성있는 검색 클러스터 앞에두고 기본 인증을 nginx에서 사용합니다. BasicAuth를 Flink에서 ElasticSearch Connector와 함께 사용하는 방법

그러나 탄성 검색 커넥터와 나는 (때문에 InetSocketAddress의) 내 URL에 기본 인증을 추가 할 수 없습니다

기본적인 인증과 elasticsearch 커넥터를 사용하는 아이디어 있었나요?

감사합니다.

내 코드가있다 :

val configur = new java.util.HashMap[String, String] 

    configur.put("cluster.name", "cluster") 

    configur.put("bulk.flush.max.actions", "1000") 

    val transportAddresses = new java.util.ArrayList[InetSocketAddress] 
    transportAddresses.add(new InetSocketAddress(InetAddress.getByName("cluster.com"), 9300)) 


    jsonOutput.filter(_.nonEmpty).addSink(new ElasticsearchSink(configur, 
                   transportAddresses, 
                   new ElasticsearchSinkFunction[String] { 
     def createIndexRequest(element: String): IndexRequest = { 

     val jsonMap = parse(element).values.asInstanceOf[java.util.HashMap[String, String]] 

     return Requests.indexRequest() 
      .index("flinkTest") 
      .source(jsonMap); 
     } 

     override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) { 
     indexer.add(createIndexRequest(element)) 
     } 
    })) 

답변

0

FLINK 포트 9300에 바이너리 프로토콜을 사용하여 연결하는 Elasticsearch 전송 클라이언트를 사용 귀하의 nginx 프록시 포트 9200

을에 HTTP 인터페이스의 앞에 앉아있다

Flink는 프록시를 사용하지 않으므로 인증을 제공 할 필요가 없습니다.

+0

의 "아마존 RS에 FLINK 연결"부분에 설명되어 나는 2 개의 다른 프로토콜 내 나쁜 죄송가 이해가 안 주셔서 감사합니다. 하지만 내 플립 크 클러스터와 ES 클러스터는 동일한 로컬 네트워크 (동일한 공급자가 아님)에 있지 않으므로 HTTP 클라이언트를 사용해야하지만이 공식 클라이언트 플 링크/탄력성 검색을 찾지 못하므로 – FlinkNoob

-1

Flink와 Elasticsearch를 연결하는 데 HTTP 클라이언트를 사용해야하는 경우 한 가지 해결책은 Jest Library입니다.

는이 기본 자바 클래스와 같은 사용자 정의 SinkFunction을 만들어야합니다 : 당신은 또한 더 빠른 속도에 대한 대량 작업을 사용할 수 있습니다

package fr.gfi.keenai.streaming.io.sinks.elasticsearch5; 

import org.apache.flink.configuration.Configuration; 
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; 

import io.searchbox.client.JestClient; 
import io.searchbox.client.JestClientFactory; 
import io.searchbox.client.config.HttpClientConfig; 
import io.searchbox.core.Index; 

public class ElasticsearchJestSinkFunction<T> extends RichSinkFunction<T> { 

    private static final long serialVersionUID = -7831614642918134232L; 

    private JestClient client; 

    @Override 
    public void invoke(T value) throws Exception { 

     String document = convertToJsonDocument(value); 

     Index index = new Index.Builder(document).index("YOUR_INDEX_NAME").type("YOUR_DOCUMENT_TYPE").build(); 
     client.execute(index); 

    } 

    @Override 
    public void open(Configuration parameters) throws Exception { 

     // Construct a new Jest client according to configuration via factory 
     JestClientFactory factory = new JestClientFactory(); 
     factory.setHttpClientConfig(new HttpClientConfig.Builder("http://localhost:9200") 
       .multiThreaded(true) 
       // Per default this implementation will create no more than 2 concurrent 
       // connections per given route 
       .defaultMaxTotalConnectionPerRoute(2) 
       // and no more 20 connections in total 
       .maxTotalConnection(20) 
       // Basic username and password authentication 
       .defaultCredentials("YOUR_USER", "YOUR_PASSWORD") 
       .build()); 
     client = factory.getObject(); 
    } 

    private String convertToJsonDocument(T value) { 
     //TODO 
     return "{}"; 
    } 

} 

참고.

FLINK에 대한 농담 구현의 exemple

post

+0

을 만들어야합니다. 기술을 광고 할 예정이라면 적어도 그 사용법을 보여주고 OP 문제를 해결하는 방법을 명확히해야합니다. –

+0

이 질문에 대한 답을 제공하지 않습니다. 충분한 [평판] (https://stackoverflow.com/help/whats-reputation)이 있으면 [모든 게시물에 주석 달기] (https://stackoverflow.com/help/privileges/comment) 할 수 있습니다. 대신, [질문자의 설명이 필요없는 답변을 제공하십시오] (https://meta.stackexchange.com/questions/214173/why-do-i-need-50-reputation-to-comment-what-can- i-do- 대신). - [검토 중] (리뷰/저품각 게시물/18108859) – Arman

+1

이 링크는 질문에 대한 답변 일지 모르지만 여기에 답변의 핵심 부분을 포함하고 참조 용 링크를 제공하는 것이 좋습니다. 링크 된 페이지가 변경되면 링크 전용 답변이 유효하지 않게 될 수 있습니다. - [From Review] (리뷰/저품절 게시물/18108859) –