나는 Spark 2.0.2, Kafka 0.10.1 및 spark-streaming-kafka-0-8 통합을 사용합니다. 다음을 원합니다.스트리밍 작업에서 윈도우 기능의 성능이 좋지 않음
NetFlow 연결에서 스트리밍 작업의 기능을 추출하고 k- 평균 모델에 레코드를 적용합니다. 일부 기능은 레코드에서 직접 계산되는 간단한 기능입니다. 그러나 나는 또한 전에 지정된 시간 창에서 레코드에 의존하는 더 복잡한 기능을 가지고 있습니다. 마지막 순간에 얼마나 많은 연결이 현재 호스트 또는 서비스와 동일한 호스트 또는 서비스에 있는지 계산합니다. 나는 이것을 위해 SQL 윈도우 함수를 사용하기로 결정했다.
val hostCountWindow = Window.partitionBy("plainrecord.ip_dst").orderBy(desc("timestamp")).rangeBetween(-1L, 0L)
val serviceCountWindow = Window.partitionBy("service").orderBy(desc("timestamp")).rangeBetween(-1L, 0L)
그리고이 모든 배치에 특징 추출하기 위해 호출되는 함수 :
stream.map(...).map(...).foreachRDD { rdd =>
val dataframe = rdd.toDF(featureHeaders: _*).transform(extractTrafficFeatures(_))
...
}
다음과 같이이 기능을
def extractTrafficFeatures(dataset: Dataset[Row]) = {
dataset
.withColumn("host_count", count(dataset("plainrecord.ip_dst")).over(hostCountWindow))
.withColumn("srv_count", count(dataset("service")).over(serviceCountWindow))
}
를 사용
그래서 나는 창 사양을 구축
문제는 이것이 매우 나쁜 성능이라는 것입니다. 1 초당 100 레코드 미만의 평균 입력 속도의 경우 일괄 처리에는 1 ~ 3 초가 필요합니다. 파티션 분할에서 오는 것 같아요.
RDD API와 countByValueAndWindow()
을 사용하려고했습니다. 이것은 훨씬 빨라 보이지만, 코드는 DataFrame API로 더 멋지고 깨끗해 보입니다.
스트리밍 데이터에서 이러한 기능을 계산하는 더 좋은 방법이 있습니까? 아니면 여기서 뭔가 잘못하고있는 걸까요?