0
HBase를 백엔드 영구 저장소로 사용하여 Ignite 캐시 저장소를 구현했습니다. 캐시 저장소의 코드는 다음과 같습니다. (제목: Ignite Cache Store - 리소스를 해제하는 방법)
/**
 * Ignite {@code CacheStore} that persists {@code Long -> byte[]} cache entries in an HBase table.
 *
 * <p>Each cache entry maps to one HBase row. The row key is derived from the cache name and the
 * cache key via {@code TagDuplicateUtil.getPersistentStoreKeyForBitDataCache}, and the value is
 * stored in a single column ({@code BIT_DATA_COLUMN_FAMILY_NAME_AS_BYTES} /
 * {@code BIT_DATA_COLUMN_QUALIFIER_NAME_AS_BYTES}).
 *
 * <p>Every operation obtains a connection from {@code HBaseConnectionUtil.getHbaseConnection()}
 * and closes it when the operation finishes.
 * NOTE(review): if that utility creates a brand-new connection on each call, this is expensive;
 * consider holding one connection for the lifetime of the store instead - confirm against
 * HBaseConnectionUtil.
 *
 * <p>All HBase {@link IOException}s are rethrown as {@code ROCException} with the matching
 * {@code TagDuplicateErrorCodes} code and the original exception as the cause.
 */
public class BitDataCachePersistentStore implements CacheStore<Long, byte[]> {

    /** Ignite instance injected by Ignite; retained for resource injection. */
    @IgniteInstanceResource
    Ignite gridReference;

    /** Name of the cache this store is attached to; injected by Ignite. */
    @CacheNameResource
    private String cacheName;

