2014-02-26 10 views
8

Kafka-0.8 Log4j appender를 실행하려고하는데 할 수 없습니다. 내 응용 프로그램이 log4j appender를 통해 kafka로 직접 로그를 보내길 원합니다.Kafka 0.8 Log4j appender 사용 방법

여기 내 log4j.properties입니다. 적절한 인코더를 찾을 수 없었으므로 기본 인코더를 사용하도록 구성했습니다. (예 : 댓글을 달았습니다.)

log4j.rootLogger=INFO, stdout, KAFKA 

log4j.appender.stdout=org.apache.log4j.ConsoleAppender 
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout 
log4j.appender.stdout.layout.ConversionPattern=%5p [%t] (%F:%L) - %m%n 

log4j.appender.KAFKA=kafka.producer.KafkaLog4jAppender 
log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout 
log4j.appender.KAFKA.layout.ConversionPattern=%-5p: %c - %m%n 
log4j.appender.KAFKA.BrokerList=hnode01:9092 
log4j.appender.KAFKA.Topic=DKTestEvent 

#log4j.appender.KAFKA.SerializerClass=kafka.log4j.AppenderStringEncoder 

그리고 내 샘플 응용 프로그램입니다.

import org.apache.log4j.Logger; 
import org.apache.log4j.BasicConfigurator; 
import org.apache.log4j.PropertyConfigurator; 

public class HelloWorld { 

     static Logger logger = Logger.getLogger(HelloWorld.class.getName()); 

     public static void main(String[] args) { 
      PropertyConfigurator.configure(args[0]); 

      logger.info("Entering application."); 
      logger.debug("Debugging!."); 
      logger.info("Exiting application."); 
     } 
} 

필자는 컴파일 할 때 maven을 사용했습니다. 는 내 pom.xml 파일

에 kafka_2.8.2-0.8.0 및 log4j_1.2.17을 포함 그리고 나는이 오류가 무엇입니까 :

