2013-10-09 13 views
0

내 일반적인 질문은 "Accumulo BatchScanner가 범위 당 첫 번째 결과 만 가져올 수 있습니까?"accumulo - batchscanner : 범위 당 하나의 결과

이제 어쨌든이 방법에 접근하는 더 좋은 방법이있을 수 있으므로 사용 사례에 대한 일부 세부 정보가 있습니다. 다른 시스템의 메시지를 나타내는 데이터가 있습니다. 다양한 유형의 메시지가있을 수 있습니다. 내 사용자는 "이 모든 시스템에 대해 특정 시간에 특정 유형의 최신 메시지를 나에게 알려줘"와 같은 시스템 질문을 할 수 있기를 원합니다. 이

rowid: system_name, family: message_type, qualifier: masked_timestamp, value: message_text 

아이디어는 사용자가 나에게 그들은 메시지의 유형 및 특정 타임 스탬프, 신경 시스템의 목록을 제공한다는 것입니다 같은

내 테이블 레이아웃 보인다. 테이블을 가장 최근에 정렬하도록 마스크 된 타임 스탬프를 사용했습니다. 그런 식으로 타임 스탬프를 검색 할 때 첫 번째 결과는 그 시간 이전의 가장 최근 결과입니다. 쿼리 당 검색 할 시스템이 여러 대 있기 때문에 BatchScanner를 사용하고 있습니다. BatchScanner가 각 범위에 대한 첫 번째 결과 만 가져올 수 있습니까? 가장 최근의 날짜가 사용자가 지정한 날짜와 일치하지 않을 수 있으므로 특정 키를 지정할 수 없습니다.

현재 BatchScanner를 사용하고 있으며 Key 당 첫 번째 결과를 제외한 모든 것을 무시하고 있습니다. 지금 당장은 작동하지만 시스템/유형별로 첫 번째 결과 만 신경 쓰면 네트워크를 통해 특정 시스템/유형에 대한 모든 데이터를 다시 가져 오는 것이 낭비처럼 보입니다.

편집

FirstEntryInRowIterator를 사용하여 내 시도

@Test 
public void testFirstEntryIterator() throws Exception 
{ 
    Connector connector = new MockInstance("inst").getConnector("user", new PasswordToken("password")); 
    connector.tableOperations().create("testing"); 

    BatchWriter writer = writer(connector, "testing"); 
    writer.addMutation(mutation("row", "fam", "qual1", "val1")); 
    writer.addMutation(mutation("row", "fam", "qual2", "val2")); 
    writer.addMutation(mutation("row", "fam", "qual3", "val3")); 
    writer.close(); 

    Scanner scanner = connector.createScanner("testing", new Authorizations()); 
    scanner.addScanIterator(new IteratorSetting(50, FirstEntryInRowIterator.class)); 

    Key begin = new Key("row", "fam", "qual2"); 
    scanner.setRange(new Range(begin, begin.followingKey(PartialKey.ROW_COLFAM_COLQUAL))); 

    int numResults = 0; 
    for (Map.Entry<Key, Value> entry : scanner) 
    { 
     Assert.assertEquals("qual2", entry.getKey().getColumnQualifier().toString()); 
     numResults++; 
    } 

    Assert.assertEquals(1, numResults); 
} 

내 목표는 반환 된 항목이 될 것입니다 ("행", "식구들", "qual2", "을 val2")하지만 0 결과를 얻으십시오. 아마 Iterator가 Range 이전에 적용되고있는 것처럼 보입니다. 나는 아직 파고 가지 않았다.

답변

3

Accumulo의 SortedKeyValueIterator 중 하나, 특히 FirstEntryInRowIterator (누적 코어 아티팩트에 포함 된) 중 하나를 사용하는 좋은 사례 인 것처럼 들립니다.

FirstEntryInRowIterator를 사용하여 IteratorSetting을 만들고이를 BatchScanner에 추가하십시오. 이 경우 해당 system_name의 첫 번째 키/값이 반환되고 클라이언트의 오버 헤드가 다른 모든 결과를 무시하지 않도록 방지됩니다.

FirstEntryInRowIterator의 빠른 수정은 당신이 원하는 걸 얻을 수 있습니다 원래 포스터 행당 범위를 만드는 것처럼

/* 
* Licensed to the Apache Software Foundation (ASF) under one or more 
* contributor license agreements. See the NOTICE file distributed with 
* this work for additional information regarding copyright ownership. 
* The ASF licenses this file to You under the Apache License, Version 2.0 
* (the "License"); you may not use this file except in compliance with 
* the License. You may obtain a copy of the License at 
* 
*  http://www.apache.org/licenses/LICENSE-2.0 
* 
* Unless required by applicable law or agreed to in writing, software 
* distributed under the License is distributed on an "AS IS" BASIS, 
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
* See the License for the specific language governing permissions and 
* limitations under the License. 
*/ 
package org.apache.accumulo.core.iterators; 

import java.io.IOException; 
import java.util.Collection; 
import java.util.HashMap; 
import java.util.Map; 

