2017-04-24 15 views
1

저는 Kryo 직렬화와 함께 Spark 2.0.2를 사용하고 있습니다. (제목: Spark Streaming — Google Pub/Sub 사용자 지정 수신기의 Kryo 등록)

Spark Streaming으로 Google Pub/Sub에서 메시지를 수집하기 위한 사용자 지정 수신기를 구현하려고 합니다. 이 수신기를 사용하는 Spark 작업을 실행하면 작업 자체를 직렬화해야 하는 것처럼 보입니다. 수신기 코드는 다음과 같습니다:

/**
 * Custom Spark Streaming receiver that attaches a Google Pub/Sub `Subscriber` to a
 * subscription and stores each received message's payload as a byte array.
 *
 * NOTE(review): `subscriber` is built eagerly and held as a field of the receiver, so it is
 * part of the receiver's object graph. The Kryo serialization trace reported for this version
 * passes through exactly that field (`subscriber (…PubSubReceiver)`).
 *
 * @param project      project id used to build the project, topic and subscription names
 * @param topic        topic id (only used to build `topicName`)
 * @param subscription subscription id the subscriber is attached to
 */
class PubSubReceiver(project: String, topic: String, subscription: String) 
    extends Receiver[Array[Byte]](StorageLevel.MEMORY_AND_DISK_2) with Logging { 
    // Not referenced again inside this class.
    val projectFullName = ProjectName.create(project) 
    // Not referenced again inside this class.
    val topicName = TopicName.create(project, topic) 
    val subscriptionName = SubscriptionName.create(project, subscription) 
    // Created when the receiver object itself is constructed, with the inner `receiver`
    // class as the message callback.
    val subscriber = Subscriber.defaultBuilder(subscriptionName, new receiver).build 

    /** Starts a background thread that starts the subscriber and then polls its state. */
    def onStart() { 
    new Thread() { 
     override def run() { 
     subscriber.startAsync() 
     // Keep looping while both the subscriber and the Spark receiver are running.
     while (subscriber.isRunning && !isStopped()) { 
      logger.info(s"${subscriber.getSubscriptionName} receiver running") 
      // Poll interval: sleep 10 s between checks.
      Thread.sleep(10000) 
     } 
     // Reached once the subscriber is no longer running or the receiver has been stopped.
     // Note that nothing here stops the subscriber itself.
     logger.info(s"${subscriber.getSubscriptionName} receiver stopping") 
     } 
    }.start() 
    } 

    /** Intentionally empty; see the comment in the body. */
    def onStop(): Unit = { 
    // Nothing to do here: the polling thread started in onStart() leaves its loop
    // by itself once isStopped() returns true.
    } 

    /** Pub/Sub callback that forwards each message into Spark via `store`. */
    private class receiver extends MessageReceiver { 
    // NOTE(review): `consumer` is never used, so this callback does not acknowledge the
    // message itself — confirm how acks are expected to happen.
    override def receiveMessage(message: PubsubMessage, consumer: AckReplyConsumer): Unit = { 
     // Stores the payload bytes, passing the message's attribute map as the second
     // (metadata) argument of store.
     store(ArrayBuffer(message.getData.toByteArray), message.getAttributesMap) 
    } 
    } 

} 

작업 자체를 직렬화해야 하는 것으로 보이는데, 이는 올바르지 않아 보입니다(그러면 Spark 컨텍스트까지 직렬화될 것입니다). 작업 코드는 다음과 같습니다:

/**
 * Driver entry point that wires a [[PubSubReceiver]] into a `StreamingContext`.
 *
 * This is an abridged snippet: the setup section and the per-message business logic are
 * elided placeholders, exactly as posted. `spark`, `batchInterval`, `projectName`,
 * `topicName` and `subscriptionName` are expected to come from the elided setup.
 */
object PubSubStreamingIngestionJob extends App {
    //... setup

    lazy val ssc = new StreamingContext(spark.sparkContext, batchInterval)

    // the stream
    lazy val pubsubUnionStream =
        ssc.receiverStream(new PubSubReceiver(projectName, topicName, subscriptionName))

    pubsubUnionStream.map(messageBytes => ...business logic...)

    // Start the streaming computation and block until it terminates.
    ssc.start()
    ssc.awaitTermination()

}

다음과 같은 오류가 발생합니다 :

