모든 간격마다 특정 검색어로 트윗을 가져옵니다. 이러한 트윗은 트윗을 계산하고 조작하는 서비스로 전달되어야합니다. 이러한 서비스는 내 게시자에 가입되어 있습니다. 따라서 publisher.hasSubscribers()는 true를 반환합니다. 그러나 submit 또는 offer 함수는 내 가입자의 onNext를 호출하지 않습니다. 따라서 "수정"으로 구독자를 차례로 돌아다녀 직접 호출합니다. 하지만 그렇게해서는 안됩니다.SubmissionPublisher on submit 구독자의 onNext를 호출하지 말 것.
이것은 내 게시자의 생성자입니다.
public TwitterStreamer(Executor executor, int maxBufferCapacity, long period, TimeUnit unit, String searchQuery){
super(executor, maxBufferCapacity);
this.searchQuery = searchQuery;
scheduler = new ScheduledThreadPoolExecutor(1);
this.tweetGetter = scheduler.scheduleAtFixedRate(
() -> {
List<String> tweets = getTweets(searchQuery);
/* this.lastCall = LocalDateTime.now();
for(Flow.Subscriber sub : this.getSubscribers()){
sub.onNext(tweets);
}*/
this.submit(tweets);
if(tweets.size() >= 20) this.close();
}, 0, period, unit);
}
이
내가public static void main(String[] args) {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
String searchQuery;
try{
searchQuery = args[0] != null ? args[0] : "#capgemini50";
}catch (ArrayIndexOutOfBoundsException ex){
searchQuery = "#capgemini50";
}
TwitterStreamer streamer = new TwitterStreamer(executor, 5, 15L, SECONDS, searchQuery);
MySubscriber subscriber1 = new MySubscriber("LogFileSubscriber", new LogToFileService("./tweetsLogger.txt"));
MySubscriber subscriber2 = new MySubscriber("TotalTweetSubscriber",new CalculateTweetStatsService());
streamer.subscribe(subscriber1);
streamer.subscribe(subscriber2);
}
내 영웅. 고마워요 !! @Override public void onNext (목록 항목) { 개체 결과 = this.processor.process (항목); this.readResult (result); switch (this.processor.getClass(). getSimpleName()) { case "CalculateTweetStatsService": if ((정수) result> = 20) { this.subscription.cancel(); } 휴식; } this.subscription.request (1); }' –
user1008531