2016-06-22 8 views
1

나는 이전에 폭풍우를 사용 중이었고 더 많은 배치 기능이 필요했기 때문에 폭풍 속에서 배치 작업을 검색했습니다. 그리고 실시간으로 마이크로 일괄 처리를 수행하는 Trident를 발견했습니다.스톰 트라이던트를 일괄 처리 튜플에 사용하는 방법은 무엇입니까?

그러나 어떻게 든 Trident가 마이크로 일괄 처리 (흐름, 일괄 처리 크기, 일괄 처리 간격)를 처리하여 내가 필요한 것을 실제로 처리하고 있음을 알 수 없습니다.

내가하고 싶은 것은 간격으로 스파우트에 의해 방출 된 튜플을 수집/저장하고 다른 시간 간격으로 다운 스트림 구성 요소/볼트/기능에 다시 방출하는 것입니다. (예를 들어, 초당 하나 개의 튜플을 방출 주둥이, 다음 삼지창 기능은 튜플을 저장/수집하고 다음 기능에 분당 50 개 튜플을 방출합니다.)

나는이 경우에는 삼지창을 적용 할 수있는 방법을

누군가가 나를 인도 할 수 있습니까? 또는 폭풍 기능을 사용하는 다른 적용 가능한 방법은 무엇입니까?

답변

1

우수 질문! 그러나 슬프게도 이런 종류의 마이크로 일괄 처리는 Trident 상자에서 지원되지 않습니다.

하지만 자신 만의 주파수 기반 마이크로 일괄 처리를 구현해 볼 수 있습니다. 이 골격 예와 같이 뭔가 :

import java.util.ArrayList; 
import java.util.List; 
import java.util.Map; 
import java.util.concurrent.LinkedBlockingQueue; 

import org.apache.storm.task.OutputCollector; 
import org.apache.storm.task.TopologyContext; 
import org.apache.storm.topology.OutputFieldsDeclarer; 
import org.apache.storm.topology.base.BaseRichBolt; 
import org.apache.storm.tuple.Tuple; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

public class MicroBatchingBolt extends BaseRichBolt { 

    private static final long serialVersionUID = 8500984730263268589L; 
    private static final Logger LOG = LoggerFactory.getLogger(MicroBatchingBolt.class); 

    protected LinkedBlockingQueue<Tuple> queue = new LinkedBlockingQueue<Tuple>(); 

    /** The threshold after which the batch should be flushed out. */ 
    int batchSize = 100; 

    /** 
    * The batch interval in sec. Minimum time between flushes if the batch sizes 
    * are not met. This should typically be equal to 
    * topology.tick.tuple.freq.secs and half of topology.message.timeout.secs 
    */ 
    int batchIntervalInSec = 45; 

    /** The last batch process time seconds. Used for tracking purpose */ 
    long lastBatchProcessTimeSeconds = 0; 

    private OutputCollector collector; 

    @Override 
    @SuppressWarnings("rawtypes") 
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { 
     this.collector = collector; 
    } 

    @Override 
    public void execute(Tuple tuple) { 
     // Check if the tuple is of type Tick Tuple 
     if (isTickTuple(tuple)) { 
     // If so, it is indication for batch flush. But don't flush if previous 
     // flush was done very recently (either due to batch size threshold was 
     // crossed or because of another tick tuple 

     if ((System.currentTimeMillis()/1000 - lastBatchProcessTimeSeconds) >= batchIntervalInSec) { 
      LOG.debug("Current queue size is " + this.queue.size() 
       + ". But received tick tuple so executing the batch"); 

      finishBatch(); 
     } else { 
      LOG.debug("Current queue size is " + this.queue.size() 
       + ". Received tick tuple but last batch was executed " 
       + (System.currentTimeMillis()/1000 - lastBatchProcessTimeSeconds) 
       + " seconds back that is less than " + batchIntervalInSec 
       + " so ignoring the tick tuple"); 
     } 
     } else { 
     // Add the tuple to queue. But don't ack it yet. 
     this.queue.add(tuple); 
     int queueSize = this.queue.size(); 
     LOG.debug("current queue size is " + queueSize); 
     if (queueSize >= batchSize) { 
      LOG.debug("Current queue size is >= " + batchSize 
       + " executing the batch"); 

      finishBatch(); 
     } 
     } 
    } 

    private boolean isTickTuple(Tuple tuple) { 
     // Check if it is tick tuple here 
     return false; 
    } 

    /** 
    * Finish batch. 
    */ 
    public void finishBatch() { 

     LOG.debug("Finishing batch of size " + queue.size()); 
     lastBatchProcessTimeSeconds = System.currentTimeMillis()/1000; 
     List<Tuple> tuples = new ArrayList<Tuple>(); 
     queue.drainTo(tuples); 

     for (Tuple tuple : tuples) { 
     // Prepare your batch here (may it be JDBC, HBase, ElasticSearch, Solr or 
     // anything else. 
     // List<Response> responses = externalApi.get("..."); 
     } 

     try { 
     // Execute your batch here and ack or fail the tuples 
     LOG.debug("Executed the batch. Processing responses."); 
     //  for (int counter = 0; counter < responses.length; counter++) { 
     //   if (response.isFailed()) { 
     //   LOG.error("Failed to process tuple # " + counter); 
     //   this.collector.fail(tuples.get(counter)); 
     //   } else { 
     //   LOG.debug("Successfully processed tuple # " + counter); 
     //   this.collector.ack(tuples.get(counter)); 
     //   } 
     //  } 
     } catch (Exception e) { 
     LOG.error("Unable to process " + tuples.size() + " tuples", e); 
     // Fail entire batch 
     for (Tuple tuple : tuples) { 
      this.collector.fail(tuple); 
     } 
     } 
    } 

    @Override 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     // ... 
    } 

} 

출처 : http://hortonworks.com/blog/apache-storm-design-pattern-micro-batching/Using tick tuples with trident in storm