    /**
     * Loads the value for a single key from HBase.
     *
     * @param key cache key to look up
     * @return the stored bytes, or {@code null} when no row exists for the key
     * @throws ROCException if the HBase read fails
     */
    @Override
    public byte[] load(Long key) {
        try (Connection con = HBaseConnectionUtil.getHbaseConnection();
                Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) {
            Result rowFetched = bitDataPersistentTable.get(new Get(rowKeyFor(key)));
            if (rowFetched == null || rowFetched.isEmpty()) {
                // Must be null, not an empty array: Ignite would load an empty array as a real entry.
                return null;
            }
            return rowFetched.getValue(TagDuplicateConstants.BIT_DATA_COLUMN_FAMILY_NAME_AS_BYTES,
                    TagDuplicateConstants.BIT_DATA_COLUMN_QUALIFIER_NAME_AS_BYTES);
        } catch (IOException e) {
            throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_READ_ERROR, e,
                    "Error while performing read operation for the key [ " + key + " ] of the cache [ " + cacheName
                            + " ] ");
        }
    }

    /**
     * Loads the values for several keys from HBase in one batched request.
     *
     * <p>The loaded entries are returned to the caller; the cache is not modified here, because
     * populating the cache from the returned map is the cache's job. Keys with no row (or no
     * value in the data column) are omitted from the result.
     *
     * @param keys cache keys to look up
     * @return map of the keys that were found to their stored bytes; never {@code null}
     * @throws ROCException if the HBase read fails
     */
    @Override
    public Map<Long, byte[]> loadAll(Iterable<? extends Long> keys) {
        long startTime = System.currentTimeMillis();

        // Keep the keys in a list parallel to the Gets so results can be matched back by index.
        List<Long> requestedKeys = new ArrayList<>();
        List<Get> rowsToBeFetched = new ArrayList<>();
        for (Long key : keys) {
            requestedKeys.add(key);
            rowsToBeFetched.add(new Get(rowKeyFor(key)));
        }

        Map<Long, byte[]> loadedEntries = new java.util.HashMap<>();
        if (rowsToBeFetched.isEmpty()) {
            return loadedEntries; // nothing requested: avoid opening a connection
        }

        try (Connection con = HBaseConnectionUtil.getHbaseConnection();
                Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) {
            Result[] rowsFetched = bitDataPersistentTable.get(rowsToBeFetched);
            for (int i = 0; i < rowsFetched.length; i++) {
                Result rowFetched = rowsFetched[i];
                if (rowFetched == null || rowFetched.isEmpty()) {
                    continue; // no row for this key
                }
                byte[] value = rowFetched.getValue(TagDuplicateConstants.BIT_DATA_COLUMN_FAMILY_NAME_AS_BYTES,
                        TagDuplicateConstants.BIT_DATA_COLUMN_QUALIFIER_NAME_AS_BYTES);
                if (value != null) {
                    loadedEntries.put(requestedKeys.get(i), value);
                }
            }
            System.out.println("LoadAll for [ " + loadedEntries.size() + " ] keys of the cache [ " + cacheName
                    + " ] took [ " + ((System.currentTimeMillis() - startTime) / 1000.0) + " seconds ] ");
            return loadedEntries;
        } catch (IOException e) {
            throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_READ_ERROR, e,
                    "Error while reading multiple keys for the cache [ " + cacheName + " ] ");
        }
    }

    /**
     * Writes a single cache entry to HBase.
     *
     * @param entry cache entry whose key and value are persisted
     * @throws ROCException if the HBase write fails
     */
    @Override
    public void write(Entry<? extends Long, ? extends byte[]> entry) {
        byte[] rowKey = rowKeyFor(entry.getKey());
        try (Connection con = HBaseConnectionUtil.getHbaseConnection();
                Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) {
            bitDataPersistentTable.put(putFor(rowKey, entry.getValue()));
        } catch (IOException e) {
            throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_WRITE_ERROR, e,
                    "Error while writing the entry for the key [ " + entry.getKey() + " ] for the cache [ "
                            + cacheName + " ] ");
        }
    }

    /**
     * Writes several cache entries to HBase in one batched put.
     *
     * @param entries cache entries to persist
     * @throws ROCException if the HBase write fails
     */
    @Override
    public void writeAll(Collection<Entry<? extends Long, ? extends byte[]>> entries) {
        if (entries.isEmpty()) {
            return; // nothing to write: avoid opening a connection
        }
        long startTime = System.currentTimeMillis();
        List<Put> rowsToBeWritten = new ArrayList<>(entries.size());
        for (Entry<? extends Long, ? extends byte[]> entryToBeInserted : entries) {
            // Same UTF-8 row-key encoding as load/write/delete so batched rows stay readable.
            rowsToBeWritten.add(putFor(rowKeyFor(entryToBeInserted.getKey()), entryToBeInserted.getValue()));
        }
        try (Connection con = HBaseConnectionUtil.getHbaseConnection();
                Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) {
            bitDataPersistentTable.put(rowsToBeWritten);
            System.out.println("Time taken to write [ " + entries.size() + " entries ] for the cache [ "
                    + cacheName + " ] is " + ((System.currentTimeMillis() - startTime) / 1000.0) + " seconds");
        } catch (IOException e) {
            throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_WRITE_ERROR, e,
                    "Error while writing multiple keys for the cache [ " + cacheName + " ] ");
        }
    }

    /**
     * Deletes the HBase row for a single cache key.
     *
     * @param key cache key whose row is removed
     * @throws ROCException if the HBase delete fails
     */
    @Override
    public void delete(Object key) {
        String hbaseKey = hbaseKeyFor(key);
        try (Connection con = HBaseConnectionUtil.getHbaseConnection();
                Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) {
            bitDataPersistentTable.delete(new Delete(Bytes.toBytes(hbaseKey)));
        } catch (IOException e) {
            throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_REMOVAL_ERROR, e,
                    "Error while deleting the entry for the key [ " + hbaseKey + " ] for the cache [ " + cacheName
                            + " ] ");
        }
    }

    /**
     * Deletes the HBase rows for several cache keys in one batched delete.
     *
     * @param keys cache keys whose rows are removed
     * @throws ROCException if the HBase delete fails
     */
    @Override
    public void deleteAll(Collection<?> keys) {
        if (keys.isEmpty()) {
            return; // nothing to delete: avoid opening a connection
        }
        List<Delete> rowsToBeDeleted = new ArrayList<>(keys.size());
        for (Object keyToBeDeleted : keys) {
            // Same UTF-8 row-key encoding as load/write/delete.
            rowsToBeDeleted.add(new Delete(rowKeyFor(keyToBeDeleted)));
        }
        try (Connection con = HBaseConnectionUtil.getHbaseConnection();
                Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) {
            bitDataPersistentTable.delete(rowsToBeDeleted);
        } catch (IOException e) {
            throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_REMOVAL_ERROR, e,
                    "Error while deleting entries for the cache [ " + cacheName + " ] ");
        }
    }

    /** Bulk pre-loading of the cache is not supported; this method does nothing. */
    @Override
    public void loadCache(IgniteBiInClosure<Long, byte[]> clo, Object... args) {
        // No implementation provided
    }

    /** No session handling is performed; this method does nothing. */
    @Override
    public void sessionEnd(boolean commit) {
        // No implementation provided
    }

    /** Builds the HBase row key string for a cache key of this cache. */
    private String hbaseKeyFor(Object key) {
        return TagDuplicateUtil.getPersistentStoreKeyForBitDataCache(cacheName, key.toString());
    }

    /** Builds the HBase row key bytes for a cache key, using the one encoding shared by all operations. */
    private byte[] rowKeyFor(Object key) {
        return Bytes.toBytes(hbaseKeyFor(key));
    }

    /** Creates the Put that stores {@code value} in the data column of the given row. */
    private static Put putFor(byte[] rowKey, byte[] value) {
        Put rowToBeWritten = new Put(rowKey);
        rowToBeWritten.addColumn(TagDuplicateConstants.BIT_DATA_COLUMN_FAMILY_NAME_AS_BYTES,
                TagDuplicateConstants.BIT_DATA_COLUMN_QUALIFIER_NAME_AS_BYTES, value);
        return rowToBeWritten;
    }
}
캐시 모드는 PARTITIONED입니다.
캐시 원자성 모드는 ATOMIC입니다.
저장소 구현에서 알 수 있듯이, 구현된 각 메서드는 호출될 때마다 HBase에 대한 새 연결을 생성합니다.
데이터 소스별 리소스(이 경우 HBase 연결)를 메서드 호출마다 열고 닫는 대신, 더 상위(매크로) 수준에서 열고 닫는 시점을 제어할 수 있는 방법이 있는지 알고 싶습니다.
연결 풀링을 살펴보시는 것이 좋겠습니다. – GurV