2016-12-28 1 views
0

여기가 제 코드입니다. 나는 주제를 만들 수 있지만 어떤 이유로 인해 주제 안에 데이터를 보낼 수 없습니다. 오랜 시간이 지나면 이러한 오류가 발생합니다. 나는이에 대한이 server.log 파일입니다 카프카 버전 2.11-0.8.2.1카프카 제작자가 서버에 데이터를 보낼 수 없습니다.

[email protected]74c6c 
[email protected]6995df 

을 사용하고 카프카

다음
[2016-12-27 21:05:54,873] ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor) 
java.io.IOException: An established connection was aborted by the software in your host machine 
at sun.nio.ch.SocketDispatcher.read0(Native Method) 
at sun.nio.ch.SocketDispatcher.read(Unknown Source) 
at sun.nio.ch.IOUtil.readIntoNativeBuffer(Unknown Source) 
at sun.nio.ch.IOUtil.read(Unknown Source) 
at sun.nio.ch.SocketChannelImpl.read(Unknown Source) 
at kafka.utils.Utils$.read(Utils.scala:380) 
at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) 
at kafka.network.Processor.read(SocketServer.scala:444) 
at kafka.network.Processor.run(SocketServer.scala:340) 
at java.lang.Thread.run(Unknown Source) 
[2016-12-27 21:07:54,727] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor) 
[2016-12-27 21:16:08,559] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor) 

카프카 시스템으로 정수 번호를 보내 내 자바 코드는 다음과 같습니다

여기
Properties props = new Properties(); 
    props.put("bootstrap.servers", "localhost:9092"); 
    props.put("acks", "all"); 
    props.put("retries", 0); 
    props.put("batch.size", 16384); 
    props.put("linger.ms", 1); 
    props.put("buffer.memory", 33554432); 
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
    props.put("timeout.ms", "50"); 

    Producer<String, String> producer = new KafkaProducer<>(props); 
     for(int i = 0; i < 2; i++) 
      System.out.println(producer.send(new ProducerRecord<String, String>("testtopic", Integer.toString(i), 
        Integer.toString(i))).toString()); 

producer.close(); 

은 pom.xml 파일입니다

<dependencies> 
    <dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka-clients</artifactId> 
    <version>0.10.1.0</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.11</artifactId> 
    <version>0.8.2.1</version> 
</dependency> 
<dependency> 
    <groupId>org.slf4j</groupId> 
    <artifactId>slf4j-simple</artifactId> 
    <version>1.6.4</version> 
</dependency> 
<dependency> 
    <groupId>log4j</groupId> 
    <artifactId>log4j</artifactId> 
    <version>1.2.16</version> 
    <exclusions> 
    <exclusion> 
     <groupId>javax.jms</groupId> 
     <artifactId>jms</artifactId> 
    </exclusion> 
    </exclusions> 
</dependency> 
</dependencies> 
+0

시도는()에서 System.out.println없이 데이터를 전송하는; 무슨 일이 있었는지 알려주시겠습니까? – user4342532

답변

0

카프카 버전을 kafka_2.10-0.9.0.0으로 다운 그레이드했으며 다음 속성이 함께 작동합니다. 다음과 같이

Properties props = new Properties(); 
    props.put("metadata.broker.list", "localhost:9092"); 
    props.put("acks", "all"); 
    props.put("retries", 0); 
    props.put("batch.size", 16384); 
    props.put("linger.ms", 1); 
    props.put("buffer.memory", 33554432); 
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    props.put("serializer.class", "kafka.serializer.StringEncoder"); 
    ProducerConfig producerConfig = new ProducerConfig(props); 
    kafka.javaapi.producer.Producer<String, String> producer 
    = new kafka.javaapi.producer.Producer<String, String>(producerConfig); 

내 pom.xml 파일은 다음과 같습니다

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
<modelVersion>4.0.0</modelVersion> 
<groupId>TwitterKafkaPostgre</groupId> 
<artifactId>TwitterKafkaPostgre</artifactId> 
<version>0.0.1-SNAPSHOT</version> 
<dependencies> 
    <dependency> 
    <groupId>com.twitter</groupId> 
    <artifactId>hbc-core</artifactId> <!-- or hbc-twitter4j --> 
    <version>2.2.0</version> <!-- or whatever the latest version is --> 
    </dependency> 
    <dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka-clients</artifactId> 
    <version>0.9.0.0</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.11</artifactId> 
    <version>0.9.0.0</version> 
</dependency> 
<dependency> 
    <groupId>log4j</groupId> 
    <artifactId>log4j</artifactId> 
    <version>1.2.16</version> 
    <exclusions> 
    <exclusion> 
     <groupId>javax.jms</groupId> 
     <artifactId>jms</artifactId> 
    </exclusion> 
    </exclusions> 
</dependency> 
<dependency> 
    <groupId>org.slf4j</groupId> 
    <artifactId>slf4j-simple</artifactId> 
    <version>1.6.4</version> 
</dependency> 
<dependency> 
    <groupId>com.google.guava</groupId> 
    <artifactId>guava</artifactId> 
    <version>18.0</version> 
</dependency> 

0

것도 기본적으로 카프카의 5 분입니다 기본 폴링 간격보다 더해야

props.put("timeout.ms", "50"); 

요청 제한 시간을 제외하고 눈에 띄는 없습니다. 그래서 그것이 작동해야합니다 (그냥 5 분 이상) 기본값으로두면 같아요.

+0

그 줄을 주석 처리하려고했지만 여전히 같은 문제가 있습니다. pom.xml에 문제가있을 수 있습니까? 다음은 내가 사용하는 종속성입니다. 나는 kafka_2.11-0.8.2.1 버전을 사용하고 있습니다. –