DStream을 배열, 목록 등으로 변환하고 싶습니다. 그래서 json으로 변환하여 엔드 포인트에서 사용할 수 있습니다. 나는 아파치 스파크를 사용하고, 트위터 데이터를 주입하고있다. 이 작업을 Dstream statuses
에서 어떻게 수행합니까? 나는 print()
이외의 다른 것을 얻을 수없는 것 같습니다.DStream의 각 RDD에 대해 이것을 배열이나 다른 일반적인 Java 데이터 유형으로 변환하려면 어떻게해야합니까?
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.StreamingContext._
import TutorialHelper._
object Tutorial {
def main(args: Array[String]) {
// Location of the Spark directory
val sparkHome = "/opt/spark"
// URL of the Spark cluster
val sparkUrl = "local[8]"
// Location of the required JAR files
val jarFile = "target/scala-2.10/tutorial_2.10-0.1-SNAPSHOT.jar"
// HDFS directory for checkpointing
val checkpointDir = "/tmp"
// Configure Twitter credentials using twitter.txt
TutorialHelper.configureTwitterCredentials()
val ssc = new StreamingContext(sparkUrl, "Tutorial", Seconds(1), sparkHome, Seq(jarFile))
val filters = Array("#americasgottalent", "iamawesome")
val tweets = TwitterUtils.createStream(ssc, None, filters)
val statuses = tweets.map(status => status.getText())
val arry = Array("firstval")
statuses.foreachRDD {
arr :+ _.collect()
}
ssc.checkpoint(checkpointDir)
ssc.start()
ssc.awaitTermination()
}
}
내가 statuses.print()를 제거하고 statuses.foreachRDD를 추가하는 경우 {발 편곡 = _.collect()} 나는 오류 얻을 : 확장 기능 ((X $ 1) => X $ 1없는 매개 변수 유형을. – CodingIsAwesome
@ 코딩 극적으로 스칼라 타입 시스템이 약간 이상해 보입니다. RDD에 문자열이 맞습니까? – aaronman
@CodingIsAwesome 정상적인 스칼라 lambda'(x : RDD [String]) => arr ++ x.collect())' – aaronman