2017-10-17 7 views
0

온라인 시스템에서 폭풍우 Bolt는 NullPointerException을 얻습니다. 가끔씩 NullPointerException을 얻습니다.AbstractStringBuilder.ensureCapacityInternal은 폭풍우에서 NullPointerException을 얻습니다.

import ***.KeyUtils; 
import ***.redis.PipelineHelper; 
import ***.redis.PipelinedCacheClusterClient; 
import **.redis.R2mClusterClient; 
import org.apache.commons.lang3.StringUtils; 
import org.apache.storm.task.OutputCollector; 
import org.apache.storm.task.TopologyContext; 
import org.apache.storm.topology.IRichBolt; 
import org.apache.storm.topology.OutputFieldsDeclarer; 
import org.apache.storm.tuple.Tuple; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import org.springframework.context.ApplicationContext; 
import org.springframework.context.support.ClassPathXmlApplicationContext; 

import java.util.Map; 

/** 
* RedisBolt batch operate 
*/ 
public class RedisBolt implements IRichBolt { 
    static final long serialVersionUID = 737015318988609460L; 
    private static ApplicationContext applicationContext; 
    private static long logEmitNumber = 0; 
    private static StringBuffer totalCmds = new StringBuffer(); 
    private Logger logger = LoggerFactory.getLogger(getClass()); 
    private OutputCollector _collector; 
    private R2mClusterClient r2mClusterClient; 

    @Override 
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { 
     _collector = outputCollector; 
     if (applicationContext == null) { 
      applicationContext = new ClassPathXmlApplicationContext("spring/spring-config-redisbolt.xml"); 
     } 
     if (r2mClusterClient == null) { 
      r2mClusterClient = (R2mClusterClient) applicationContext.getBean("r2mClusterClient"); 
     } 


    } 

    @Override 
    public void execute(Tuple tuple) { 
     String log = tuple.getString(0); 
     String lastCommands = tuple.getString(1); 

     try { 
      //log count 
      if (StringUtils.isNotEmpty(log)) { 
       logEmitNumber++; 
      } 

      if (StringUtils.isNotEmpty(lastCommands)) { 
       if(totalCmds==null){ 
        totalCmds = new StringBuffer(); 
       } 
       totalCmds.append(lastCommands);//line 61 
      } 

      //日志数量控制 
      int numberLimit = 1; 
      String flow_log_limit = r2mClusterClient.get(KeyUtils.KEY_PIPELINE_LIMIT); 
      if (StringUtils.isNotEmpty(flow_log_limit)) { 
       try { 
        numberLimit = Integer.parseInt(flow_log_limit); 
       } catch (Exception e) { 
        numberLimit = 1; 
        logger.error("error", e); 
       } 
      } 

      if (logEmitNumber >= numberLimit) { 
       StringBuffer _totalCmds = new StringBuffer(totalCmds); 
       try { 
        //pipeline submit 
        PipelinedCacheClusterClient pip = r2mClusterClient.pipelined(); 
        String[] commandArray = _totalCmds.toString().split(KeyUtils.REDIS_CMD_SPILT); 
        PipelineHelper.cmd(pip, commandArray); 
        pip.sync(); 
        pip.close(); 
        totalCmds = new StringBuffer(); 
       } catch (Exception e) { 
        logger.error("error", e); 
       } 

       logEmitNumber = 0; 
      } 
     } catch (Exception e) { 
      logger.error(new StringBuffer("====RedisBolt error for log=[ ").append(log).append("] \n commands=[").append(lastCommands).append("]").toString(), e); 
      _collector.reportError(e); 
      _collector.fail(tuple); 
     } 

     _collector.ack(tuple); 
    } 

    @Override 
    public void cleanup() { 

    } 

    @Override 
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { 
    } 

    @Override 
    public Map<String, Object> getComponentConfiguration() { 
     return null; 
    } 

} 

예외 정보 :

java.lang.AbstractStringBuilder.append (AbstractStringBuilder.java:415)에서 java.lang.AbstractStringBuilder.ensureCapacityInternal (AbstractStringBuilder.java:113)에서

java.lang.NullPointerException이에 java.lang.StringBuffer.append (StringBuffer.java:237) at com.jd.jr.dataeye.storm.bolt.RedisBolt.execute (RedisBolt.java:61) at org.apache.storm.daemon.executor $ fn__5044 $ org.apache.storm.disruptor에서 $ mk_task_receiver $ fn__4965.invoke (executor.clj : 459) $ clojure_handler $ reify__4480.onEvent (disruptor. clj : 40) at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor (D org.apache.storm.dutruptor의 $ consume_batch_when_available.invoke (disruptor.clj : 73)에서 org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable (DisruptorQueue.java:451) .daemon.executor $ fn__5044 $ fn__5057 $ fn__5110.invoke (executor.clj : 846) at org.apache.storm.util $ async_loop $ fn__557.invoke (util.clj : 484) at clojure.lang.AFn.run (AFn .java : 22) at java.lang.Thread.run (Thread.java:745)

누구든지 내게 그 이유를 찾기 위해 조언을 해줄 수 있습니까?

+0

과 자바 버전에서 손실 일부 데이터가 "1.7 .0_71 " – Fanl

답변

0

그건 정말 이상한 일입니다. 두 클래스의 코드를 읽으십시오.

https://github.com/openjdk-mirror/jdk7u-jdk/blob/master/src/share/classes/java/lang/AbstractStringBuilder.java

https://github.com/openjdk-mirror/jdk7u-jdk/blob/master/src/share/classes/java/lang/StringBuffer.java

AbstractStringBuilder은 NPE 인 '값'필드를 액세스하게하는 필드 '값'을 할당하지 않음 인수로 생성자를 갖는다. StringBuffer의 constructor은, 그 constructor을 사용합니다. 어쩌면 이상한 일이 serialization/deserialization에서 발생하고 불행히도 AbstractStringBuilder의 'value'필드가 null이 될 수 있습니다.

아마도 prepare()의 totalCmds를 초기화하는 것이 좋을뿐만 아니라 볼트 간 동기화 (스레드 안전성)를 고려해야합니다. prepare()는 bolt 인스턴스별로 호출 할 수 있으므로 필드는 스레드로부터 안전하지만 클래스 필드는 스레드로부터 안전하지 않습니다.

0

아마 내가 문제를 발견했다고 생각합니다. "StringBuffer를 _totalCmds = 새의 StringBuffer (totalCmds);"

핵심은

이다

및 "totalCmds.append (lastCommands) // 라인 61"

새로운 객체, 그것은 Serval의 단계 얻어

:

(2)

초기화

(1) 기준 메모리를 할당 및 반환을

뒤에 (1) 및 before (2)를 추가하면 StringBuffer.java가 AbstractStringBuilder를 확장합니다.자바

/** 
* The value is used for character storage. 
*/ 
char[] value; 

값은 초기화되지 않고, 그래서이 얻을 것이다 널 (null) :

@Override 
public synchronized void ensureCapacity(int minimumCapacity) { 
    if (minimumCapacity > value.length) { 
     expandCapacity(minimumCapacity); 
    } 
} 

이 얼룩은 또 다른 질문, 어쩌면 다중 스레드 환경