sparkContext.broadcast를 공유 redis 연결 풀 (JedisPool) 용 spark 스트리밍 응용 프로그램에서 사용하고있었습니다. 이 같은sparkContext broadcast JedisPool이 작동하지 않습니다.
코드 :
lazy val redisPool = {
val pool = Redis.createRedisPool(redisHost, redisPort)
ssc.sparkContext.broadcast(pool)
}
Redis.createRedisPool은 다음과 같습니다
object Redis {
def createRedisPool(host: String, port: Int, maxIdle: Int, maxTotal: Int, timeout: Int): JedisPool = {
val pc = new JedisPoolConfig
pc.setMaxIdle(maxIdle)
pc.setMaxTotal(maxTotal)
pc.setMaxWaitMillis(timeout)
new JedisPool(pc, host, port)
}
def createRedisPool(host: String, port: Int): JedisPool = {
createRedisPool(host, port, 5, 5, 5000)
}
}
그것은 지역 배포 모드에서 작동하지만, 나는
spark-submit --master "yarn-client" --class ...
같은 원사/독립 실행 형 모드에서이 프로그램을 실행할 때
에 오류가 발생합니다 :
,363,210Exception in thread "main" java.io.NotSerializableException: redis.clients.jedis.JedisPool
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1165)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:329)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:210)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:83)
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:68)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:36)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:809)
at org.culiu.bd.streaming.AdSysStreaming$.redisPool$lzycompute$1(AdSysStreaming.scala:84)
at org.culiu.bd.streaming.AdSysStreaming$.redisPool$1(AdSysStreaming.scala:82)
at org.culiu.bd.streaming.AdSysStreaming$.main(AdSysStreaming.scala:154)
at org.culiu.bd.streaming.AdSysStreaming.main(AdSysStreaming.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
내가 시도 내 응용 프로그램에 spark.serializer = org.apache.spark.serializer.KryoSerializer을 설정 한 다음과 같은 오류가있어 :
Exception in thread "main" com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classloader (java.security.ProtectionDomain)
context (java.security.AccessControlContext)
acc (org.apache.spark.executor.ExecutorURLClassLoader)
factoryClassLoader (org.apache.commons.pool2.impl.GenericObjectPool)
internalPool (redis.clients.jedis.JedisPool)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:119)
at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:210)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:83)
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:68)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:36)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:809)
at org.culiu.bd.streaming.AdSysStreaming$.redisPool$lzycompute$1(AdSysStreaming.scala:85)
at org.culiu.bd.streaming.AdSysStreaming$.redisPool$1(AdSysStreaming.scala:83)
at org.culiu.bd.streaming.AdSysStreaming$.main(AdSysStreaming.scala:155)
at org.culiu.bd.streaming.AdSysStreaming.main(AdSysStreaming.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.util.ConcurrentModificationException
at java.util.AbstractList$Itr.checkForComodification(AbstractList.java:372)
at java.util.AbstractList$Itr.next(AbstractList.java:343)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:74)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
... 39 more
가 어떻게이 문제를 해결할 수 있습니까?
def 추출 본문 내에 컨텍스트 객체에 대한 참조가 있습니다. –
yah, ssc는 StreamingContext 인스턴스로 정의되고, ssc.sparkContext는 SparkContext – Guy