2017-05-20 7 views
3

Jupyter/IPython 확장을 생성하여 Apache Spark Jobs를 모니터링하고 싶습니다.Python의 pySpark에서 SparkListener를 추가하는 방법은 무엇입니까?

스파크는 REST API를 제공합니다.

그러나 서버를 폴링하는 대신 콜백을 통해 이벤트 업데이트를 보내고 싶습니다.

SparkContext.addSparkListener()으로 SparkListener을 등록하려고합니다. 이 기능은 Python의 PySpark SparkContext 개체에서 사용할 수 없습니다. 그렇다면 파이썬에서 컨텍스트의 스칼라/자바 버전에 파이썬 리스너를 어떻게 등록 할 수 있습니까? py4j을 통해이 작업을 수행 할 수 있습니까? 리스너에서 이벤트가 발생하면 파이썬 함수를 호출해야합니다.

답변

10

약간 관련이 있지만 가능합니다. Py4j callback mechanism을 사용하여 SparkListener에서 메시지를 전달할 수 있습니다. 먼저 필요한 모든 클래스가있는 스칼라 패키지를 만듭니다.디렉토리 구조 :

. 
├── build.sbt 
└── src 
    └── main 
     └── scala 
      └── net 
       └── zero323 
        └── spark 
         └── examples 
          └── listener 
           ├── Listener.scala 
           ├── Manager.scala 
           └── TaskListener.scala 

build.sbt :

name := "listener" 

organization := "net.zero323" 

scalaVersion := "2.11.7" 

val sparkVersion = "2.1.0" 

libraryDependencies ++= List(
    "org.apache.spark" %% "spark-core" % sparkVersion, 
    "net.sf.py4j" % "py4j" % "0.10.4" // Just for the record 
) 

Listener.scala는 우리가 나중에

package net.zero323.spark.examples.listener 

/* You can add arbitrary methods here, 
* as long as these match corresponding Python interface 
*/ 
trait Listener { 
    /* This will be implemented by a Python class. 
    * You can of course use more specific types, 
    * for example here String => Unit */ 
    def notify(x: Any): Any 
} 

Manager.scala 파이썬 리스너에 메시지를 전달하는 데 사용됩니다 구현하려고하는 파이썬 인터페이스를 정의

package net.zero323.spark.examples.listener 

object Manager { 
    var listeners: Map[String, Listener] = Map() 

    def register(listener: Listener): String = { 
    this.synchronized { 
     val uuid = java.util.UUID.randomUUID().toString 
     listeners = listeners + (uuid -> listener) 
     uuid 
    } 
    } 

    def unregister(uuid: String) = { 
    this.synchronized { 
     listeners = listeners - uuid 
    } 
    } 

    def notifyAll(message: String): Unit = { 
    for { (_, listener) <- listeners } listener.notify(message) 
    } 

} 
0 마지막으로 간단한

:

package net.zero323.spark.examples.listener 

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} 
import org.json4s._ 
import org.json4s.JsonDSL._ 
import org.json4s.jackson.JsonMethods._ 

/* A simple listener which captures SparkListenerTaskEnd, 
* extracts numbers of records written by the task 
* and converts to JSON. You can of course add handlers 
* for other events as well. 
*/ 
class PythonNotifyListener extends SparkListener { 
    override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { 
    val recordsWritten = taskEnd.taskMetrics.outputMetrics.recordsWritten 
    val message = compact(render(
     ("recordsWritten" -> recordsWritten) 
    )) 
    Manager.notifyAll(message) 
    } 
} 

가 있습니다 '패키지 우리의 확장 :

sbt package 

와 클래스 경로에 jar 생성 및 등록 리스너 추가 PySpark 세션을 시작합니다

$SPARK_HOME/bin/pyspark \ 
    --driver-class-path target/scala-2.11/listener_2.11-0.1-SNAPSHOT.jar \ 
    --conf spark.extraListeners=net.zero323.spark.examples.listener.PythonNotifyListener 

