2016-08-02 3 views

답변

0

Bolt.execute()은 제작자가 무엇인지에 관계없이 각 수신 튜플에 대해 호출되며 변경할 수 없습니다. 한 번에 다른 제작자의 여러 튜플을 처리하려면 사용자 정의 UDF 코드를 작성해야합니다.

  1. 당신은 (볼트 회원으로 어쩌면 LinkedList<Tuple>) 각 들어오는 튜플
  2. , 당신은 해당 버퍼에 튜플을 추가 들어오는 튜플 버퍼 수, 각 생산자에 대한 입력 버퍼가 필요합니다 (당신은 생산자에 액세스 할 수 있습니다 input.getSourceComponent()
  3. 튜플을 버퍼에 추가 한 후에는 각 버퍼에 적어도 하나의 튜플이 포함되어 있는지 확인합니다. 예라면 각 버퍼에서 하나의 튜플을 가져 와서 처리합니다 (처리 후 확인). 적어도 한 번 버퍼가 비워 질 때까지 버퍼를 다시 지정하십시오.) - 아니요, 그냥 반환하고 아무것도 처리하지 마십시오.
0

here (일괄 처리 참조)을 참조하십시오. 여러 입력 튜플에 대한 집계와 같이 더 복잡한 작업을 처리하는 볼트의 경우 BaseRichBolt를 확장하고 앵커 메커니즘을 직접 제어해야합니다.

private OutputCollector outputCollector; 

을 그리고 그 준비 방법의 재정의 (override)를 통해 초기화 :

@Override 
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { 
    this.outputCollector = outputCollector; 
} 

BaseRichBolt에 대한 귀하의 실행 방법은 수신이 같은 자신의 출력 수집기를 선언 할 필요가이를 위해

튜플을 인수로 사용하려면 앵커를 유지하고 방출 할 때이를 사용하는 논리를 수행 할 수 있어야합니다.

private final List<Tuple> anchors = new ArrayList<Tuple>(); 

@Override 
public void execute(Tuple tuple) { 
    if (!isTupleAggregationComplete(anchors, tuple)) { 
     anchors.add(tuple); 
     return; 
    } 

    // do your computations here! 

    collector.emit(anchors, new Values(foo,bar,xpto)); 

    anchors.clear(); 
} 

당신은 볼트가 처리를 진행하기 위해 모든 필요한 정보가있을 경우 확인에 필요한 논리 isTupleAggregationComplete를 구현해야합니다.