토폴로지는 다음과 같습니다. 폭풍우를 사용하면 여러 입력이있는 볼트가 모든 입력이 도착한 경우에만 어떻게 처리 할 수 있습니까?
어떻게 여러 입력이있는 볼트가 모든 입력이 도착할 때만 처리되도록 할 수 있습니까?
토폴로지는 다음과 같습니다. 폭풍우를 사용하면 여러 입력이있는 볼트가 모든 입력이 도착한 경우에만 어떻게 처리 할 수 있습니까?
어떻게 여러 입력이있는 볼트가 모든 입력이 도착할 때만 처리되도록 할 수 있습니까?
Bolt.execute()
은 제작자가 무엇인지에 관계없이 각 수신 튜플에 대해 호출되며 변경할 수 없습니다. 한 번에 다른 제작자의 여러 튜플을 처리하려면 사용자 정의 UDF 코드를 작성해야합니다.
LinkedList<Tuple>
) 각 들어오는 튜플input.getSourceComponent()
here (일괄 처리 참조)을 참조하십시오. 여러 입력 튜플에 대한 집계와 같이 더 복잡한 작업을 처리하는 볼트의 경우 BaseRichBolt를 확장하고 앵커 메커니즘을 직접 제어해야합니다.
private OutputCollector outputCollector;
을 그리고 그 준비 방법의 재정의 (override)를 통해 초기화 :
이@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.outputCollector = outputCollector;
}
BaseRichBolt에 대한 귀하의 실행 방법은 수신이 같은 자신의 출력 수집기를 선언 할 필요가이를 위해
튜플을 인수로 사용하려면 앵커를 유지하고 방출 할 때이를 사용하는 논리를 수행 할 수 있어야합니다.
private final List<Tuple> anchors = new ArrayList<Tuple>();
@Override
public void execute(Tuple tuple) {
if (!isTupleAggregationComplete(anchors, tuple)) {
anchors.add(tuple);
return;
}
// do your computations here!
collector.emit(anchors, new Values(foo,bar,xpto));
anchors.clear();
}
당신은 볼트가 처리를 진행하기 위해 모든 필요한 정보가있을 경우 확인에 필요한 논리 isTupleAggregationComplete를 구현해야합니다.