다음으로 우리는을 구현하는 Python 객체를 정의해야합니다 89,인터페이스 :

class PythonListener(object): 
    package = "net.zero323.spark.examples.listener" 

    @staticmethod 
    def get_manager(): 
     jvm = SparkContext.getOrCreate()._jvm 
     manager = getattr(jvm, "{}.{}".format(PythonListener.package, "Manager")) 
     return manager 

    def __init__(self): 
     self.uuid = None 

    def notify(self, obj): 
     """This method is required by Scala Listener interface 
     we defined above. 
     """ 
     print(obj) 

    def register(self): 
     manager = PythonListener.get_manager() 
     self.uuid = manager.register(self) 
     return self.uuid 

    def unregister(self): 
     manager = PythonListener.get_manager() 
     manager.unregister(self.uuid) 
     self.uuid = None 

    class Java: 
     implements = ["net.zero323.spark.examples.listener.Listener"] 

시작 콜백 서버 :

sc._gateway.start_callback_server() 

만들고 리스너 등록 :

listener = PythonListener() 

를 등록 :

listener.register() 

및 테스트 :

출구에
>>> sc.parallelize(range(100), 3).saveAsTextFile("/tmp/listener_test") 
{"recordsWritten":33} 
{"recordsWritten":34} 
{"recordsWritten":33} 

당신이해야 종료 콜백 서버 :

sc._gateway.shutdown_callback_server() 

:

내부 콜백 서버를 사용하여 불꽃 스트리밍, 작업 할 때이주의해서 사용해야합니다

.

편집 :

class SparkListener(object): 
    def onApplicationEnd(self, applicationEnd): 
     pass 
    def onApplicationStart(self, applicationStart): 
     pass 
    def onBlockManagerRemoved(self, blockManagerRemoved): 
     pass 
    def onBlockUpdated(self, blockUpdated): 
     pass 
    def onEnvironmentUpdate(self, environmentUpdate): 
     pass 
    def onExecutorAdded(self, executorAdded): 
     pass 
    def onExecutorMetricsUpdate(self, executorMetricsUpdate): 
     pass 
    def onExecutorRemoved(self, executorRemoved): 
     pass 
    def onJobEnd(self, jobEnd): 
     pass 
    def onJobStart(self, jobStart): 
     pass 
    def onOtherEvent(self, event): 
     pass 
    def onStageCompleted(self, stageCompleted): 
     pass 
    def onStageSubmitted(self, stageSubmitted): 
     pass 
    def onTaskEnd(self, taskEnd): 
     pass 
    def onTaskGettingResult(self, taskGettingResult): 
     pass 
    def onTaskStart(self, taskStart): 
     pass 
    def onUnpersistRDD(self, unpersistRDD): 
     pass 
    class Java: 
     implements = ["org.apache.spark.scheduler.SparkListenerInterface"] 

그것을 확장 :

class TaskEndListener(SparkListener): 
    def onTaskEnd(self, taskEnd): 
     print(taskEnd.toString()) 

를 직접 사용 :이 많은 혼전에 경우

당신은 org.apache.spark.scheduler.SparkListenerInterface을 정의 할 수

>>> sc._gateway.start_callback_server() 
True 
>>> listener = TaskEndListener() 
>>> sc._jsc.sc().addSparkListener(listener) 
>>> sc.parallelize(range(100), 3).saveAsTextFile("/tmp/listener_test_simple") 
SparkListenerTaskEnd(0,0,ResultTask,Success,[email protected],[email protected]) 
SparkListenerTaskEnd(0,0,ResultTask,Success,[email protected],[email protected]) 
SparkListenerTaskEnd(0,0,ResultTask,Success,[email protected]) 

간단하지만이 방법은 선택적이 아닙니다 (JVM과 Python 간의 트래픽 증가). Python 세션에서 Java 객체를 처리해야합니다.