2016-06-10 1 views
0

TwitterUtils를 사용하여 Twitter 스트림을 읽는 스트림 응용 프로그램에서 스트림 및지도에서 foreachRDD를 사용하여 Twitter 메시지를 데이터베이스에 저장합니다. 그것들은 모두 위대한 작품.JavaDStream 및 TwitterUtils.createStream (...)을 분리/닫기하는 방법

내 질문 : 모든 것이 실행되면 트위터 스트림에서 분리하는 가장 적합한 방법은 무엇입니까? 1000 개의 메시지 만 수집하거나 컬렉션을 60 초 동안 실행하려고한다고 가정합니다. 다음

코드는 :

SparkConf sparkConf = new SparkConf().setAppName("Java spark twitter stream"); 
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000)); 
JavaDStream<Status> tweets = TwitterUtils.createStream(ssc, filters); 

JavaDStream<String> statuses = tweets.map(
     new Function<Status, String>() { 
      public String call(Status status) { 
       //combine the strings here. 
       GeoLocation geoLocation = status.getGeoLocation(); 
       if (geoLocation != null) { 
        String text = status.getText().replaceAll("[\r\n]", " "); 
        String line = geoLocation.getLongitude() + ",,,," 
          + geoLocation.getLatitude() + ",,,," 
          + status.getCreatedAt().getTime() 
          + ",,,," + status.getUser().getId() 
          + ",,,," + text; 
        return line; 
       } else { 
        return null; 
       } 
      } 
     } 
     ).filter(new Function<String, Boolean>() { 
      public Boolean call(String input) { 
       return input != null; 
      } 
     }); 
     statuses.print(); 


statuses.foreachRDD(new Function2<JavaRDD<String>, Time, Void>() { 
     @Override 
     public Void call(JavaRDD<String> rdd, Time time) { 
      SQLContext sqlContext 
        = JavaSQLContextSingleton 
          .getInstance(rdd.context()); 
      sqlContext.setConf("spark.sql.tungsten.enabled", "false"); 

      JavaRDD<Row> tweetRowRDD 
        = rdd.map(new TweetMapLoadFunction()); 

      DataFrame statusesDataFrame 
        = sqlContext 
          .createDataFrame(
           tweetRowRDD, 
           tweetSchema.createTweetStructType()); 
      return null; 
     } 
    }); 

    ssc.start(); 
    ssc.awaitTermination(); 

답변

0

이 직선 the documentation로부터 :

처리가 될 수있는 수동()를 사용하여 정지 streamingContext.stop.

기억해야 할 : 컨텍스트가 시작되었습니다

  • 되면, 새로운 스트리밍 계산은 설정하지하거나 추가 할 수 있습니다.
  • 컨텍스트가 중지되면 다시 시작할 수 없습니다.
  • 동시에 하나의 StreamingContext 만 JVM에서 활성화 될 수 있습니다.
  • StreamingContext의 stop()은 SparkContext도 중지합니다. StreamingContext 만 중지하려면 stopSparkContext라는 stop()의 선택적 매개 변수를 false로 설정하십시오.
  • SparkContext는 다음 StreamingContext가 만들어지기 전에 이전 StreamingContext가 중지 된 경우 (SparkContext를 중지하지 않고) 여러 StreamingContext를 만드는 데 다시 사용할 수 있습니다.