2013-02-21 8 views
6

KafkaSpout을 사용중인 스톰 토폴로지를 다시 균형을 잡으려고합니다. 내 코드는 다음과 같습니다 그러나Java 코드를 사용하여 스톰 토폴로지 재조정

TopologyBuilder builder = new TopologyBuilder(); 
    Properties kafkaProps = new Properties(); 
    kafkaProps.put("zk.connect", "localhost:2181"); 
    kafkaProps.put("zk.connectiontimeout.ms", "1000000"); 
    kafkaProps.put("groupid", "storm"); 

    builder.setSpout("kafkaSpout" , new KafkaSpout(kafkaProps, "test"), 3); 
    builder.setBolt("eventBolt", new EventBolt(), 2).shuffleGrouping("kafkaSpout", "eventStream"); 
    builder.setBolt("tableBolt", new TableBolt(), 2).shuffleGrouping("kafkaSpout", "tableStream"); 

    Map<String, Object> conf = new HashMap<String, Object>(); 
    conf.put(Config.TOPOLOGY_DEBUG, true); 

    LocalCluster cluster = new LocalCluster(); 
    cluster.submitTopology("test", conf, builder.createTopology()); 

    Utils.sleep(1000*5); 

    List<TopologySummary> topologySummaries = cluster.getClusterInfo().get_topologies(); 
    for (TopologySummary summary : topologySummaries) { 
     StormTopology topology = cluster.getTopology(summary.get_id()); 
     RebalanceOptions options = new RebalanceOptions(); 
     options.set_wait_secs(0); 
     options.set_num_workers(4); 

     for (String name : topology.get_bolts().keySet()) { 
      System.err.println(name + " " + topology.get_bolts().get(name).get_common().get_json_conf()); 
      options.put_to_num_executors(name , 5); 
     } 
     for (String name : topology.get_spouts().keySet()) { 
      System.err.println(name + " " + topology.get_spouts().get(name).get_common().get_json_conf()); 
      options.put_to_num_executors(name , 5); 
     } 

     cluster.rebalance(summary.get_name() , options); 
    } 

, 균형을 다시하는 동안, 다음과 같은 오류 추적과 같습니다

10341 [storm_rishabh-1361473654345-95461d10_watcher_executor] INFO kafka.consumer.ZookeeperConsumerConnector - storm_rishabh-1361473654345-95461d10 begin rebalancing consumer storm_rishabh-1361473654345-95461d10 try #1 
10341 [storm_rishabh-1361473654345-3b26ed76_watcher_executor] INFO kafka.consumer.ZookeeperConsumerConnector - storm_rishabh-1361473654345-3b26ed76 begin rebalancing consumer storm_rishabh-1361473654345-3b26ed76 try #1 
10342 [storm_rishabh-1361473654345-95461d10_watcher_executor] ERROR kafka.consumer.ZookeeperConsumerConnector - storm_rishabh-1361473654345-95461d10 error during syncedRebalance 
java.lang.NullPointerException: null 
at kafka.utils.ZkUtils$.getChildrenParentMayNotExist(ZkUtils.scala:181) ~[kafka_2.9.2-0.7.0.jar:na] 
at kafka.utils.ZkUtils$.getCluster(ZkUtils.scala:202) ~[kafka_2.9.2-0.7.0.jar:na] 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:447) ~[kafka_2.9.2-0.7.0.jar:na] 
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:78) ~[scala-library-2.9.2.jar:na] 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:444) ~[kafka_2.9.2-0.7.0.jar:na] 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:401) ~[kafka_2.9.2-0.7.0.jar:na] 
10342 [storm_rishabh-1361473654345-3b26ed76_watcher_executor] ERROR kafka.consumer.ZookeeperConsumerConnector - storm_rishabh-1361473654345-3b26ed76 error during syncedRebalance 
java.lang.NullPointerException: null 
at kafka.utils.ZkUtils$.getChildrenParentMayNotExist(ZkUtils.scala:181) ~[kafka_2.9.2-0.7.0.jar:na] 
at kafka.utils.ZkUtils$.getCluster(ZkUtils.scala:202) ~[kafka_2.9.2-0.7.0.jar:na] 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:447) ~[kafka_2.9.2-0.7.0.jar:na] 
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:78) ~[scala-library-2.9.2.jar:na] 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:444) ~[kafka_2.9.2-0.7.0.jar:na] 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:401) ~[kafka_2.9.2-0.7.0.jar:na] 
10342 [storm_rishabh-1361473654345-95461d10_watcher_executor] INFO kafka.consumer.ZookeeperConsumerConnector - storm_rishabh-1361473654345-95461d10 stopping watcher executor thread for consumer storm_rishabh-1361473654345-95461d10 
10343 [storm_rishabh-1361473654345-3b26ed76_watcher_executor] INFO kafka.consumer.ZookeeperConsumerConnector - storm_rishabh-1361473654345-3b26ed76 stopping watcher executor thread for consumer storm_rishabh-1361473654345-3b26ed76 

사람이 문제가 될 수 있는지 말해 주시겠습니까? kafkaSpout에서 더 많은 것을 정의해야합니다. 재조정 할 때 올바르게 종료 된 다음 다시 시작됩니다.

답변

0

LocalCluster (개발 목적으로)에서 실행할 때 이와 동일한 문제가 발생했습니다. 테스트 구성 YAML을 변경하여 1 :

topology.workers: 1 

이 문제를 해결했습니다. 나는 실제 분산 클러스터에서 이것을 실행하려고 시도하지 않았으므로 이것이 단지 LocalCluster 모드로 실행되는 아티팩트 일지 모르겠다.

(내 코드에서 나는. LocalCluster.rebalance를 호출 할 수 없습니다) 관리자 또는 후광 노드에서

0

사용 폭풍 재조정 명령.

예를 들어, 폭풍 재조정 mytopology -n 5 -e blue-spout = 3 -e yellow-bolt = 10입니다.

이 사이트를 참조하십시오. www.michael-noll.com.