온라인 시스템에서 폭풍우 Bolt는 NullPointerException을 얻습니다. 가끔씩 NullPointerException을 얻습니다.AbstractStringBuilder.ensureCapacityInternal은 폭풍우에서 NullPointerException을 얻습니다.
import ***.KeyUtils;
import ***.redis.PipelineHelper;
import ***.redis.PipelinedCacheClusterClient;
import **.redis.R2mClusterClient;
import org.apache.commons.lang3.StringUtils;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import java.util.Map;
/**
* RedisBolt batch operate
*/
public class RedisBolt implements IRichBolt {
static final long serialVersionUID = 737015318988609460L;
private static ApplicationContext applicationContext;
private static long logEmitNumber = 0;
private static StringBuffer totalCmds = new StringBuffer();
private Logger logger = LoggerFactory.getLogger(getClass());
private OutputCollector _collector;
private R2mClusterClient r2mClusterClient;
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
_collector = outputCollector;
if (applicationContext == null) {
applicationContext = new ClassPathXmlApplicationContext("spring/spring-config-redisbolt.xml");
}
if (r2mClusterClient == null) {
r2mClusterClient = (R2mClusterClient) applicationContext.getBean("r2mClusterClient");
}
}
@Override
public void execute(Tuple tuple) {
String log = tuple.getString(0);
String lastCommands = tuple.getString(1);
try {
//log count
if (StringUtils.isNotEmpty(log)) {
logEmitNumber++;
}
if (StringUtils.isNotEmpty(lastCommands)) {
if(totalCmds==null){
totalCmds = new StringBuffer();
}
totalCmds.append(lastCommands);//line 61
}
//日志数量控制
int numberLimit = 1;
String flow_log_limit = r2mClusterClient.get(KeyUtils.KEY_PIPELINE_LIMIT);
if (StringUtils.isNotEmpty(flow_log_limit)) {
try {
numberLimit = Integer.parseInt(flow_log_limit);
} catch (Exception e) {
numberLimit = 1;
logger.error("error", e);
}
}
if (logEmitNumber >= numberLimit) {
StringBuffer _totalCmds = new StringBuffer(totalCmds);
try {
//pipeline submit
PipelinedCacheClusterClient pip = r2mClusterClient.pipelined();
String[] commandArray = _totalCmds.toString().split(KeyUtils.REDIS_CMD_SPILT);
PipelineHelper.cmd(pip, commandArray);
pip.sync();
pip.close();
totalCmds = new StringBuffer();
} catch (Exception e) {
logger.error("error", e);
}
logEmitNumber = 0;
}
} catch (Exception e) {
logger.error(new StringBuffer("====RedisBolt error for log=[ ").append(log).append("] \n commands=[").append(lastCommands).append("]").toString(), e);
_collector.reportError(e);
_collector.fail(tuple);
}
_collector.ack(tuple);
}
@Override
public void cleanup() {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
예외 정보 :
java.lang.AbstractStringBuilder.append (AbstractStringBuilder.java:415)에서 java.lang.AbstractStringBuilder.ensureCapacityInternal (AbstractStringBuilder.java:113)에서java.lang.NullPointerException이에 java.lang.StringBuffer.append (StringBuffer.java:237) at com.jd.jr.dataeye.storm.bolt.RedisBolt.execute (RedisBolt.java:61) at org.apache.storm.daemon.executor $ fn__5044 $ org.apache.storm.disruptor에서 $ mk_task_receiver $ fn__4965.invoke (executor.clj : 459) $ clojure_handler $ reify__4480.onEvent (disruptor. clj : 40) at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor (D org.apache.storm.dutruptor의 $ consume_batch_when_available.invoke (disruptor.clj : 73)에서 org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable (DisruptorQueue.java:451) .daemon.executor $ fn__5044 $ fn__5057 $ fn__5110.invoke (executor.clj : 846) at org.apache.storm.util $ async_loop $ fn__557.invoke (util.clj : 484) at clojure.lang.AFn.run (AFn .java : 22) at java.lang.Thread.run (Thread.java:745)
누구든지 내게 그 이유를 찾기 위해 조언을 해줄 수 있습니까?
과 자바 버전에서 손실 일부 데이터가 "1.7 .0_71 " – Fanl