약간 관련이 있지만 가능합니다. Py4j callback mechanism을 사용하여 SparkListener
에서 메시지를 전달할 수 있습니다. 먼저 필요한 모든 클래스가있는 스칼라 패키지를 만듭니다.디렉토리 구조 :
.
├── build.sbt
└── src
└── main
└── scala
└── net
└── zero323
└── spark
└── examples
└── listener
├── Listener.scala
├── Manager.scala
└── TaskListener.scala
build.sbt
:
name := "listener"
organization := "net.zero323"
scalaVersion := "2.11.7"
val sparkVersion = "2.1.0"
libraryDependencies ++= List(
"org.apache.spark" %% "spark-core" % sparkVersion,
"net.sf.py4j" % "py4j" % "0.10.4" // Just for the record
)
Listener.scala
는 우리가 나중에
package net.zero323.spark.examples.listener
/* You can add arbitrary methods here,
* as long as these match corresponding Python interface
*/
trait Listener {
/* This will be implemented by a Python class.
* You can of course use more specific types,
* for example here String => Unit */
def notify(x: Any): Any
}
Manager.scala
파이썬 리스너에 메시지를 전달하는 데 사용됩니다 구현하려고하는 파이썬 인터페이스를 정의
package net.zero323.spark.examples.listener
object Manager {
var listeners: Map[String, Listener] = Map()
def register(listener: Listener): String = {
this.synchronized {
val uuid = java.util.UUID.randomUUID().toString
listeners = listeners + (uuid -> listener)
uuid
}
}
def unregister(uuid: String) = {
this.synchronized {
listeners = listeners - uuid
}
}
def notifyAll(message: String): Unit = {
for { (_, listener) <- listeners } listener.notify(message)
}
}
0 마지막으로 간단한
:
package net.zero323.spark.examples.listener
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
/* A simple listener which captures SparkListenerTaskEnd,
* extracts numbers of records written by the task
* and converts to JSON. You can of course add handlers
* for other events as well.
*/
class PythonNotifyListener extends SparkListener {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val recordsWritten = taskEnd.taskMetrics.outputMetrics.recordsWritten
val message = compact(render(
("recordsWritten" -> recordsWritten)
))
Manager.notifyAll(message)
}
}
가 있습니다 '패키지 우리의 확장 :
sbt package
와 클래스 경로에 jar
생성 및 등록 리스너 추가 PySpark 세션을 시작합니다
$SPARK_HOME/bin/pyspark \
--driver-class-path target/scala-2.11/listener_2.11-0.1-SNAPSHOT.jar \
--conf spark.extraListeners=net.zero323.spark.examples.listener.PythonNotifyListener
을
다음으로 우리는을 구현하는 Python 객체를 정의해야합니다 89,인터페이스 :
class PythonListener(object):
package = "net.zero323.spark.examples.listener"
@staticmethod
def get_manager():
jvm = SparkContext.getOrCreate()._jvm
manager = getattr(jvm, "{}.{}".format(PythonListener.package, "Manager"))
return manager
def __init__(self):
self.uuid = None
def notify(self, obj):
"""This method is required by Scala Listener interface
we defined above.
"""
print(obj)
def register(self):
manager = PythonListener.get_manager()
self.uuid = manager.register(self)
return self.uuid
def unregister(self):
manager = PythonListener.get_manager()
manager.unregister(self.uuid)
self.uuid = None
class Java:
implements = ["net.zero323.spark.examples.listener.Listener"]
시작 콜백 서버 :
sc._gateway.start_callback_server()
만들고 리스너 등록 :
listener = PythonListener()
를 등록 :
listener.register()
및 테스트 :
출구에
>>> sc.parallelize(range(100), 3).saveAsTextFile("/tmp/listener_test")
{"recordsWritten":33}
{"recordsWritten":34}
{"recordsWritten":33}
당신이해야 종료 콜백 서버 :
sc._gateway.shutdown_callback_server()
주 :
내부 콜백 서버를 사용하여 불꽃 스트리밍, 작업 할 때이주의해서 사용해야합니다
.
편집 :
class SparkListener(object):
def onApplicationEnd(self, applicationEnd):
pass
def onApplicationStart(self, applicationStart):
pass
def onBlockManagerRemoved(self, blockManagerRemoved):
pass
def onBlockUpdated(self, blockUpdated):
pass
def onEnvironmentUpdate(self, environmentUpdate):
pass
def onExecutorAdded(self, executorAdded):
pass
def onExecutorMetricsUpdate(self, executorMetricsUpdate):
pass
def onExecutorRemoved(self, executorRemoved):
pass
def onJobEnd(self, jobEnd):
pass
def onJobStart(self, jobStart):
pass
def onOtherEvent(self, event):
pass
def onStageCompleted(self, stageCompleted):
pass
def onStageSubmitted(self, stageSubmitted):
pass
def onTaskEnd(self, taskEnd):
pass
def onTaskGettingResult(self, taskGettingResult):
pass
def onTaskStart(self, taskStart):
pass
def onUnpersistRDD(self, unpersistRDD):
pass
class Java:
implements = ["org.apache.spark.scheduler.SparkListenerInterface"]
그것을 확장 :
class TaskEndListener(SparkListener):
def onTaskEnd(self, taskEnd):
print(taskEnd.toString())
를 직접 사용 :이 많은 혼전에 경우
당신은 org.apache.spark.scheduler.SparkListenerInterface
을 정의 할 수
>>> sc._gateway.start_callback_server()
True
>>> listener = TaskEndListener()
>>> sc._jsc.sc().addSparkListener(listener)
>>> sc.parallelize(range(100), 3).saveAsTextFile("/tmp/listener_test_simple")
SparkListenerTaskEnd(0,0,ResultTask,Success,[email protected],[email protected])
SparkListenerTaskEnd(0,0,ResultTask,Success,[email protected],[email protected])
SparkListenerTaskEnd(0,0,ResultTask,Success,[email protected])
간단하지만이 방법은 선택적이 아닙니다 (JVM과 Python 간의 트래픽 증가). Python 세션에서 Java 객체를 처리해야합니다.