2014-07-16 6 views
6

DStream을 배열, 목록 등으로 변환하고 싶습니다. 그래서 json으로 변환하여 엔드 포인트에서 사용할 수 있습니다. 나는 아파치 스파크를 사용하고, 트위터 데이터를 주입하고있다. 이 작업을 Dstream statuses에서 어떻게 수행합니까? 나는 print() 이외의 다른 것을 얻을 수없는 것 같습니다.DStream의 각 RDD에 대해 이것을 배열이나 다른 일반적인 Java 데이터 유형으로 변환하려면 어떻게해야합니까?

import org.apache.spark._ 
import org.apache.spark.SparkContext._ 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.twitter._ 
import org.apache.spark.streaming.StreamingContext._ 
import TutorialHelper._ 
object Tutorial { 
    def main(args: Array[String]) { 

    // Location of the Spark directory 
    val sparkHome = "/opt/spark" 

    // URL of the Spark cluster 
    val sparkUrl = "local[8]" 

    // Location of the required JAR files 
    val jarFile = "target/scala-2.10/tutorial_2.10-0.1-SNAPSHOT.jar" 

    // HDFS directory for checkpointing 
    val checkpointDir = "/tmp" 

    // Configure Twitter credentials using twitter.txt 
    TutorialHelper.configureTwitterCredentials() 

    val ssc = new StreamingContext(sparkUrl, "Tutorial", Seconds(1), sparkHome, Seq(jarFile)) 

    val filters = Array("#americasgottalent", "iamawesome") 
    val tweets = TwitterUtils.createStream(ssc, None, filters) 

    val statuses = tweets.map(status => status.getText()) 

    val arry = Array("firstval") 
    statuses.foreachRDD { 
     arr :+ _.collect() 
    } 

    ssc.checkpoint(checkpointDir) 

    ssc.start() 
    ssc.awaitTermination() 
    } 
} 

답변

10

RDD가 statuses 인 경우 수행 할 수 있습니다.

DStream이 거대 할 수 있기 때문에 드라이버에서 원하는 것보다 더 많은 데이터가 될 수 있습니다.

+0

내가 statuses.print()를 제거하고 statuses.foreachRDD를 추가하는 경우 {발 편곡 = _.collect()} 나는 오류 얻을 : 확장 기능 ((X $ 1) => X $ 1없는 매개 변수 유형을. – CodingIsAwesome

+0

@ 코딩 극적으로 스칼라 타입 시스템이 약간 이상해 보입니다. RDD에 문자열이 맞습니까? – aaronman

+0

@CodingIsAwesome 정상적인 스칼라 lambda'(x : RDD [String]) => arr ++ x.collect())' – aaronman

4

우리가 당신을 닫았지만, 내가 결국 찾았습니다.

statuses.foreachRDD(rdd => { 
    for(item <- rdd.collect().toArray) { 
     println(item); 
    } 
}) 
+0

이걸 믿지 말고 대답은 거의 동일하고 확실히 RDD의 내용을 드라이버에게 알려줍니다. 배열 버퍼를 잘못 사용하고 있습니다. 배열을 잘못 인쇄하고 있습니다.'println (arr)'이 예상 한대로 작동하지 않습니다. – aaronman