2014-07-16 4 views
8

이것은 스칼라로 작성된 스칼라 스트리밍 프로그램입니다. 소켓에서 1 초마다 단어 수를 셉니다. 결과는 단어 수, 예를 들어 시간 0에서 1까지의 단어 수, 시간 1에서 2까지의 단어 수입니다. 그러나이 프로그램을 변경할 수있는 방법이 있는지 궁금합니다. 단어 수? 즉, 0부터 지금까지의 단어 수입니다.스파크 스트리밍 누적 된 단어 수

val sparkConf = new SparkConf().setAppName("NetworkWordCount") 
val ssc = new StreamingContext(sparkConf, Seconds(1)) 

// Create a socket stream on target ip:port and count the 
// words in input stream of \n delimited text (eg. generated by 'nc') 
// Note that no duplication in storage level only for running locally. 
// Replication necessary in distributed scenario for fault tolerance. 
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER) 
val words = lines.flatMap(_.split(" ")) 
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) 
wordCounts.print() 
ssc.start() 
ssc.awaitTermination() 

답변

9

StateDStream을 사용할 수 있습니다. example of stateful word count from sparks examples이 있습니다. 작동

object StatefulNetworkWordCount { 
    def main(args: Array[String]) { 
    if (args.length < 2) { 
     System.err.println("Usage: StatefulNetworkWordCount <hostname> <port>") 
     System.exit(1) 
    } 

    StreamingExamples.setStreamingLogLevels() 

    val updateFunc = (values: Seq[Int], state: Option[Int]) => { 
     val currentCount = values.foldLeft(0)(_ + _) 

     val previousCount = state.getOrElse(0) 

     Some(currentCount + previousCount) 
    } 

    val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount") 
    // Create the context with a 1 second batch size 
    val ssc = new StreamingContext(sparkConf, Seconds(1)) 
    ssc.checkpoint(".") 

    // Create a NetworkInputDStream on target ip:port and count the 
    // words in input stream of \n delimited test (eg. generated by 'nc') 
    val lines = ssc.socketTextStream(args(0), args(1).toInt) 
    val words = lines.flatMap(_.split(" ")) 
    val wordDstream = words.map(x => (x, 1)) 

    // Update the cumulative count using updateStateByKey 
    // This will give a Dstream made of state (which is the cumulative count of the words) 
    val stateDstream = wordDstream.updateStateByKey[Int](updateFunc) 
    stateDstream.print() 
    ssc.start() 
    ssc.awaitTermination() 
    } 
} 

하는 방법은 다음 어큐뮬레이터와 같은 역할을하는 Option[T]를 업데이트, 각 배치에 대한 Seq[T]를 얻을 수있다. Option 인 이유는 첫 번째 일괄 처리가 None 일 것이므로 업데이트되지 않는 한 그대로 유지해야하기 때문입니다. 이 예에서 카운트는 int입니다. 많은 데이터를 처리하려는 경우에도 Long 또는 BigInt

을 가질 수도 있습니다.