java.io.IOException: com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Class is not registered: com.c2fo.atlas.jobs.streaming.gcp.PubSubStreamingIngestionJob 
Note: To register this class use: kryo.register(com.mycompany.package.PubSubStreamingIngestionJob.class); 
Serialization trace: 
classes (sun.misc.Launcher$AppClassLoader) 
contextClassLoader (java.lang.Thread) 
threads (java.lang.ThreadGroup) 
parent (java.lang.ThreadGroup) 
group (java.util.concurrent.Executors$DefaultThreadFactory) 
val$backingThreadFactory (com.google.common.util.concurrent.ThreadFactoryBuilder$1) 
threadFactory (java.util.concurrent.ScheduledThreadPoolExecutor) 
e (java.util.concurrent.Executors$DelegatedScheduledExecutorService) 
executor (com.google.cloud.pubsub.spi.v1.Subscriber) 
subscriber (com.mycompany.package.PubSubReceiver) 
array (scala.collection.mutable.WrappedArray$ofRef) 

이것을 구현하는 더 나은 방법이 있나요?

답변

0

문제는 전체 클로저가 직렬화되는 것을 방지하려면 Subscriber 인스턴스를 수신기의 필드가 아니라 스레드 내부(스레드 로컬)에서 생성해야 한다는 것이었습니다.

package org.apache.spark.streaming.gcp 

import com.c2fo.atlas.util.LazyLogging 
import com.google.cloud.pubsub.spi.v1._ 
import com.google.iam.v1.ProjectName 
import com.google.pubsub.v1._ 
import org.apache.spark.storage.StorageLevel 
import org.apache.spark.streaming.receiver.Receiver 

import scala.collection.mutable.ArrayBuffer 

/**
 * Spark Streaming receiver that pulls messages from a Google Cloud Pub/Sub subscription and
 * stores every `PubsubMessage` it receives.
 *
 * The Pub/Sub `Subscriber` is deliberately NOT a field of this class. It is created inside
 * the thread started by `onStart`, so it never becomes part of the receiver object itself
 * (which is what gets serialized).
 *
 * @param project      project id used to build the project, topic and subscription names
 * @param topic        topic id (only used to build `topicName`)
 * @param subscription subscription id the subscriber is attached to
 */
class PubSubReceiver(project: String, topic: String, subscription: String)
  extends Receiver[PubsubMessage](StorageLevel.MEMORY_AND_DISK_2) with LazyLogging {

  val projectFullName = ProjectName.create(project)
  val topicName = TopicName.create(project, topic)
  val subscriptionName = SubscriptionName.create(project, subscription)

  /**
   * Starts a background thread that owns the subscriber: it starts it, polls its state every
   * 10 seconds, and stops it again once the loop ends.
   */
  def onStart(): Unit = {
    new Thread() {
      // Crucial change: the subscriber lives in this thread object, not in the receiver.
      val subscriber = Subscriber.defaultBuilder(subscriptionName, new receiver).build

      override def run(): Unit = {
        try {
          // Wait until the subscriber is actually running; checking isRunning right after
          // startAsync() can still observe the "starting" state and end the loop at once.
          subscriber.startAsync().awaitRunning()
          // Keep looping while both the subscriber and the Spark receiver are running.
          while (subscriber.isRunning && !isStopped()) {
            logger.info(s"${subscriber.getSubscriptionName} receiver running")
            // Poll interval: sleep 10 s between checks.
            Thread.sleep(10000)
          }
        } catch {
          // awaitRunning() signals a failed startup with IllegalStateException.
          case e: IllegalStateException =>
            reportError(s"${subscriber.getSubscriptionName} subscriber failed to start", e)
        } finally {
          logger.info(s"${subscriber.getSubscriptionName} receiver stopping")
          // Release the subscriber so it does not outlive the receiver.
          subscriber.stopAsync()
        }
      }
    }.start()
  }

  /**
   * Nothing to do here: the thread started in `onStart` leaves its loop by itself once
   * `isStopped()` returns true (within one poll interval) and stops the subscriber.
   */
  def onStop(): Unit = {
  }

  /** Pub/Sub callback that forwards each message into Spark via `store`. */
  class receiver extends MessageReceiver {
    // NOTE(review): `consumer` is never used, so this callback does not acknowledge the
    // message itself — confirm how acks are expected to happen with the client version in use.
    override def receiveMessage(message: PubsubMessage, consumer: AckReplyConsumer): Unit = {
      // Stores the message, passing its attribute map as the second (metadata) argument.
      store(ArrayBuffer(message), message.getAttributesMap)
    }
  }
}