2017-10-12 4 views
3

모든 간격마다 특정 검색어로 트윗을 가져옵니다. 이러한 트윗은 트윗을 계산하고 조작하는 서비스로 전달되어야합니다. 이러한 서비스는 내 게시자에 가입되어 있습니다. 따라서 publisher.hasSubscribers()는 true를 반환합니다. 그러나 submit 또는 offer 함수는 내 가입자의 onNext를 호출하지 않습니다. 따라서 "수정"으로 구독자를 차례로 돌아다녀 직접 호출합니다. 하지만 그렇게해서는 안됩니다.SubmissionPublisher on submit 구독자의 onNext를 호출하지 말 것.

이것은 내 게시자의 생성자입니다.

public TwitterStreamer(Executor executor, int maxBufferCapacity, long period, TimeUnit unit, String searchQuery){ 
    super(executor, maxBufferCapacity); 
    this.searchQuery = searchQuery; 
    scheduler = new ScheduledThreadPoolExecutor(1); 
    this.tweetGetter = scheduler.scheduleAtFixedRate(
      () -> { 
       List<String> tweets = getTweets(searchQuery); 
       /* this.lastCall = LocalDateTime.now(); 
       for(Flow.Subscriber sub : this.getSubscribers()){ 
        sub.onNext(tweets); 
       }*/ 
       this.submit(tweets); 
       if(tweets.size() >= 20) this.close(); 
      }, 0, period, unit); 
} 

내가

public static void main(String[] args) { 
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors()); 

    String searchQuery; 
    try{ 
     searchQuery = args[0] != null ? args[0] : "#capgemini50"; 
    }catch (ArrayIndexOutOfBoundsException ex){ 
     searchQuery = "#capgemini50"; 
    } 

    TwitterStreamer streamer = new TwitterStreamer(executor, 5, 15L, SECONDS, searchQuery); 

    MySubscriber subscriber1 = new MySubscriber("LogFileSubscriber", new LogToFileService("./tweetsLogger.txt")); 
    MySubscriber subscriber2 = new MySubscriber("TotalTweetSubscriber",new CalculateTweetStatsService()); 
    streamer.subscribe(subscriber1); 
    streamer.subscribe(subscriber2); 

} 

답변

4

당신은 가입자가 필요 게시자에 가입하는 곳은 주 내 가입자

package myFlowAPI; 

import Interfaces.IProcess; 
import Services.LogToFileService; 

import java.util.List; 
import java.util.concurrent.Flow; 
import java.util.concurrent.atomic.AtomicInteger; 

public class MySubscriber implements Flow.Subscriber<List<String>> { 
private Flow.Subscription subscription; 
private AtomicInteger count; 

private IProcess processor; 

private String name; 
private int DEMAND = 0; 

public MySubscriber(String name, IProcess processor){ 
    this.name = name; 
    this.processor = processor; 
} 

@Override 
public void onSubscribe(Flow.Subscription subscription) { 
    this.subscription = subscription; 
} 


@Override 
public void onNext(List<String> item) { 
    Object result = this.processor.process(item); 
    this.readResult(result); 

    switch (this.processor.getClass().getSimpleName()){ 
     case "CalculateTweetStatsService": 
      if((Integer) result >= 20){ 
       this.subscription.cancel(); 
      } 
      break; 
    } 
} 

@Override 
public void onError(Throwable throwable) { 
    System.out.println("Error is thrown " + throwable.getMessage()); 
} 

@Override 
public void onComplete() { 
    if(this.processor instanceof LogToFileService){ 
     ((LogToFileService) processor).closeResource(); 
    } 
    System.out.println("complete"); 
} 

private void readResult(Object result){ 
    System.out.println("Result of " + this.processor.getClass().getSimpleName() + " processor is " + result.toString()); 
} 
} 

입니다에 명시 적으로 예를 들어, 데이터를 요청 가입시 (http://download.java.net/java/jdk9/docs/api/java/util/concurrent/Flow.Subscription.html#request-long- 참조) onNext의 처리에 따라

@Override 
public void onSubscribe(Flow.Subscription subscription) { 
    this.subscription = subscription; 
    this.subscription.request(1); 
} 

동일() 다음 항목을 요청할 수 있습니다.

+2

내 영웅. 고마워요 !! @Override public void onNext (목록 항목) { 개체 결과 = this.processor.process (항목); this.readResult (result); switch (this.processor.getClass(). getSimpleName()) { case "CalculateTweetStatsService": if ((정수) result> = 20) { this.subscription.cancel(); } 휴식; } this.subscription.request (1); }' – user1008531