2016-08-18 5 views
0

일부 네트워크 데이터 세션을 기반으로 스파크 스트리밍 응용 프로그램을 구현하려고합니다. 나는 RDD를위한 완전한 상태 프로그래밍을 사용하고있다.Spark Streaming에서 mapwithState에서 상태 삭제

엄청난 수의 레코드와 키로 인해 스트리밍 논리의 일부 조건을 준수하면 mapwithsate 함수에서 일부 상태를 삭제해야합니다.

내가 왜 이런 일을하는지, 내가 아는 스펙에서 알 수 있듯이 시간이있다. 그러나 이것은 내가 찾고있는 기능이 아니라 오히려 메모리 양을 줄이기 위해 메모리에서 상태를 삭제해야한다. 스트리밍 응용 프로그램이 소모됩니다. 아래의 예를 들어

당신이 할 수있는

답변

1

.. 내가 어떤 조건 내 스트리밍 응용 프로그램이 필요로하는 메모리를 해제 그렇게 적용되는 경우 키의 상태를 삭제하는 방법을 궁금해 샘플 satefull 기능

def trackStateFunc(batchTime: Time, key: String, value: Option[Int],state:  State[Long]): Option[(String, Long)] = { 
val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L) 
val output = (key, sum) 
state.update(sum) 
Some(output) 
} 

입니다 어떤 상태에 따라 state.remove() 함수를 호출하십시오. 약도는 https://spark.apache.org/docs/2.0.0/api/java/org/apache/spark/streaming/State.html

입니다.