2013-07-13 3 views
7

Storm에서 Trident를 처음 사용했습니다. 나는 TridentState 이상으로 머리를 부러 뜨린다. 나의 이해 트라이던트가 각 배치 (즉, 배치의 모든 튜플이 데이터베이스의 트랜잭션 ID를 유지함으로써 완전히 처리되었는지 여부)에 대한 상태 (즉, 메타 데이터)를 유지하고 다음 문장이하는 것을 완전히 확신하지 못한다.폭풍의 삼지창 상태는 무엇입니까?

TridentState urlToTweeters = 
    topology.newStaticState(getUrlToTweetersState()); 

위의 코드를 정의 할 때 실제로 어떤 일이 발생하는지 설명 할 수 있습니까?

+0

이 문맥에서 "Trident"를 정의 할 수 있습니까? Trident라고하는 여러 가지가 있습니다. – Charles

+1

컨텍스트가 "Storm"입니다. https://github.com/nathanmarz/storm/wiki/Documentation#trident – Dan

답변

0

트라이던트 상태 on the storm wiki에 대한 좋은 설명서가 있습니다. 귀하의 질문에 간단한 대답은 urlToTweeters은 쿼리 할 수있는 상태 개체입니다. 나는 위의 문을 가정 trident tutorial에서이다있어, 아래의 재현 : 그에서 소요 (

이 예에서
TridentState urlToTweeters = topology.newStaticState(getUrlToTweetersState()); 
TridentState tweetersToFollowers = topology.newStaticState(getTweeterToFollowersState()); 
topology.newDRPCStream("reach") 
    .stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters")).each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter")) 
    /* At this point we have the tweeters for each url passed in args */ 
    .shuffle()   
    .stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers")) 
    .parallelismHint(200) 
    .each(new Fields("followers"), new ExpandList(), new Fields("follower")) 
    .groupBy(new Fields("follower")) 
    .aggregate(new One(), new Fields("one")) 
    .parallelismHint(20) 
    .aggregate(new Count(), new Fields("reach")); 

urlToTweeters이 트위터에 URL을 매핑을 저장하는 것, 그리고 DRPC reach 쿼리가 다음 줄에 정의 URL을 인수로 사용하면 궁극적으로 도달 범위를 얻을 수 있습니다. 그러나 (인라인 주석으로 표시된) 길에 각 URL의 트위터 스트림 즉, urlToTweeters에 대한 쿼리 결과가 표시됩니다.

+0

이 작업에 도움이 될 수 있습니까? http://stackoverflow.com/questions/35445165/total-number-of-non - 반복 - 단어 - 각 - tweet – user1

9

나는 그것이 적어도 다른 사람이 유용하게 내 대답을 찾을 수 있습니다 :)

그래서, topology.newStaticState()는 쿼리 가능한 데이터 저장의 트라이던트의 추상화, 답변을 너무 늦게 결코 바랍니다. newStaticState()에 대한 매개 변수는 메서드 계약에 따라 storm.trident.state.StateFactory의 구현이어야합니다. 공장은 storm.trident.state.State의 인스턴스를 반환하는 makeState() 메서드를 구현해야합니다. 그러나 상태를 쿼리하려는 경우 일반 storm.trident.state.State에는 실제 데이터 소스를 쿼리하는 메서드가 없으므로 storm.trident.state.map.ReadOnlyMapState의 결과를 반환해야합니다. ReadOnlyMapState을 사용하려고 시도하면 실제로 클래스 캐스트 예외가 발생합니다.).

그럼 시도해 보겠습니다.

더미 상태 구현 :

public static class ExampleStaticState implements ReadOnlyMapState<String> { 

    private final Map<String, String> dataSourceStub; 

    public ExampleStaticState() { 
     dataSourceStub = new HashMap<>(); 
     dataSourceStub.put("tuple-00", "Trident"); 
     dataSourceStub.put("tuple-01", "definitely"); 
     dataSourceStub.put("tuple-02", "lacks"); 
     dataSourceStub.put("tuple-03", "documentation"); 
    } 

    @Override 
    public List<String> multiGet(List<List<Object>> keys) { 

     System.out.println("DEBUG: MultiGet, keys is " + keys); 

     List<String> result = new ArrayList<>(); 

     for (List<Object> inputTuple : keys) { 
      result.add(dataSourceStub.get(inputTuple.get(0))); 
     } 

     return result; 
    } 

    @Override 
    public void beginCommit(Long txid) { 
     // never gets executed... 
     System.out.println("DEBUG: Begin commit, txid=" + txid); 
    } 

    @Override 
    public void commit(Long txid) { 
     // never gets executed... 
     System.out.println("DEBUG: Commit, txid=" + txid); 
    } 
} 

팩토리 :

public static class ExampleStaticStateFactory implements StateFactory { 
    @Override 
    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { 
     return new ExampleStaticState(); 
    } 
} 

간단한 psvm (일명 public static void main)

public static void main(String... args) { 
    TridentTopology tridentTopology = new TridentTopology(); 
    FeederBatchSpout spout = new FeederBatchSpout(Arrays.asList(new String[]{ 
      "foo" 
    })); 
    TridentState state = tridentTopology.newStaticState(new ExampleStaticStateFactory()); 
    tridentTopology 
      .newStream("spout", spout) 
      .stateQuery(state, new Fields("foo"), new MapGet(), new Fields("bar")) 
      .each(new Fields("foo", "bar"), new Debug()) 
      ; 

    Config conf = new Config(); 
    conf.setNumWorkers(6); 

    LocalCluster localCluster = new LocalCluster(); 
    localCluster.submitTopology("tridentTopology", conf, tridentTopology.build()); 

    spout.feed(Arrays.asList(new Values[]{ 
      new Values("tuple-00"), 
      new Values("tuple-01"), 
      new Values("tuple-02"), 
      new Values("tuple-03") 
    })); 

    localCluster.shutdown(); 
} 

그리고, 마지막으로, 출력 :

DEBUG: MultiGet, keys is [[tuple-00], [tuple-01], [tuple-02], [tuple-03]] 
DEBUG: [tuple-00, Trident] 
DEBUG: [tuple-01, definitely] 
DEBUG: [tuple-02, lacks] 
DEBUG: [tuple-03, documentation] 

stateQuery()는 입력 배치에서 값을 가져 와서 '데이터 저장소'에있는 값에 매핑합니다.

다이빙 조금 더 깊이, 당신은이 다음 MapGet 클래스 (그 예를 토폴로지 내에서 쿼리에 사용되는 사람)의 소스를 살펴보고 찾을 수 있습니다 그래서

public class MapGet extends BaseQueryFunction<ReadOnlyMapState, Object> { 
    @Override 
    public List<Object> batchRetrieve(ReadOnlyMapState map, List<TridentTuple> keys) { 
     return map.multiGet((List) keys); 
    }  

    @Override 
    public void execute(TridentTuple tuple, Object result, TridentCollector collector) { 
     collector.emit(new Values(result)); 
    }  
} 

를 후드 단순히 ReadOnlyMapState 구현의 multiGet() 메소드를 호출 한 다음 데이터 저장소에서 찾은 값을 이미 존재하는 튜플에 추가합니다.당신은 할 수있는 최선의 일이 아닐지라도, 좀 더 복잡한 것을 수행하는 BaseQueryFunction<ReadOnlyMapState, Object>의 구현을 직접 만들 수 있습니다.

+1

고마워 ... 너무 늦었 학습에 관해서 ... – Ezhil

+0

당신이 이것에 도움이 될 수 있습니다 http://stackoverflow.com/questions/35445165/total-number-of -non-repeated-words-in-each-tweet – user1