1

각 창에서 값을 세고 상위 값을 찾고 각 값의 상위 10 개 값만 모든 값 대신 hdfs로 저장하려고합니다.부분 스파크 DStream 창을 HDFS로 저장

eegStreams(a) = KafkaUtils.createStream(ssc, zkQuorum, group, Map(args(a) -> 1),StorageLevel.MEMORY_AND_DISK_SER).map(_._2) 
    val counts = eegStreams(a).map(x => (math.round(x.toDouble), 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(4), Seconds(4)) 
    val sortedCounts = counts.map(_.swap).transform(rdd => rdd.sortByKey(false)).map(_.swap) 
    ssc.sparkContext.parallelize(rdd.take(10)).saveAsTextFile("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))}  


    //sortedCounts.foreachRDD(rdd =>println("\nTop 10 amplitudes:\n" + rdd.take(10).mkString("\n"))) 
    sortedCounts.map(tuple => "%s,%s".format(tuple._1, tuple._2)).saveAsTextFiles("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1)) 

위와 같이 (주석 처리 된) 상위 10 개를 인쇄 할 수 있습니다.

또한

sortedCounts.foreachRDD{ rdd => ssc.sparkContext.parallelize(rdd.take(10)).saveAsTextFile("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))} 

을 시도했지만 나는 다음과 같은 오류가 발생합니다. org.apache.spark.streaming.StreamingContext java.io.NotSerializableException : org.apache.spark 내 배열

15/01/05 17시 12분 23초 오류 actor.OneForOneStrategy 직렬화되지 않습니다. streaming.StreamingContext

답변

0

시도해 볼 수 있습니까?

sortedCounts.foreachRDD(rdd => rdd.filterWith(ind => ind)((v, ind) => ind <= 10).saveAsTextFile(...)) 

참고 : 나는 ... 스 니펫을 테스트하지 않았다

0

첫 번째 버전은 작동합니다. 스트리밍 컨텍스트가 처음 생성 된 @transient ssc = ...을 선언하십시오.

두 번째 버전은 b/c에서 작동하지 않습니다. StreamingContext은 폐쇄에서 직렬화 할 수 없습니다.