2017-05-19 3 views
1

카프카와 스톰에 문제가 있습니다. KafkaSpout 설정에 문제가 있거나 설정을 제대로하지 못했거나 제대로 응답하지 않았습니까?왜 Apache Storm KafkaSpout에서 카프카 항목에서 너무 많은 항목을 방출합니까?

내 카프카 토픽에 50 개의 항목을 대기열에 넣었지 만, 내 스파우트는 1300 개 이상의 튜플을 방출했습니다. 또한 Spout은 거의 모든 것이 "실패"했다고보고합니다. 토폴로지는 실제로 성공적으로 데이터베이스에 쓰고, 실패하지 않지만 분명히 모든 재생 이유 (그게 무엇을하고 있는지) 난 그냥

큰 문제는 너무 많이 알고하지 않습니다

왜 내가 카프카에게 50 세에 지나쳤을 때 너무 많은 튜플을 내고 있습니까? 여기

enter image description here

내가 토폴로지와 KafkaSpout

public static void main(String[] args) { 
    try { 
     String databaseServerIP = ""; 
     String kafkaZookeepers = ""; 
     String kafkaTopicName = ""; 
     int numWorkers = 1; 
     int numAckers = 1; 
     int numSpouts = 1; 
     int numBolts = 1; 
     int messageTimeOut = 10; 
     String topologyName = ""; 

     if (args == null || args[0].isEmpty()) { 
     System.out.println("Args cannot be null or empty. Exiting"); 
     return; 
     } else { 
     if (args.length == 8) { 
      for (String arg : args) { 
      if (arg == null) { 
       System.out.println("Parameters cannot be null. Exiting"); 
       return; 
      } 
      } 
      databaseServerIP = args[0]; 
      kafkaZookeepers = args[1]; 
      kafkaTopicName = args[2]; 
      numWorkers = Integer.valueOf(args[3]); 
      numAckers = Integer.valueOf(args[4]); 
      numSpouts = Integer.valueOf(args[5]); 
      numBolts = Integer.valueOf(args[6]); 
      topologyName = args[7]; 
     } else { 
      System.out.println("Bad parameters: found " + args.length + ", required = 8"); 
      return; 
     } 
     } 

     Config conf = new Config(); 

     conf.setNumWorkers(numWorkers); 
     conf.setNumAckers(numAckers); 
     conf.setMessageTimeoutSecs(messageTimeOut); 

     conf.put("databaseServerIP", databaseServerIP); 
     conf.put("kafkaZookeepers", kafkaZookeepers); 
     conf.put("kafkaTopicName", kafkaTopicName); 

     /** 
     * Now would put kafkaSpout instance below instead of TemplateSpout() 
     */ 
     TopologyBuilder builder = new TopologyBuilder(); 
     builder.setSpout(topologyName + "-flatItems-from-kafka-spout", getKafkaSpout(kafkaZookeepers, kafkaTopicName), numSpouts); 
     builder.setBolt(topologyName + "-flatItem-Writer-Bolt", new ItemWriterBolt(), numBolts).shuffleGrouping(topologyName + "-flatItems-from-kafka-spout"); 


     StormTopology topology = builder.createTopology(); 

     StormSubmitter.submitTopology(topologyName, conf, topology); 

    } catch (Exception e) { 
     System.out.println("There was a problem starting the topology. Check parameters."); 
     e.printStackTrace(); 
    } 
    } 

    private static KafkaSpout getKafkaSpout(String zkHosts, String topic) throws Exception { 

    //String topic = "FLAT-ITEMS"; 
    String zkNode = "/" + topic + "-subscriber-pipeline"; 
    String zkSpoutId = topic + "subscriberpipeline"; 
    KafkaTopicInZkCreator.createTopic(topic, zkHosts); 


    SpoutConfig spoutConfig = new SpoutConfig(new ZkHosts(zkHosts), topic, zkNode, zkSpoutId); 
    spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime(); 

    // spoutConfig.useStartOffsetTimeIfOffsetOutOfRange = true; 
    //spoutConfig.startOffsetTime = System.currentTimeMillis(); 
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); 

    return new KafkaSpout(spoutConfig); 

    } 

을 설정하고 어떻게 여기 당신이 필요

public static void createTopic(String topicName, String zookeeperHosts) throws Exception { 
    ZkClient zkClient = null; 
    ZkUtils zkUtils = null; 
    try { 

     int sessionTimeOutInMs = 15 * 1000; // 15 secs 
     int connectionTimeOutInMs = 10 * 1000; // 10 secs 

     zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$); 
     zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false); 

     int noOfPartitions = 1; 
     int noOfReplication = 1; 
     Properties topicConfiguration = new Properties(); 

     boolean topicExists = AdminUtils.topicExists(zkUtils, topicName); 
     if (!topicExists) { 
     AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration, RackAwareMode.Disabled$.MODULE$); 
     } 
    } catch (Exception ex) { 
     ex.printStackTrace(); 
    } finally { 
     if (zkClient != null) { 
     zkClient.close(); 
     } 
    } 
    } 

답변

1

을 중요한 경우 주제의 창조 볼트의 메시지가 실패했는지 확인하십시오.

모두 실패한 경우 볼트에서 메시지를 확인하지 않았거나 볼트 코드에서 예외가 발생했을 수 있습니다.

볼트 메시지가 승인되면 시간 초과 가능성이 더 높습니다. 토폴로지 시간 초과 구성이나 paralisim을 늘리면 문제가 해결됩니다.

+0

감사합니다. 볼트를 잡는 적절한 방법은 무엇입니까? 토폴로지 시간 초과를 늘리려면 어떻게해야합니까? – markg

+0

@markg BaseBasicBolt를 사용하는 경우 ack를 처리 할 필요가 없습니다. BaseRichBolt를 사용하면 execute 메소드에서 ack()를 호출해야합니다. – Solo

+0

@markg 토폴로지 시간 초과는 "topology.message.timeout"구성입니다. 토폴로지 코드 또는 관리자의 storm.yaml에서 설정할 수 있습니다 – Solo