2013-06-04 1 views
10

나는 다음을 수행 토폴로지 작성하려고 해요 :

  1. 트위터 피드 구독하는 주둥이를
  2. 집계 볼트 (키워드 기준)이 컬렉션에 트윗 (N)을 집계하여 프린터 볼트를 보냅니다.
  3. 컬렉션을 콘솔에 한꺼번에 인쇄하는 간단한 볼트입니다.

실제로 컬렉션에 대해 더 많은 처리를하고 싶습니다.

로컬로 테스트했는데 제대로 작동하는 것 같습니다. 그러나 볼트에 그룹화를 올바르게 설정했는지, 그리고 실제 폭풍우 클러스터에 배치 할 때 올바르게 작동하는지 확실하지 않습니다. 누군가가이 토폴로지를 검토하고 오류, 변경 또는 개선을 제안하는 데 도움을 주시면 감사하겠습니다.

감사합니다.

내 토폴로지는 다음과 같습니다.

builder.setSpout("spout", new TwitterFilterSpout("pittsburgh")); 
    builder.setBolt("sampleaggregate", new SampleAggregatorBolt()) 
       .shuffleGrouping("spout"); 
    builder.setBolt("printaggreator",new PrinterBolt()).shuffleGrouping("sampleaggregate"); 

집계 볼트

public class SampleAggregatorBolt implements IRichBolt { 

    protected OutputCollector collector; 
    protected Tuple currentTuple; 
    protected Logger log; 
    /** 
    * Holds the messages in the bolt till you are ready to send them out 
    */ 
    protected List<Status> statusCache; 

    @Override 
    public void prepare(Map stormConf, TopologyContext context, 
         OutputCollector collector) { 
     this.collector = collector; 

     log = Logger.getLogger(getClass().getName()); 
     statusCache = new ArrayList<Status>(); 
    } 

    @Override 
    public void execute(Tuple tuple) { 
     currentTuple = tuple; 

     Status currentStatus = null; 
     try { 
      currentStatus = (Status) tuple.getValue(0); 
     } catch (ClassCastException e) { 
     } 
     if (currentStatus != null) { 

      //add it to the status cache 
      statusCache.add(currentStatus); 
      collector.ack(tuple); 


      //check the size of the status cache and pass it to the next stage if you have enough messages to emit 
      if (statusCache.size() > 10) { 
       collector.emit(new Values(statusCache)); 
      } 

     } 
    } 

    @Override 
    public void cleanup() { 


    } 

    @Override 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     declarer.declare(new Fields("tweets")); 

    } 

    @Override 
    public Map<String, Object> getComponentConfiguration() { 
     return null; //To change body of implemented methods use File | Settings | File Templates. 
    } 


    protected void setupNonSerializableAttributes() { 

    } 

} 

내가 좋아 보인다 볼 수있는에서 프린터 볼트

public class PrinterBolt extends BaseBasicBolt { 

    @Override 
    public void execute(Tuple tuple, BasicOutputCollector collector) { 
     System.out.println(tuple.size() + " " + tuple); 
    } 

    @Override 
    public void declareOutputFields(OutputFieldsDeclarer ofd) { 
    } 

} 

답변

4

. 악마는 세부 사항에 있습니다. 애그리 게이터 볼트가 무슨 일을하는지 잘 모르겠지만 전달 된 값에 대한 가정을한다면 적절한 필드 그룹을 고려해야합니다. 당신은 하나의 기본 병렬 힌트를 사용하고 이것은 차이의 큰 확인하지 않을 수 있습니다,하지만 당신은 당신이 아닌 셔플 그룹에 대한 호출 할 수 있도록 여러 집계 볼트 인스턴스 암시 적 논리 가정으로 확장하기로 결정한다.

+0

는 I 위 게이터 볼트 (실행 메소드 참조)에 대한 코드를 작성했다. 지금은 N 개 (위의 예제에서 10 개)의 메시지가 누적 될 때까지 대기하고 있으며 10 개의 메시지가있는 즉시 메시지를 나눕니다. BTW 나는 방금 해결할 버그를 발견했습니다. 값을 내면 캐시를 지워야합니다. 따라서 하나 이상의 애그리 게이터를 사용해야하는 경우 어떤 변경이 필요합니다. –

0

안녕하세요. 여러 키워드를 구독하려고하시는 즉시 문제가 발생할 것입니다. 나는 당신의 스파우트가 필터링에 사용 된 원래의 키워드를 방출한다고 제안합니다.

은 그런 대신 shuffleGrouping을하는 나는 fieldsGrouping

builder.setBolt("sampleaggregate", new SampleAggregatorBolt()) 
      .shuffleGrouping("spout", new Fields("keyword")); 

당신이 하나의 키워드의 결과가 동일한 볼트에있는 모든 시간을 최종 확인이 방법을 수행 할 것입니다. 따라서 집계를 올바르게 계산할 수 있습니다. fieldsGrouping을 생략하면 Storm은 집계 볼트의 양을 인스턴스화하고 스파우트의 메시지를 집계 볼트의 인스턴스로 보낼 수 있습니다. 집계 볼트의 인스턴스는 결국 잘못된 결과를 초래합니다.