카프카와 스톰에 문제가 있습니다. KafkaSpout 설정에 문제가 있거나 설정을 제대로하지 못했거나 제대로 응답하지 않았습니까?왜 Apache Storm KafkaSpout에서 카프카 항목에서 너무 많은 항목을 방출합니까?
내 카프카 토픽에 50 개의 항목을 대기열에 넣었지 만, 내 스파우트는 1300 개 이상의 튜플을 방출했습니다. 또한 Spout은 거의 모든 것이 "실패"했다고보고합니다. 토폴로지는 실제로 성공적으로 데이터베이스에 쓰고, 실패하지 않지만 분명히 모든 재생 이유 (그게 무엇을하고 있는지) 난 그냥
큰 문제는 너무 많이 알고하지 않습니다
왜 내가 카프카에게 50 세에 지나쳤을 때 너무 많은 튜플을 내고 있습니까? 여기
내가 토폴로지와 KafkaSpout
public static void main(String[] args) {
try {
String databaseServerIP = "";
String kafkaZookeepers = "";
String kafkaTopicName = "";
int numWorkers = 1;
int numAckers = 1;
int numSpouts = 1;
int numBolts = 1;
int messageTimeOut = 10;
String topologyName = "";
if (args == null || args[0].isEmpty()) {
System.out.println("Args cannot be null or empty. Exiting");
return;
} else {
if (args.length == 8) {
for (String arg : args) {
if (arg == null) {
System.out.println("Parameters cannot be null. Exiting");
return;
}
}
databaseServerIP = args[0];
kafkaZookeepers = args[1];
kafkaTopicName = args[2];
numWorkers = Integer.valueOf(args[3]);
numAckers = Integer.valueOf(args[4]);
numSpouts = Integer.valueOf(args[5]);
numBolts = Integer.valueOf(args[6]);
topologyName = args[7];
} else {
System.out.println("Bad parameters: found " + args.length + ", required = 8");
return;
}
}
Config conf = new Config();
conf.setNumWorkers(numWorkers);
conf.setNumAckers(numAckers);
conf.setMessageTimeoutSecs(messageTimeOut);
conf.put("databaseServerIP", databaseServerIP);
conf.put("kafkaZookeepers", kafkaZookeepers);
conf.put("kafkaTopicName", kafkaTopicName);
/**
* Now would put kafkaSpout instance below instead of TemplateSpout()
*/
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(topologyName + "-flatItems-from-kafka-spout", getKafkaSpout(kafkaZookeepers, kafkaTopicName), numSpouts);
builder.setBolt(topologyName + "-flatItem-Writer-Bolt", new ItemWriterBolt(), numBolts).shuffleGrouping(topologyName + "-flatItems-from-kafka-spout");
StormTopology topology = builder.createTopology();
StormSubmitter.submitTopology(topologyName, conf, topology);
} catch (Exception e) {
System.out.println("There was a problem starting the topology. Check parameters.");
e.printStackTrace();
}
}
private static KafkaSpout getKafkaSpout(String zkHosts, String topic) throws Exception {
//String topic = "FLAT-ITEMS";
String zkNode = "/" + topic + "-subscriber-pipeline";
String zkSpoutId = topic + "subscriberpipeline";
KafkaTopicInZkCreator.createTopic(topic, zkHosts);
SpoutConfig spoutConfig = new SpoutConfig(new ZkHosts(zkHosts), topic, zkNode, zkSpoutId);
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
// spoutConfig.useStartOffsetTimeIfOffsetOutOfRange = true;
//spoutConfig.startOffsetTime = System.currentTimeMillis();
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
return new KafkaSpout(spoutConfig);
}
을 설정하고 어떻게 여기 당신이 필요
public static void createTopic(String topicName, String zookeeperHosts) throws Exception {
ZkClient zkClient = null;
ZkUtils zkUtils = null;
try {
int sessionTimeOutInMs = 15 * 1000; // 15 secs
int connectionTimeOutInMs = 10 * 1000; // 10 secs
zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);
zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);
int noOfPartitions = 1;
int noOfReplication = 1;
Properties topicConfiguration = new Properties();
boolean topicExists = AdminUtils.topicExists(zkUtils, topicName);
if (!topicExists) {
AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration, RackAwareMode.Disabled$.MODULE$);
}
} catch (Exception ex) {
ex.printStackTrace();
} finally {
if (zkClient != null) {
zkClient.close();
}
}
}
감사합니다. 볼트를 잡는 적절한 방법은 무엇입니까? 토폴로지 시간 초과를 늘리려면 어떻게해야합니까? – markg
@markg BaseBasicBolt를 사용하는 경우 ack를 처리 할 필요가 없습니다. BaseRichBolt를 사용하면 execute 메소드에서 ack()를 호출해야합니다. – Solo
@markg 토폴로지 시간 초과는 "topology.message.timeout"구성입니다. 토폴로지 코드 또는 관리자의 storm.yaml에서 설정할 수 있습니다 – Solo