2017-04-12 4 views
0

다음은 제작자와 소비자를위한 클래스입니다. 언제 데이터를 생성 할 수 있으며 다음 코드로 데이터를 소비 할 수 없습니다. 한번도 나를 도울 수 있습니까? 코딩에 잘못이 있었습니까? 제 목표는 소비자로부터 CustomMessage 개체를 읽고 DB에 데이터를 저장하는 것입니다.kafka에서 생산자에서 소비자로 물체를 소비하는 방법은 무엇입니까?

내 cmd 프롬프트에서 zookeeper 용 인스턴스 1, kafka 용 1, 생산자 용 1 및 소비자 용 1을 열었습니다. 나는 정말로 이해하지 못한다. 제작자 클래스와 소비자 클래스를 실행할 때 모든 인스턴스를 유지해야합니까?

모든 포인터가 정말 도움이 될 것입니다.

미리 감사드립니다.

producer class::: 

    package com.kafka.test.demo; 

    import java.io.IOException; 
    import java.util.Properties; 

    import javax.xml.parsers.ParserConfigurationException; 

    import org.apache.kafka.clients.producer.KafkaProducer; 
    import org.apache.kafka.clients.producer.ProducerRecord; 
    import org.xml.sax.SAXException; 

    public class KafkaaProducer { 
     public static void main(String[] args) throws ParserConfigurationException, SAXException, IOException { 
      Properties props = new Properties(); 
//customMessage is a pojo object which should be send to the consumer.. 
      CustomMessage customMessage= new CustomMessage(); 
      customMessage.setMessage("hello kafka"); 
      customMessage.setFan("1234213123"); 
      customMessage.setSourceSystem("Dmap"); 
      customMessage.setStatus("Unenrolled"); 
      customMessage.setMessageTyep("Simple Message"); 
      customMessage.setCreatedTime("5"); 
      customMessage.setProcessedTime("6"); 
      customMessage.setRetryCount("3"); 
      props.put("metadata.broker.list", "localhost:9092"); 
      props.put("serializer.class", "kafka.serializer.StringEncoder"); 
      props.put("request.required.acks", "1"); 
      props.put("bootstrap.servers", "localhost:9092,localhost:9093"); 
      //CustomMessageSerializer 
props.put("key.serializer","com.kafka.test.demo.CustomMessageSerializer"); 
      props.put("value.serializer", "com.kafka.test.demo.CustomMessageSerializer"); 
      try { 
       KafkaProducer<String, CustomMessage> producer = new KafkaProducer<String, CustomMessage>(props); 
       producer.send(new ProducerRecord<String, CustomMessage>("NewMessageTopic", "customMessage",customMessage)); 
       //producer.send(new ProducerRecord<String, CustomMessage>("NewMessageTopic", customMessage)); 
       System.out.println("Message " + "" + " sent !!"); 
      } catch (Exception e) { 
       e.printStackTrace(); 
      } 
     } 
    } 

    consumer class:: 
    package com.kafka.test.demo; 

    import java.net.UnknownHostException; 
    import java.util.Collections; 
    import java.util.Properties; 

    import org.apache.kafka.clients.consumer.ConsumerRecord; 
    import org.apache.kafka.clients.consumer.ConsumerRecords; 
    import org.apache.kafka.clients.consumer.KafkaConsumer; 

    import com.mongodb.BasicDBObject; 
    import com.mongodb.DB; 
    import com.mongodb.DBCollection; 
    import com.mongodb.DBObject; 
    import com.mongodb.MongoClient; 

    public class KafkaaConsumer { 
     public static void main(String[] args) throws InterruptedException { 
      Properties props = new Properties(); 
      props.put("zookeeper.connect", "localhost:2181"); 
      props.put("group.id", "testgroup"); 
      props.put("zookeeper.session.timeout.ms", "4000"); 
      props.put("zookeeper.sync.time.ms", "300"); 
      props.put("rebalance.backoff.ms", "40000"); 
      props.put("bootstrap.servers", "localhost:9092,localhost:9093"); 
      props.put("value.deserializer", "com.kafka.test.demo.CustomMessageDeserializer"); 
      props.put("key.deserializer", "com.kafka.test.demo.CustomMessageDeserializer"); 
      //perisitMessage(); 
      try{ 
       KafkaConsumer<String,CustomMessage> consumer = new KafkaConsumer<String, CustomMessage>(props); 
       consumer.subscribe(Collections.singletonList("NewMessageTopic")); 
       while (true) { 
        ConsumerRecords<String, CustomMessage> messages = consumer.poll(100); 
        for (ConsumerRecord<String, CustomMessage> message : messages) { 
         System.out.println("Message received " + message); 
        } 
        perisitMessage(); 
       } 
      } catch (Exception e) { 
       e.printStackTrace(); 
      } 
     } 

     private static void perisitMessage() { 
      // TODO Auto-generated method stub 
      CustomMessage customMessage = new CustomMessage(); 
      customMessage.setMessage("hello kafka"); 
      customMessage.setFan("1234213123"); 
      customMessage.setSourceSystem("Dmap"); 
      customMessage.setStatus("Unenrolled"); 
      customMessage.setMessageTyep("Simple Message"); 
      customMessage.setCreatedTime("5"); 
      customMessage.setProcessedTime("6"); 
      customMessage.setRetryCount("3"); 
      try { 
       MongoClient mongoClient = new MongoClient("localhost" , 27017); 
       DB db = mongoClient.getDB("DeviceTrack"); 
       DBCollection msgCollection = db.getCollection("messages"); 
       BasicDBObject document = new BasicDBObject(); 
       document.put("message", customMessage.getMessage()); 
       document.put("fan", customMessage.getFan()); 
       document.put("SourceSystem", customMessage.getSourceSystem()); 
       document.put("RetryCount", customMessage.getRetryCount()); 
       document.put("ProcessedTime", customMessage.getProcessedTime()); 
       document.put("CreatedTime", customMessage.getCreatedTime()); 
       document.put("MessageTyep", customMessage.getMessageTyep()); 
       document.put("Status", customMessage.getStatus()); 
       msgCollection.insert(document); 
       System.out.println("Inserted in the data in DB succesfully"); 

      } catch (UnknownHostException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
     } 
    } 

package com.kafka.test.demo; 

import java.util.Map; 

import org.apache.kafka.common.serialization.Deserializer; 

import com.fasterxml.jackson.databind.ObjectMapper; 

public class CustomMessageDeserializer implements Deserializer { 

    public Object deserialize(String arg0, byte[] arg1) { 
     ObjectMapper mapper = new ObjectMapper(); 
     System.out.println("arg1"+arg1); 
     CustomMessage message = null; 
     try { 
      message = mapper.readValue(arg1, CustomMessage.class); 
     } catch (Exception e) { 

      e.printStackTrace(); 
     } 
     System.out.println(""+message); 
     return message; 
    } 



    public void close() { 
     // TODO Auto-generated method stub 

    } 

    public void configure(Map arg0, boolean arg1) { 
     // TODO Auto-generated method stub 

    } 

} 

    package com.kafka.test.demo; 

    import java.util.Map; 

    import org.apache.kafka.common.serialization.Serializer; 

    import com.fasterxml.jackson.databind.ObjectMapper; 

    public class CustomMessageSerializer implements Serializer { 

     public byte[] serialize(String arg0, Object arg1) { 
      byte[] retVal = null; 
      ObjectMapper objectMapper = new ObjectMapper(); 
      try { 
       retVal = objectMapper.writeValueAsString(arg1).getBytes(); 
      } catch (Exception e) { 
       e.printStackTrace(); 
      } 
      System.out.println("value ::::::"+retVal); 
      return retVal; 
     } 

     public void close() { 
      // TODO Auto-generated method stub 

     } 

     public void configure(Map arg0, boolean arg1) { 
      // TODO Auto-generated method stub 

     } 
    } 

package com.kafka.test.demo; 

public class CustomMessage { 

    private String messageId; 
    private String parentMsgId; 
    private String fan; 
    private String message; 
    private String sourceSystem; 
    private String status; 
    private String messageTyep; 
    private String createdTime; 
    private String processedTime; 
    private String retryCount; 

    /** 
    * @return the messageId 
    */ 
    public String getMessageId() { 
     return messageId; 
    } 
    /** 
    * @param messageId the messageId to set 
    */ 
    public void setMessageId(String messageId) { 
     this.messageId = messageId; 
    } 
    /** 
    * @return the parentMsgId 
    */ 
    public String getParentMsgId() { 
     return parentMsgId; 
    } 
    /** 
    * @param parentMsgId the parentMsgId to set 
    */ 
    public void setParentMsgId(String parentMsgId) { 
     this.parentMsgId = parentMsgId; 
    } 
    /** 
    * @return the fan 
    */ 
    public String getFan() { 
     return fan; 
    } 
    /** 
    * @param fan the fan to set 
    */ 
    public void setFan(String fan) { 
     this.fan = fan; 
    } 
    /** 
    * @return the message 
    */ 
    public String getMessage() { 
     return message; 
    } 
    /** 
    * @param message the message to set 
    */ 
    public void setMessage(String message) { 
     this.message = message; 
    } 
    /** 
    * @return the sourceSystem 
    */ 
    public String getSourceSystem() { 
     return sourceSystem; 
    } 
    /** 
    * @param sourceSystem the sourceSystem to set 
    */ 
    public void setSourceSystem(String sourceSystem) { 
     this.sourceSystem = sourceSystem; 
    } 
    /** 
    * @return the status 
    */ 
    public String getStatus() { 
     return status; 
    } 
    /** 
    * @param status the status to set 
    */ 
    public void setStatus(String status) { 
     this.status = status; 
    } 
    /** 
    * @return the messageTyep 
    */ 
    public String getMessageTyep() { 
     return messageTyep; 
    } 
    /** 
    * @param messageTyep the messageTyep to set 
    */ 
    public void setMessageTyep(String messageTyep) { 
     this.messageTyep = messageTyep; 
    } 
    /** 
    * @return the createdTime 
    */ 
    public String getCreatedTime() { 
     return createdTime; 
    } 
    /** 
    * @param createdTime the createdTime to set 
    */ 
    public void setCreatedTime(String createdTime) { 
     this.createdTime = createdTime; 
    } 
    /** 
    * @return the processedTime 
    */ 
    public String getProcessedTime() { 
     return processedTime; 
    } 
    /** 
    * @param processedTime the processedTime to set 
    */ 
    public void setProcessedTime(String processedTime) { 
     this.processedTime = processedTime; 
    } 
    /** 
    * @return the retryCount 
    */ 
    public String getRetryCount() { 
     return retryCount; 
    } 
    /** 
    * @param retryCount the retryCount to set 
    */ 
    public void setRetryCount(String retryCount) { 
     this.retryCount = retryCount; 
    } 
} 
+0

어떤 카프카 버전을 사용하고 있습니까? ps 팩스의 결과는 무엇입니까 | 그레프 카프카 – divyesh

답변

0

동물원과 카프카 인스턴스 만 있으면됩니다.

  1. 시작 사육사
  2. 시작 카프카
  3. ("NewMessageTopic")
  4. 시작하여 주제를 작성하여 생산자와 소비자의 코드 내가 바로 당신을 이해한다면

유 사용 "kafka- 콘솔 생산자 "&"kafka-console-consumer "? 카프카 클러스터를 사용할 필요가 없습니다. 코드가 잘 작동한다면이 소리가 좋을 것입니다. cmd를 통해 kafka를 시작하는 것이 많은 일이면 .bat를 쓸 수 있습니다. 그것은 바로 볼 코드의 첫 번째 모습에

:startZK 
echo Zookeeper wird gestartet 
Start "Zookeper" C:\zookeeper-3.4.9\bin\zkServer.cmd 
echo Bitte warten bis Zookeeper gestartet ist. 
pause 
echo Kafka Wird Gestartet 
Start "Kafka" C:\kafka_2.11-0.10.2.0\bin\windows\kafka-server-start.bat C:\kafka_2.11-0.10.2.0\config\server.properties 

goto Top 

처럼

. 나는 단지 당신이 당신의 system.out에 도착하는 데이터를 인쇄하는 것을 안다?

  while (true) { 
       ConsumerRecords<String, CustomMessage> messages = consumer.poll(100); 
       for (ConsumerRecord<String, CustomMessage> message : messages) { 
        System.out.println("Message received " + message);<-- just a syso not more :/ 
       } 
       perisitMessage(); <-- maybe give him the message ? 
      } 

개요 메시지가 보이면 ?? 괜찮 았어. 오늘 밤 가까이서 볼 수있는 사람이 없다면. 나에게 힌트를 줘. 하지만 MongoDB에 대한 경험이 없습니다.