INFO [main] (Logging.scala:67) - Verifying properties 
INFO [main] (Logging.scala:67) - Property metadata.broker.list is overridden to hnode01:9092 
INFO [main] (Logging.scala:67) - Property serializer.class is overridden to kafka.serializer.StringEncoder 
INFO [main] (HelloWorld.java:14) - Entering application. 
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 0 for 1 topic(s) Set(DKTestEvent) 
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 1 for 1 topic(s) Set(DKTestEvent) 
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 2 for 1 topic(s) Set(DKTestEvent) 
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 3 for 1 topic(s) Set(DKTestEvent) 
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 4 for 1 topic(s) Set(DKTestEvent) 
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 5 for 1 topic(s) Set(DKTestEvent) 
. 
. 
. 
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 60 for 1 topic(s) Set(DKTestEvent) 
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 61 for 1 topic(s) Set(DKTestEvent) 
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 62 for 1 topic(s) Set(DKTestEvent) 
INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 63 for 1 topic(s) Set(DKTestEvent) 
INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 64 for 1 topic(s) Set(DKTestEvent) 
INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 65 for 1 topic(s) Set(DKTestEvent) 
INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 66 for 1 topic(s) Set(DKTestEvent) 
INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 67 for 1 topic(s) Set(DKTestEvent) 
. 
. 
. 
INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 534 for 1 topic(s) Set(DKTestEvent) 
ERROR [main] (Logging.scala:67) - 
ERROR [main] (Logging.scala:67) - 
ERROR [main] (Logging.scala:67) - 
ERROR [main] (Logging.scala:67) - 
ERROR [main] (Logging.scala:67) - 
ERROR [main] (Logging.scala:67) - 
java.lang.StackOverflowError 
    at java.lang.ClassLoader.defineClass1(Native Method) 
    at java.lang.ClassLoader.defineClass(ClassLoader.java:643) 
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) 
at java.net.URLClassLoader.defineClass(URLClassLoader.java:277) 
at java.net.URLClassLoader.access$000(URLClassLoader.java:73) 
at java.net.URLClassLoader$1.run(URLClassLoader.java:212) 
at java.security.AccessController.doPrivileged(Native Method) 
at java.net.URLClassLoader.findClass(URLClassLoader.java:205) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:323) 
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:268) 
at java.lang.ClassLoader.defineClass1(Native Method) 
at java.lang.ClassLoader.defineClass(ClassLoader.java:643) 
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) 
at java.net.URLClassLoader.defineClass(URLClassLoader.java:277) 
at java.net.URLClassLoader.access$000(URLClassLoader.java:73) 
at java.net.URLClassLoader$1.run(URLClassLoader.java:212) 
at java.security.AccessController.doPrivileged(Native Method) 
at java.net.URLClassLoader.findClass(URLClassLoader.java:205) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:323) 
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:268) 
at org.apache.log4j.spi.ThrowableInformation.getThrowableStrRep(ThrowableInformation.java:87) 
at org.apache.log4j.spi.LoggingEvent.getThrowableStrRep(LoggingEvent.java:413) 
at org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:313) 
at org.apache.log4j.WriterAppender.append(WriterAppender.java:162) 
at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251) 
at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66) 
at org.apache.log4j.Category.callAppenders(Category.java:206) 
at org.apache.log4j.Category.forcedLog(Category.java:391) 
at org.apache.log4j.Category.error(Category.java:322) 
at kafka.utils.Logging$$anonfun$swallowError$1.apply(Logging.scala:105) 
at kafka.utils.Logging$$anonfun$swallowError$1.apply(Logging.scala:105) 
at kafka.utils.Utils$.swallow(Utils.scala:189) 
at kafka.utils.Logging$class.swallowError(Logging.scala:105) 
at kafka.utils.Utils$.swallowError(Utils.scala:46) 
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67) 
at kafka.producer.Producer.send(Producer.scala:76) 
at kafka.producer.KafkaLog4jAppender.append(KafkaLog4jAppender.scala:96) 
at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251) 
at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66) 
at org.apache.log4j.Category.callAppenders(Category.java:206) 
at org.apache.log4j.Category.forcedLog(Category.java:391) 
at org.apache.log4j.Category.info(Category.java:666) 
at kafka.utils.Logging$class.info(Logging.scala:67) 
at kafka.client.ClientUtils$.info(ClientUtils.scala:31) 
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:51) 
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) 
at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67) 
at kafka.utils.Utils$.swallow(Utils.scala:187) 
at kafka.utils.Logging$class.swallowError(Logging.scala:105) 
at kafka.utils.Utils$.swallowError(Utils.scala:46) 
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67) 
at kafka.producer.Producer.send(Producer.scala:76) 
at kafka.producer.KafkaLog4jAppender.append(KafkaLog4jAppender.scala:96) 
at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251) 
at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66) 
. 
. 
. 

