2014-12-03 6 views
3

Spark 스트리밍 응용 프로그램에서 redis 연결 풀(JedisPool)을 공유하기 위해 sparkContext.broadcast를 사용하고 있었습니다. 그런데 sparkContext로 JedisPool을 broadcast하면 작동하지 않습니다.

코드 :

// On first access, builds the Redis pool from redisHost/redisPort and hands it
// to SparkContext.broadcast; the resulting broadcast handle is what gets cached.
lazy val redisPool =
    ssc.sparkContext.broadcast(Redis.createRedisPool(redisHost, redisPort))

Redis.createRedisPool은 다음과 같습니다

/** Factory helpers for building Jedis connection pools. */
object Redis {

  /**
    * Builds a [[JedisPool]] from an explicitly configured `JedisPoolConfig`.
    *
    * @param host     host passed to the `JedisPool` constructor
    * @param port     port passed to the `JedisPool` constructor
    * @param maxIdle  value applied through `JedisPoolConfig.setMaxIdle`
    * @param maxTotal value applied through `JedisPoolConfig.setMaxTotal`
    * @param timeout  value applied through `JedisPoolConfig.setMaxWaitMillis`
    * @return a newly constructed pool using the configuration above
    */
  def createRedisPool(host: String, port: Int, maxIdle: Int, maxTotal: Int, timeout: Int): JedisPool = {
    val poolConfig = new JedisPoolConfig
    poolConfig.setMaxIdle(maxIdle)
    poolConfig.setMaxTotal(maxTotal)
    poolConfig.setMaxWaitMillis(timeout)
    new JedisPool(poolConfig, host, port)
  }

  /** Builds a pool for `host`/`port` with maxIdle = 5, maxTotal = 5 and timeout = 5000. */
  def createRedisPool(host: String, port: Int): JedisPool =
    createRedisPool(host, port, maxIdle = 5, maxTotal = 5, timeout = 5000)
}

로컬 배포 모드에서는 잘 작동하지만, 다음과 같이 yarn/standalone 모드에서 이 프로그램을 실행하면

spark-submit --master "yarn-client" --class ... 

아래와 같은 오류가 발생합니다:

Exception in thread "main" java.io.NotSerializableException: redis.clients.jedis.JedisPool 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1165) 
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:329) 
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) 
    at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:210) 
    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:83) 
    at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:68) 
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:36) 
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29) 
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62) 
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:809) 
    at org.culiu.bd.streaming.AdSysStreaming$.redisPool$lzycompute$1(AdSysStreaming.scala:84) 
    at org.culiu.bd.streaming.AdSysStreaming$.redisPool$1(AdSysStreaming.scala:82) 
    at org.culiu.bd.streaming.AdSysStreaming$.main(AdSysStreaming.scala:154) 
    at org.culiu.bd.streaming.AdSysStreaming.main(AdSysStreaming.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) 
    at java.lang.reflect.Method.invoke(Method.java:597) 
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328) 
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) 
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 

응용 프로그램에서 spark.serializer = org.apache.spark.serializer.KryoSerializer로 설정해 보았지만, 이번에는 다음과 같은 오류가 발생했습니다:

Exception in thread "main" com.esotericsoftware.kryo.KryoException:  java.util.ConcurrentModificationException 
Serialization trace: 
classes (sun.misc.Launcher$AppClassLoader) 
classloader (java.security.ProtectionDomain) 
context (java.security.AccessControlContext) 
acc (org.apache.spark.executor.ExecutorURLClassLoader) 
factoryClassLoader (org.apache.commons.pool2.impl.GenericObjectPool) 
internalPool (redis.clients.jedis.JedisPool) 
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585) 
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) 
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) 
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) 
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) 
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) 
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318) 
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293) 
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) 
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) 
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) 
    at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549) 
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570) 
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) 
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) 
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) 
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) 
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) 
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) 
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) 
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) 
    at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:119) 
    at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:210) 
    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:83) 
    at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:68) 
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:36) 
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29) 
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62) 
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:809) 
    at org.culiu.bd.streaming.AdSysStreaming$.redisPool$lzycompute$1(AdSysStreaming.scala:85) 
    at org.culiu.bd.streaming.AdSysStreaming$.redisPool$1(AdSysStreaming.scala:83) 
    at org.culiu.bd.streaming.AdSysStreaming$.main(AdSysStreaming.scala:155) 
    at org.culiu.bd.streaming.AdSysStreaming.main(AdSysStreaming.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) 
    at java.lang.reflect.Method.invoke(Method.java:597) 
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328) 
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) 
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: java.util.ConcurrentModificationException 
    at java.util.AbstractList$Itr.checkForComodification(AbstractList.java:372) 
    at java.util.AbstractList$Itr.next(AbstractList.java:343) 
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:74) 
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18) 
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) 
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) 
    ... 39 more 

이 문제를 어떻게 해결할 수 있습니까?

+0

def 추출 본문 내에 컨텍스트 객체에 대한 참조가 있습니다. –

+0

네, ssc는 StreamingContext 인스턴스로 정의되어 있고, ssc.sparkContext는 SparkContext입니다 – Guy

답변

3

여기서 문제가 되는 것은 redis.clients.jedis.JedisPool 클래스가 직렬화 가능(Serializable)하지 않다는 점입니다. 이 클래스를 직렬화하려는 시도는 모두 실패할 것이므로, Spark 고유의 문제는 아닌 것 같습니다.

+0

에 대한 참조입니까? –