나는 그것이 적어도 다른 사람이 유용하게 내 대답을 찾을 수 있습니다 :)
그래서, topology.newStaticState()
는 쿼리 가능한 데이터 저장의 트라이던트의 추상화, 답변을 너무 늦게 결코 바랍니다. newStaticState()
에 대한 매개 변수는 메서드 계약에 따라 storm.trident.state.StateFactory
의 구현이어야합니다. 공장은 storm.trident.state.State
의 인스턴스를 반환하는 makeState()
메서드를 구현해야합니다. 그러나 상태를 쿼리하려는 경우 일반 storm.trident.state.State
에는 실제 데이터 소스를 쿼리하는 메서드가 없으므로 storm.trident.state.map.ReadOnlyMapState
의 결과를 반환해야합니다. ReadOnlyMapState
을 사용하려고 시도하면 실제로 클래스 캐스트 예외가 발생합니다.).
그럼 시도해 보겠습니다.
더미 상태 구현 :
public static class ExampleStaticState implements ReadOnlyMapState<String> {
private final Map<String, String> dataSourceStub;
public ExampleStaticState() {
dataSourceStub = new HashMap<>();
dataSourceStub.put("tuple-00", "Trident");
dataSourceStub.put("tuple-01", "definitely");
dataSourceStub.put("tuple-02", "lacks");
dataSourceStub.put("tuple-03", "documentation");
}
@Override
public List<String> multiGet(List<List<Object>> keys) {
System.out.println("DEBUG: MultiGet, keys is " + keys);
List<String> result = new ArrayList<>();
for (List<Object> inputTuple : keys) {
result.add(dataSourceStub.get(inputTuple.get(0)));
}
return result;
}
@Override
public void beginCommit(Long txid) {
// never gets executed...
System.out.println("DEBUG: Begin commit, txid=" + txid);
}
@Override
public void commit(Long txid) {
// never gets executed...
System.out.println("DEBUG: Commit, txid=" + txid);
}
}
팩토리 :
public static class ExampleStaticStateFactory implements StateFactory {
@Override
public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
return new ExampleStaticState();
}
}
간단한 psvm
(일명 public static void main
)
public static void main(String... args) {
TridentTopology tridentTopology = new TridentTopology();
FeederBatchSpout spout = new FeederBatchSpout(Arrays.asList(new String[]{
"foo"
}));
TridentState state = tridentTopology.newStaticState(new ExampleStaticStateFactory());
tridentTopology
.newStream("spout", spout)
.stateQuery(state, new Fields("foo"), new MapGet(), new Fields("bar"))
.each(new Fields("foo", "bar"), new Debug())
;
Config conf = new Config();
conf.setNumWorkers(6);
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("tridentTopology", conf, tridentTopology.build());
spout.feed(Arrays.asList(new Values[]{
new Values("tuple-00"),
new Values("tuple-01"),
new Values("tuple-02"),
new Values("tuple-03")
}));
localCluster.shutdown();
}
그리고, 마지막으로, 출력 :
DEBUG: MultiGet, keys is [[tuple-00], [tuple-01], [tuple-02], [tuple-03]]
DEBUG: [tuple-00, Trident]
DEBUG: [tuple-01, definitely]
DEBUG: [tuple-02, lacks]
DEBUG: [tuple-03, documentation]
stateQuery()는 입력 배치에서 값을 가져 와서 '데이터 저장소'에있는 값에 매핑합니다.
다이빙 조금 더 깊이, 당신은이 다음 MapGet
클래스 (그 예를 토폴로지 내에서 쿼리에 사용되는 사람)의 소스를 살펴보고 찾을 수 있습니다 그래서
public class MapGet extends BaseQueryFunction<ReadOnlyMapState, Object> {
@Override
public List<Object> batchRetrieve(ReadOnlyMapState map, List<TridentTuple> keys) {
return map.multiGet((List) keys);
}
@Override
public void execute(TridentTuple tuple, Object result, TridentCollector collector) {
collector.emit(new Values(result));
}
}
를 후드 단순히 ReadOnlyMapState
구현의 multiGet()
메소드를 호출 한 다음 데이터 저장소에서 찾은 값을 이미 존재하는 튜플에 추가합니다.당신은 할 수있는 최선의 일이 아닐지라도, 좀 더 복잡한 것을 수행하는 BaseQueryFunction<ReadOnlyMapState, Object>
의 구현을 직접 만들 수 있습니다.
이 문맥에서 "Trident"를 정의 할 수 있습니까? Trident라고하는 여러 가지가 있습니다. – Charles
컨텍스트가 "Storm"입니다. https://github.com/nathanmarz/storm/wiki/Documentation#trident – Dan