2
스파크 스트리밍 1.1.0을 로컬로 사용하고 있습니다 (클러스터가 아님). 데이터 (약 10.000 개의 항목)를 구문 분석하여 스트림에 저장 한 다음 일부 변환을 수행하는 간단한 응용 프로그램을 만들었습니다.스파크 스트리밍 성능이 느립니다.
def main(args : Array[String]){
val master = "local[8]"
val conf = new SparkConf().setAppName("Tester").setMaster(master)
val sc = new StreamingContext(conf, Milliseconds(110000))
val stream = sc.receiverStream(new MyReceiver("localhost", 9999))
val parsedStream = parse(stream)
parsedStream.foreachRDD(rdd =>
println(rdd.first()+"\nRULE STARTS "+System.currentTimeMillis()))
val result1 = parsedStream
.filter(entry => entry.symbol.contains("walking")
&& entry.symbol.contains("true") && entry.symbol.contains("id0"))
.map(_.time)
val result2 = parsedStream
.filter(entry =>
entry.symbol == "disappear" && entry.symbol.contains("id0"))
.map(_.time)
val result3 = result1
.transformWith(result2, (rdd1, rdd2: RDD[Int]) => rdd1.subtract(rdd2))
result3.foreachRDD(rdd =>
println(rdd.first()+"\nRULE ENDS "+System.currentTimeMillis()))
sc.start()
sc.awaitTermination()
}
def parse(stream: DStream[String]) = {
stream.flatMap { line =>
val entries = line.split("assert").filter(entry => !entry.isEmpty)
entries.map { tuple =>
val pattern = """\s*[(](.+)[,]\s*([0-9]+)+\s*[)]\s*[)]\s*[,|\.]\s*""".r
tuple match {
case pattern(symbol, time) =>
new Data(symbol, time.toInt)
}
}
}
}
case class Data (symbol: String, time: Int)
나는 하나 개의 배치에서 모든 데이터를 수신하기 위해 110.000 밀리 세컨드의 배치 기간을 가지고 : 여기에 코드입니다. 나는 로컬에서도 스파크가 매우 빠르다고 믿었다. 이 경우 규칙 실행 ("규칙 스타트"와 "규칙 종료"사이)에는 약 3.5 초가 걸립니다. 틀린 일을하고 있습니까? 아니면 예상되는 시간입니까? 모든 조언