2017-05-04 8 views
0

java.io.NotSerializableException : io.netty.channel. (SerializableSerializer.java : 41) ~ [storm-core-1.0.1.2.5.0.0-1245.jar : 1.0.1.2.5.0.0-1245]에서의 DefaultChannelHandlerContext (Kryo.java:628) com.esotericsoftware.kryo.writeClassAndObject) ~ [kryo-3.0.3.jar :?] at com.esotericsoftware.kryo.serializers.MapSerializer.write (MapSerializer.java:39) ~ [kryo-3.0.3.jar :?] at com.esotericsoftware.kryo.Kryo.writeClassAndObject (Kryo.java:628) ~ [kryo-3.0.3.jar :?] at com.esotericsoftware.kryo.serializers.CollectionSerializer.write (CollectionSerializer.java:100) ~ [kryo-3.0.3.jar :?] at com. (Kryo.java:534) ~ [kryo-3.0.3.jar :?] kryo-3.0.3.jar :?] at org.apache.storm.serialization.KryoValuesSerializer.serializeInto (KryoValuesSerializer.java:44) ~ [storm-core-1.0.1.2.5.0.0-1 245.jar : 1.0.1.2.5.0.0-1245] at org.apache.storm.serialization.KryoTupleSerializer.serialize (KryoTupleSerializer.java:44) ~ [storm-core-1.0.1.2.5.0.0-1245] .jar : 1.0.1.2.5.0.0-1245] at org.apache.storm.daemon.worker $ mk_transfer_fn $ transfer_fn__6723.invoke (worker.clj : 192) ~ [storm-core-1.0.1.2.5.0. 0-1245.jar : 1.0.1.2.5.0.0-1245] at org.apache.storm.daemon.executor $ start_batch_transfer__GT_worker_handler_BANG_ $ fn__6411.invoke (executor.clj : 313) ~ [storm-core-1.0.1.2 .5.0.0-1245.jar : 1.0.1.2.5.0.0-1245] at org.apache.storm.disruptor $ clojure_handler $ reify__6005.onEvent (disruptor.clj : 40) ~ [storm-core-1.0. 1.2.5.0.0-1245.jar : 1.0.1.2.5.0.0-1245] at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor (DisruptorQueue. 자바 : 451) ~ [폭풍 코어 1.0.1.2.5.0.0-1245.jar : 1.0.1.2.5.0.0-1245] ... 6 개에 의해 발생 : java.lang.RuntimeException가 : java.io.NotSerializableException : java.lang.RuntimeException가 :에 의한 io.netty.channel.DefaultChannelHandlerContext

내가 사용하는 폭풍 로컬 모드 문제가 없지만 클러스터에서 오류로보고됩니다. 나는 폭풍 자체에 대해별로 지식을 가지고 있지만 직렬화 가능하지 않은 (그 맵에 저장된) ChannelHandlerContext을에는 직렬화하려고 것 같다

public class NettySpout extends BaseRichSpout { 

private static final long serialVersionUID = 1L; 
/** 
* colloctor for spout 
*/ 
private SpoutOutputCollector collector; 

@Override 
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { 
    collector=spoutOutputCollector; 
    StormServer stormServer=new StormServer(); 
    stormServer.run(); 
} 

@Override 
public void nextTuple() { 
    Values tuple; 
    try { 
     while ((tuple = ServerHandler.queue.take()) != null) { 
      collector.emit(tuple); 
     } 
    } catch (Exception e) { 
    } 
} 

@Override 
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { 
    outputFieldsDeclarer.declare(new Fields("value","channl")); 
} 


public class ServerHandler extends ChannelInboundHandlerAdapter{ 

private static Logger logger = LogManager.getLogger(ServerHandler.class); 
public static LinkedBlockingQueue<Values> queue = new LinkedBlockingQueue<Values>(); 
public static Map<String,ChannelHandlerContext> ctxes; 

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
    JSONObject message = (JSONObject) msg; 
    queue.put(new Values(new StreamData(message.toString().getBytes()), new HashMap<>(ctxes))); 

} 

답변

1

:

내 코드입니다.

+0

맞습니다. ChannelHandlerContext를 직렬화 할 수 없습니다. 어쩌면이 방법이 잘못되었습니다. 다른 방법을 시도해야합니다. 답을 알려줘서 고맙습니다. – Tdz