나는 다음을 수행 토폴로지 작성하려고 해요 :는
- 트위터 피드 구독하는 주둥이를
- 집계 볼트 (키워드 기준)이 컬렉션에 트윗 (N)을 집계하여 프린터 볼트를 보냅니다.
- 컬렉션을 콘솔에 한꺼번에 인쇄하는 간단한 볼트입니다.
실제로 컬렉션에 대해 더 많은 처리를하고 싶습니다.
로컬로 테스트했는데 제대로 작동하는 것 같습니다. 그러나 볼트에 그룹화를 올바르게 설정했는지, 그리고 실제 폭풍우 클러스터에 배치 할 때 올바르게 작동하는지 확실하지 않습니다. 누군가가이 토폴로지를 검토하고 오류, 변경 또는 개선을 제안하는 데 도움을 주시면 감사하겠습니다.
감사합니다.
내 토폴로지는 다음과 같습니다.
builder.setSpout("spout", new TwitterFilterSpout("pittsburgh"));
builder.setBolt("sampleaggregate", new SampleAggregatorBolt())
.shuffleGrouping("spout");
builder.setBolt("printaggreator",new PrinterBolt()).shuffleGrouping("sampleaggregate");
집계 볼트
public class SampleAggregatorBolt implements IRichBolt {
protected OutputCollector collector;
protected Tuple currentTuple;
protected Logger log;
/**
* Holds the messages in the bolt till you are ready to send them out
*/
protected List<Status> statusCache;
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
log = Logger.getLogger(getClass().getName());
statusCache = new ArrayList<Status>();
}
@Override
public void execute(Tuple tuple) {
currentTuple = tuple;
Status currentStatus = null;
try {
currentStatus = (Status) tuple.getValue(0);
} catch (ClassCastException e) {
}
if (currentStatus != null) {
//add it to the status cache
statusCache.add(currentStatus);
collector.ack(tuple);
//check the size of the status cache and pass it to the next stage if you have enough messages to emit
if (statusCache.size() > 10) {
collector.emit(new Values(statusCache));
}
}
}
@Override
public void cleanup() {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("tweets"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null; //To change body of implemented methods use File | Settings | File Templates.
}
protected void setupNonSerializableAttributes() {
}
}
내가 좋아 보인다 볼 수있는에서 프린터 볼트
public class PrinterBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
System.out.println(tuple.size() + " " + tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer ofd) {
}
}
는 I 위 게이터 볼트 (실행 메소드 참조)에 대한 코드를 작성했다. 지금은 N 개 (위의 예제에서 10 개)의 메시지가 누적 될 때까지 대기하고 있으며 10 개의 메시지가있는 즉시 메시지를 나눕니다. BTW 나는 방금 해결할 버그를 발견했습니다. 값을 내면 캐시를 지워야합니다. 따라서 하나 이상의 애그리 게이터를 사용해야하는 경우 어떤 변경이 필요합니다. –