2017-09-28 4 views
0

Kafka 브로커가 포트 9093으로 SSL을 수신 할 때 내 코드 하나가 예외를 throw합니다. 일반 텍스트 수신기에서 코드 스 니펫이 정상적으로 작동합니다.SSL/TLS Listener에서 실행중인 Kafka에서 BlockingChannel을 읽을 때 EOF 예외 받기

어떤 아이디어가 잘못 되었습니까? ??

 public KafkaMetadataHelper(String kafkaConnect) throws Exception { 
    // use lowlevel kafka.api to query consumer group metadata (ie max committed offset) 
    String[] hostAndPort = kafkaConnect.split(":"); 
    String host = hostAndPort[0]; 
    int port = Integer.parseInt(hostAndPort[1]); 
    channel = new BlockingChannel(host, port, 
            BlockingChannel.UseDefaultBufferSize(), 
            BlockingChannel.UseDefaultBufferSize(), 
            10000); 
    channel.connect(); 
    GroupCoordinatorRequest request = new GroupCoordinatorRequest(MY_GROUP, 
                    GroupCoordinatorRequest.CurrentVersion(), 
                    correlationId++, 
                    MY_CLIENTID); 
    channel.send(request); 
    GroupCoordinatorResponse metadataResponse = null; 
    try { 
     metadataResponse = GroupCoordinatorResponse.readFrom(channel.receive().payload());// This is where the exception is thrown 

    } catch (Exception e) { 
     e.printStackTrace(); 
    } 

}

내가 무엇입니까 오류 메시지는 이것이다.

java.io.EOFException 
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:103) 
    at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:131) 
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:122) 

답변

0

TLS를 통해 연결하려면 클라이언트에 몇 가지 설정이 필요합니다! BlockingChannel은 호출자가 설정을 지정할 수 없습니다.

ConsumerGroupCommand.scala [1]을보고 소비자 그룹에 대한 세부 정보를 검색하는 방법은 AdminClient [2]을 사용하는 것이 좋습니다. https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala#L496

  1. https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/AdminClient.scala#L197
+0

덕분에 나는이 공부하고 내 입력을 공유합니다 ... –