import org.apache.accumulo.core.client.IteratorSetting; 
import org.apache.accumulo.core.data.ByteSequence; 
import org.apache.accumulo.core.data.Key; 
import org.apache.accumulo.core.data.PartialKey; 
import org.apache.accumulo.core.data.Range; 
import org.apache.accumulo.core.data.Value; 
import org.apache.hadoop.io.Text; 

public class FirstEntryInRangeIterator extends SkippingIterator implements OptionDescriber { 

    // options 
    static final String NUM_SCANS_STRING_NAME = "scansBeforeSeek"; 

    // iterator predecessor seek options to pass through 
    private Range latestRange; 
    private Collection<ByteSequence> latestColumnFamilies; 
    private boolean latestInclusive; 

    // private fields 
    private Text lastRowFound; 
    private int numscans; 

    /** 
    * convenience method to set the option to optimize the frequency of scans vs. seeks 
    */ 
    public static void setNumScansBeforeSeek(IteratorSetting cfg, int num) { 
    cfg.addOption(NUM_SCANS_STRING_NAME, Integer.toString(num)); 
    } 

    // this must be public for OptionsDescriber 
    public FirstEntryInRangeIterator() { 
    super(); 
    } 

    public FirstEntryInRangeIterator(FirstEntryInRangeIterator other, IteratorEnvironment env) { 
    super(); 
    setSource(other.getSource().deepCopy(env)); 
    } 

    @Override 
    public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) { 
    return new FirstEntryInRangeIterator(this, env); 
    } 

    @Override 
    public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException { 
    super.init(source, options, env); 
    String o = options.get(NUM_SCANS_STRING_NAME); 
    numscans = o == null ? 10 : Integer.parseInt(o); 
    } 

    // this is only ever called immediately after getting "next" entry 
    @Override 
    protected void consume() throws IOException { 
    if (finished == true || lastRowFound == null) 
     return; 
    int count = 0; 
    while (getSource().hasTop() && lastRowFound.equals(getSource().getTopKey().getRow())) { 

     // try to efficiently jump to the next matching key 
     if (count < numscans) { 
     ++count; 
     getSource().next(); // scan 
     } else { 
     // too many scans, just seek 
     count = 0; 

     // determine where to seek to, but don't go beyond the user-specified range 
     Key nextKey = getSource().getTopKey().followingKey(PartialKey.ROW); 
     if (!latestRange.afterEndKey(nextKey)) 
      getSource().seek(new Range(nextKey, true, latestRange.getEndKey(), latestRange.isEndKeyInclusive()), latestColumnFamilies, latestInclusive); 
     else { 
      finished = true; 
      break; 
     } 
     } 
    } 
    lastRowFound = getSource().hasTop() ? getSource().getTopKey().getRow(lastRowFound) : null; 
    } 

    private boolean finished = true; 

    @Override 
    public boolean hasTop() { 
    return !finished && getSource().hasTop(); 
    } 

    @Override 
    public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException { 
    // save parameters for future internal seeks 
    latestRange = range; 
    latestColumnFamilies = columnFamilies; 
    latestInclusive = inclusive; 
    lastRowFound = null; 

    super.seek(range, columnFamilies, inclusive); 
    finished = false; 

    if (getSource().hasTop()) { 
     lastRowFound = getSource().getTopKey().getRow(); 
     if (range.beforeStartKey(getSource().getTopKey())) 
     consume(); 
    } 
    } 

    @Override 
    public IteratorOptions describeOptions() { 
    String name = "firstEntry"; 
    String desc = "Only allows iteration over the first entry per range"; 
    HashMap<String,String> namedOptions = new HashMap<String,String>(); 
    namedOptions.put(NUM_SCANS_STRING_NAME, "Number of scans to try before seeking [10]"); 
    return new IteratorOptions(name, desc, namedOptions, null); 
    } 

    @Override 
    public boolean validateOptions(Map<String,String> options) { 
    try { 
     String o = options.get(NUM_SCANS_STRING_NAME); 
     if (o != null) 
     Integer.parseInt(o); 
    } catch (Exception e) { 
     throw new IllegalArgumentException("bad integer " + NUM_SCANS_STRING_NAME + ":" + options.get(NUM_SCANS_STRING_NAME), e); 
    } 
    return true; 
    } 

} 
+0

것 같습니다. FirstEntryInRowIterator를 사용하는 경우에도 그렇게 할 필요는 없습니다. 일괄 스캔은 여전히 ​​유용 할 수 있지만 이러한 특정 기능을 달성하는 것은 아닙니다. – Christopher

+0

답장 elserj에 감사드립니다. 나는 이것으로 조금 놀기 시작했으나 내가 원하는 것을 얻을 수 없었다. 나는 좀 더 파고 싶다. 내가 현재보고있는 것에 대한 설명은 편집을 참조하십시오. – jeff

+0

아, FirstEntryInRowIterator는 사용자가 제공하는 Range의 startKey에서 행의 첫 번째 키/값을 찾으려고 시도합니다 (예를 들어, "line fam : qual2"에서 제공 한 Range의 startKey를 " 열"). 그래서 iterator는 사용자가 제공 한 Range 외부에있는 Key "rom fam : qual1"을 반환하려고 시도하므로 아무 결과도 얻지 못합니다. – elserj