나는 내가 종료 돈`t 지속적 경우 오류 위에 얻고을 프로그램.

내가 뭔가를 놓친 경우 친절하게 알려주십시오. 이와 같이 상기 펜더 비동기를 설정

답변

1

보십시오 log4j.appender.KAFKA.ProducerType =

비동기은 카프카 제조자 자체 로그인 때문에 그것은 무한 루프에가는 것이 타당 ..

5

Jonas가 Kafka 프로듀서 로깅이 카프카 어 펜더에 기록되어 결국 무한 루프와 스택 오버플로가 발생하는 문제를 확인했다고 생각합니다. (말장난 없음) 모든 카프카 로그를 다른 어 펜더로 구성 할 수 있습니다 . 출력을 전송을 보여줍니다 표준 출력 :

log4j.logger.kafka=INFO, stdout 

그래서 당신은 내가 카프카 0.8.2.2에서 log4j를 통해 이벤트를 생성 할 수있게되었습니다

log4j.rootLogger=INFO, stdout, KAFKA 
log4j.logger.kafka=INFO, stdout 
log4j.logger.HelloWorld=INFO, KAFKA 
2

당신의 log4j.properties에 다음과 같은 결과가 발생해야한다 . 여기

<?xml version="1.0" encoding="UTF-8" ?> 
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd"> 

<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/"> 

    <appender name="console" class="org.apache.log4j.ConsoleAppender"> 
     <param name="Target" value="System.out" /> 
     <layout class="org.apache.log4j.PatternLayout"> 
     <param name="ConversionPattern" value="%-5p %c{1} - %m%n" /> 
     </layout> 
    </appender> 
    <appender name="fileAppender" class="org.apache.log4j.RollingFileAppender"> 
     <param name="Threshold" value="INFO" /> 
     <param name="MaxBackupIndex" value="100" /> 
     <param name="File" value="/tmp/agna-LogFile.log" /> 
     <layout class="org.apache.log4j.PatternLayout"> 
     <param name="ConversionPattern" value="%d %-5p [%c{1}] %m %n" /> 
     </layout> 
    </appender> 
    <appender name="kafkaAppender" class="kafka.producer.KafkaLog4jAppender"> 
     <param name="Topic" value="kafkatopic" /> 
     <param name="BrokerList" value="localhost:9092" /> 
     <param name="syncSend" value="true" /> 
     <layout class="org.apache.log4j.PatternLayout"> 
     <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L %% - %m%n" /> 
     </layout> 
    </appender> 
    <logger name="org.apache.kafka"> 
     <level value="error" /> 
     <appender-ref ref="console" /> 
    </logger> 
    <logger name="com.example.kafkaLogger"> 
     <level value="debug" /> 
     <appender-ref ref="kafkaAppender" /> 
    </logger> 
    <root> 
     <priority value="debug" /> 
     <appender-ref ref="console" /> 
    </root> 
</log4j:configuration> 

소스 코드입니다 :

https://github.com/ypant/kafka-json-producer.git

:

package com.example; 

import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

import org.json.simple.JSONArray; 
import org.json.simple.JSONObject; 
import java.util.Properties; 
import java.util.concurrent.ExecutionException; 
import org.apache.kafka.clients.producer.ProducerConfig; 
import org.apache.kafka.clients.producer.KafkaProducer; 

import org.apache.kafka.clients.producer.ProducerRecord; 
import org.apache.kafka.common.serialization.StringSerializer; 

public class JsonProducer { 
    static Logger defaultLogger = LoggerFactory.getLogger(JsonProducer.class); 
    static Logger kafkaLogger = LoggerFactory.getLogger("com.example.kafkaLogger"); 

    public static void main(String args[]) { 

     JsonProducer obj = new JsonProducer(); 

     String str = obj.getJsonObjAsString(); 

     // Use the logger 
     kafkaLogger.info(str); 

     try { 
      // Construct and send message 
      obj.constructAndSendMessage(); 
     } catch (InterruptedException e) { 
      defaultLogger.error("Caught interrupted exception " + e); 
     } catch (ExecutionException e) { 
      defaultLogger.error("Caught execution exception " + e); 
     } 
    } 

    private String getJsonObjAsString() { 
     JSONObject obj = new JSONObject(); 
     obj.put("name", "John"); 
     obj.put("age", new Integer(55)); 
     obj.put("address", "123 MainSt, Palatine, IL"); 

     JSONArray list = new JSONArray(); 
     list.add("msg 1"); 
     list.add("msg 2"); 
     list.add("msg 3"); 

     obj.put("messages", list); 

     return obj.toJSONString(); 
    } 

    private void constructAndSendMessage() throws InterruptedException, ExecutionException { 
     Properties props = new Properties(); 
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 

     KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props); 

     boolean sync = false; 
     String topic = "kafkatopic"; 
     String key = "mykey"; 
     String value = "myvalue1 mayvalue2 myvalue3"; 
     ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, key, value); 
     if (sync) { 
      producer.send(producerRecord).get(); 
     } else { 
      producer.send(producerRecord); 
     } 
     producer.close(); 
    } 
} 

전체 프로젝트는 다음 링크를 아래에서 볼 수 있습니다 여기 내 log4j 구성입니다