
I am using the Kafka SimpleConsumer. What I want is to receive messages from a producer, process the data in Storm, and then store it in Cassandra. Everything works, but the problem appears when the maxReads value is large: the consumer goes into an infinite loop, and the processing and storing of the data in Cassandra never happens. My questions are: what does the maxReads variable mean here? How can I make it so that every message the producer sends is turned into a tuple and handed to Storm? And how can I make it so that when the producer stops the consumer simply waits, and when the producer later sends a message again it is passed on to Storm, so the process keeps working? Why does the Kafka consumer here loop forever when maxReads is large and only a small number of messages has been received?

Here is my kafka-consumer:

package com.sethiyaji.kafka; 

import java.nio.ByteBuffer; 
import java.util.ArrayList; 
import java.util.Collections; 
import java.util.HashMap; 
import java.util.List; 
import java.util.Map; 

import kafka.api.FetchRequest; 
import kafka.api.FetchRequestBuilder; 
import kafka.api.OffsetRequest; 
import kafka.api.PartitionOffsetRequestInfo; 
import kafka.common.ErrorMapping; 
import kafka.common.TopicAndPartition; 
import kafka.javaapi.FetchResponse; 
import kafka.javaapi.OffsetResponse; 
import kafka.javaapi.PartitionMetadata; 
import kafka.javaapi.TopicMetadata; 
import kafka.javaapi.TopicMetadataRequest; 
import kafka.javaapi.TopicMetadataResponse; 
import kafka.javaapi.consumer.SimpleConsumer; 
import kafka.message.MessageAndOffset; 

public class ConsumerKafka { 
    private List<String> m_replicaBrokers; 

    public ConsumerKafka() { 
     m_replicaBrokers = new ArrayList<String>(); 
    } 

    public void run(long maxReads, String topic, int partition,List<String> seedBrokers,int port) throws Exception{ 
     PartitionMetadata partitionMetaData = findLeader(seedBrokers,port,topic,partition); 
     if(partitionMetaData == null){ 
      System.out.println("Metadata not found"); 
      return; 
     } 
     if(partitionMetaData.leader() == null){ 
      System.out.println("Leader Not Found"); 
      return; 
     } 
     String leadBroker = partitionMetaData.leader().host(); 
     //String leadBroker = seedBrokers.get(0); 
     String clientName = "Client_"+topic+"_"+partition; 

     SimpleConsumer simpleConsumer = new SimpleConsumer(leadBroker, port, 100000, 64*1024, clientName); 
     long readOffset = getLastOffset(simpleConsumer,topic,partition,OffsetRequest.EarliestTime(),clientName); 
     int numErrors = 0; 
      // maxReads is a message budget: it is only decremented when a message is actually read,
      // so if few messages arrive this loop keeps fetching (and sleeping) until the budget is used up.
      while(maxReads > 0){ 
      if(simpleConsumer == null){ 
       simpleConsumer = new SimpleConsumer(leadBroker, port, 100000,64*1024,clientName); 
      } 
      FetchRequest fetchRequest = new FetchRequestBuilder().clientId(clientName).addFetch(topic, partition, readOffset, 100000).build(); 
      //FetchRequest fetchRequest = new FetchRequestBuilder().addFetch(topic, partition, readOffset, 100000).build(); 
      //System.out.println("FETCH_REQUEST_PARTITION:"+fetchRequest.numPartitions()); 
      FetchResponse fetchResponse = simpleConsumer.fetch(fetchRequest); 

      if(fetchResponse.hasError()){ 
       numErrors++; 
       short code=fetchResponse.errorCode(topic, partition); 
       if(numErrors > 5) break; 
       if(code == ErrorMapping.OffsetOutOfRangeCode()){ 
        readOffset = getLastOffset(simpleConsumer,topic,partition,OffsetRequest.LatestTime(),clientName); 
        continue; 
       } 
       simpleConsumer.close(); 
       simpleConsumer=null; 
       leadBroker = findNewLeader(leadBroker,topic,partition,port); 
       continue; 
      } 
      numErrors=0; 

      long numRead = 0; 
      for(MessageAndOffset messageAndOffset: fetchResponse.messageSet(topic, partition)){ 
       long currentOffset = messageAndOffset.offset(); 
       if(currentOffset<readOffset){ 
        System.out.println("Found Old Offset:"+currentOffset+" Expecting: "+readOffset); 
        continue; 
       } 
       readOffset = messageAndOffset.nextOffset(); 
       ByteBuffer payload = messageAndOffset.message().payload(); 
       byte[] bytes = new byte[payload.limit()]; 
       payload.get(bytes); 
       System.out.println(String.valueOf(messageAndOffset.offset())+":"+new String(bytes,"UTF-8")); 
       numRead++; 
       maxReads--; 
      } 
      if(numRead == 0){ 
       // no new messages yet: wait a second before fetching again
       try{ 
        Thread.sleep(1000); 
       }catch(InterruptedException e){ 
        System.out.println("Error:"+e); 
       } 
      } 
     } 
     // close the consumer once the read budget is exhausted
     if(simpleConsumer != null) simpleConsumer.close(); 
    } 

    public long getLastOffset(SimpleConsumer consumer, String topic, int partition,long whichTime,String clientName){ 
      TopicAndPartition topicAndPartition = new TopicAndPartition(topic,partition); 
      Map<TopicAndPartition,PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition,PartitionOffsetRequestInfo>(); 
      requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1)); 
      kafka.javaapi.OffsetRequest offsetRequest = new kafka.javaapi.OffsetRequest(requestInfo,OffsetRequest.CurrentVersion(), clientName); 
      OffsetResponse offsetResponse = consumer.getOffsetsBefore(offsetRequest); 
      if(offsetResponse.hasError()){ 
       System.out.println("Error feching data oddset Data the broker reaseon:"+offsetResponse.errorCode(topic,partition)); 
       return 0; 
      } 
      long[] offsets=offsetResponse.offsets(topic,partition); 
      //System.out.println(offsets.length); 
      return offsets[0]; 
    } 
    private String findNewLeader(String oldLeader,String topic,int partition,int port)throws Exception{ 
     for(int i=0 ; i< 3;i++){ 
      boolean goToSleep=false; 
      PartitionMetadata metaData=findLeader(m_replicaBrokers,port,topic,partition); 
      if(metaData == null){ 
       goToSleep=true; 
      } else if(metaData.leader()==null){ 
       goToSleep=true; 
      } else if(oldLeader.equalsIgnoreCase(metaData.leader().host()) && i==0){ 
       goToSleep=true; 
      } else{ 
       return metaData.leader().host(); 
      } 

      if(goToSleep){ 
       try{ 
        Thread.sleep(1000); 
       }catch(InterruptedException e){ 
        System.out.println("Error:"+e); 
       } 
      } 
     } 
     System.out.println("Unable to find new Leader after broker failure.Exiting"); 
     throw new Exception("Unable to find new Leader after broker failure.Exiting."); 
    } 
    private PartitionMetadata findLeader(List<String> seedBrokers,int port,String topic,int partition){ 
     PartitionMetadata returnMetadata=null; 
     loop: 
      for(String seed: seedBrokers){ 
       SimpleConsumer consumer=null; 
       try{ 
        consumer=new SimpleConsumer(seed,port,100000,64*1024,"id7"); 
        List<String> topicsList= Collections.singletonList(topic); 
        TopicMetadataRequest request = new TopicMetadataRequest(topicsList); 
        TopicMetadataResponse response = consumer.send(request); 
        List<TopicMetadata> metaDataList= response.topicsMetadata(); 
        for(TopicMetadata item: metaDataList){ 
         for(PartitionMetadata part:item.partitionsMetadata()){ 
          if(part.partitionId() == partition){ 
           returnMetadata = part; 
           break loop; 
          } 
         } 
        } 
       } catch(Exception e){ 
        System.out.println("Error communicating with broker ["+seed+"] to find leader for ["+topic+", "+partition+"]. Reason: "+e); 
       } finally{ 
        if(consumer!=null) consumer.close(); 
       } 
      } 
      // Remember the partition's replica brokers so findNewLeader() can query them after a leader failure; 
      // the null check avoids a NullPointerException when no metadata was found on this pass. 
      if(returnMetadata != null){ 
       m_replicaBrokers.clear(); 
       for(kafka.cluster.Broker replica: returnMetadata.replicas()){ 
        m_replicaBrokers.add(replica.host()); 
       } 
      } 
      return returnMetadata; 
    } 
} 
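For what it's worth, maxReads in the code above is just a message budget: it is only decremented inside the for loop when a message is actually read, so with a large maxReads and only a few messages produced, the while loop never exits; it just sleeps for a second and fetches again. Below is a rough sketch, assuming it is added to the ConsumerKafka class above (so it can reuse getLastOffset and the existing imports), of a variant that drops the budget and hands every payload to a caller-supplied callback; the MessageHandler interface is hypothetical, not part of the Kafka API:

    // Hypothetical callback so the fetch loop does not need to know about Storm directly.
    public interface MessageHandler {
        void handle(String message) throws Exception;
    }

    // Sketch: poll the partition indefinitely (no maxReads budget) and pass every payload to the handler.
    // Error handling and leader fail-over are omitted for brevity; see run() above for those.
    public void runForever(String topic, int partition, String leadBroker, int port,
                           MessageHandler handler) throws Exception {
        String clientName = "Client_" + topic + "_" + partition;
        SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
        long readOffset = getLastOffset(consumer, topic, partition, OffsetRequest.LatestTime(), clientName);
        try {
            while (!Thread.currentThread().isInterrupted()) {
                FetchRequest request = new FetchRequestBuilder()
                        .clientId(clientName)
                        .addFetch(topic, partition, readOffset, 100000)
                        .build();
                FetchResponse response = consumer.fetch(request);
                long numRead = 0;
                for (MessageAndOffset messageAndOffset : response.messageSet(topic, partition)) {
                    readOffset = messageAndOffset.nextOffset();
                    ByteBuffer payload = messageAndOffset.message().payload();
                    byte[] bytes = new byte[payload.limit()];
                    payload.get(bytes);
                    handler.handle(new String(bytes, "UTF-8"));
                    numRead++;
                }
                if (numRead == 0) {
                    Thread.sleep(1000); // nothing new yet: wait and poll again instead of burning a budget
                }
            }
        } finally {
            consumer.close();
        }
    }

Inside a Storm spout, handle() could, for example, queue the message so that nextTuple() can emit it as a tuple.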

If my understanding is correct, you are trying to use Kafka together with Apache Storm. You may want to look at the [Kafka spout](https://github.com/HolmesNL/kafka-spout/wiki). – user2720864

Answer


I am not sure about the specific problem in your simple consumer, but the question is really related to the spout (in this case a Kafka spout) and a proper topology in which there will also be bolts.
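As the comment above also suggests, the usual way to do this is to let a Kafka spout do the consuming, so that Storm keeps pulling messages for as long as the topology runs. A minimal sketch, using the storm-kafka KafkaSpout bundled with Storm rather than the linked HolmesNL project; the ZooKeeper address, topic name, and the CassandraWriterBolt body are assumptions to replace with your own:

package com.sethiyaji.kafka;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

public class KafkaStormTopology {

    // Hypothetical bolt: replace the comment with a real Cassandra write (e.g. via the DataStax driver).
    public static class CassandraWriterBolt extends BaseBasicBolt {
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String message = tuple.getString(0); // StringScheme emits a single field named "str"
            // session.execute(insertStatement.bind(message));  // write to Cassandra here
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // terminal bolt: no output fields
        }
    }

    public static void main(String[] args) throws Exception {
        // Assumed ZooKeeper address and topic name; adjust to your setup.
        SpoutConfig spoutConfig = new SpoutConfig(new ZkHosts("localhost:2181"),
                "my-topic", "/kafka-offsets", "storm-consumer");
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        TopologyBuilder builder = new TopologyBuilder();
        // The spout keeps polling Kafka and emits one tuple per message; no maxReads-style budget needed.
        builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 1);
        builder.setBolt("cassandra-bolt", new CassandraWriterBolt(), 1).shuffleGrouping("kafka-spout");

        new LocalCluster().submitTopology("kafka-storm-cassandra", new Config(), builder.createTopology());
    }
}

The spout tracks its own offsets in ZooKeeper, so it simply emits a tuple per message whenever the producer sends one and sits idle otherwise, which is the behaviour you are describing.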