0
TwitterUtils를 사용하여 Twitter 스트림을 읽는 스트림 응용 프로그램에서 스트림 및지도에서 foreachRDD를 사용하여 Twitter 메시지를 데이터베이스에 저장합니다. 그것들은 모두 위대한 작품.JavaDStream 및 TwitterUtils.createStream (...)을 분리/닫기하는 방법
내 질문 : 모든 것이 실행되면 트위터 스트림에서 분리하는 가장 적합한 방법은 무엇입니까? 1000 개의 메시지 만 수집하거나 컬렉션을 60 초 동안 실행하려고한다고 가정합니다. 다음
코드는 :
SparkConf sparkConf = new SparkConf().setAppName("Java spark twitter stream");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000));
JavaDStream<Status> tweets = TwitterUtils.createStream(ssc, filters);
JavaDStream<String> statuses = tweets.map(
new Function<Status, String>() {
public String call(Status status) {
//combine the strings here.
GeoLocation geoLocation = status.getGeoLocation();
if (geoLocation != null) {
String text = status.getText().replaceAll("[\r\n]", " ");
String line = geoLocation.getLongitude() + ",,,,"
+ geoLocation.getLatitude() + ",,,,"
+ status.getCreatedAt().getTime()
+ ",,,," + status.getUser().getId()
+ ",,,," + text;
return line;
} else {
return null;
}
}
}
).filter(new Function<String, Boolean>() {
public Boolean call(String input) {
return input != null;
}
});
statuses.print();
statuses.foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {
@Override
public Void call(JavaRDD<String> rdd, Time time) {
SQLContext sqlContext
= JavaSQLContextSingleton
.getInstance(rdd.context());
sqlContext.setConf("spark.sql.tungsten.enabled", "false");
JavaRDD<Row> tweetRowRDD
= rdd.map(new TweetMapLoadFunction());
DataFrame statusesDataFrame
= sqlContext
.createDataFrame(
tweetRowRDD,
tweetSchema.createTweetStructType());
return null;
}
});
ssc.start();
ssc.awaitTermination();