Ignite 1.7.0을 사용 중이며 Apache Ignite의 쓰기 기능을 테스트하고있었습니다. 이 질문을하는 동기는 Apache Ignite에서 쓰기 저장 기능을 사용할 수있는 상황에서 발생하는 상황을보다 잘 이해하는 것입니다.내부에서 쓰기 무시 쓰기
테스트 캐시에 20 개의 항목을 삽입하는 Ignite Client Program이 있습니다 ("test_cache"라고 함).
Ignite 서버는 동일한 시스템에서 실행 중이지만 다른 JVM에서 실행 중입니다.
- 을 통해 읽기를 통해 쓰기 활성화 뒤에 쓰기 :
은 Ignite 캐시는 다음과 같은 구성 설정이 있습니다.
- 플러시 크기는 13
- 플러시 스레드 수는 다른 모든 속성이 기본값으로 설정되어 1
입니다.
더하여, 이것에 캐시하도록 구성된 캐시 저장소가 상기 코드는 다음과 같다 :
I가 의도적 Thread.sleep를 writeAll이라는의() 메소드() 메소드 위해 호출 한package com.ignite.genericpoc;
import java.util.Collection;
import java.util.Map;
import javax.cache.Cache.Entry;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.resources.CacheNameResource;
import org.apache.ignite.resources.IgniteInstanceResource;
public class IgniteStoreTest implements CacheStore<String, String> {
@IgniteInstanceResource
Ignite gridReference;
@CacheNameResource
String cacheName;
@Override
public String load(String key) throws CacheLoaderException {
System.out.println("load method called for the key [ " + key + " ] and cache [ " + cacheName + " ] ");
return null;
}
@Override
public Map<String, String> loadAll(Iterable<? extends String> keys) throws CacheLoaderException {
IgniteCache<String, String> ic = gridReference.cache(cacheName);
int currentKeyNo = 0;
for (String key : keys) {
ic.put(key, "Value:" + currentKeyNo);
currentKeyNo++;
}
System.out.println("Got " + currentKeyNo + " entries");
return null;
}
@Override
public void write(Entry<? extends String, ? extends String> entry) throws CacheWriterException {
System.out.println("Write method called");
}
@Override
public void writeAll(Collection<Entry<? extends String, ? extends String>> entries) throws CacheWriterException {
System.out.println("Write all method called for [ " + entries.size() + " ] entries in the thread "
+ Thread.currentThread().getName());
System.out.println("Entries recieved by " + Thread.currentThread().getName() + " : " + entries.toString());
try {
Thread.sleep(60000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void delete(Object key) throws CacheWriterException {
System.out.println("Delete method called");
}
@Override
public void deleteAll(Collection<?> keys) throws CacheWriterException {
System.out.println("Delete All method called");
}
@Override
public void loadCache(IgniteBiInClosure<String, String> clo, Object... args) throws CacheLoaderException {
System.out.println("Load cache method called with " + args[0].toString());
}
@Override
public void sessionEnd(boolean commit) throws CacheWriterException {
System.out.println("Session End called");
}
}
, 느린 데이터베이스 쓰기를 시뮬레이트합니다. 다음과 같이
캐시에 데이터를로드에서 Ignite 클라이언트에 대한 코드는 다음과 같이
package com.ignite.genericpoc;
import java.util.ArrayList;
import java.util.List;
import javax.cache.configuration.FactoryBuilder;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.CacheConfiguration;
public class IgnitePersistentStoreClientTest {
public static void main(String[] args) throws InterruptedException {
List<String> addressess = new ArrayList<>();
addressess.add("*.*.*.*:47500"); // Hiding the IP
Ignition.setClientMode(true);
Ignite i = IgniteConfigurationUtil.startIgniteServer(
IgniteConfigurationUtil.getIgniteConfiguration(false, IgniteTestConstants.GRID_NAME, addressess));
System.out.println("Client Started");
CacheConfiguration<String, String> ccfg = new CacheConfiguration<>();
ccfg.setName("Persistent_Store_Test_Cache");
ccfg.setCacheStoreFactory(FactoryBuilder.factoryOf(IgniteStoreTest.class));
ccfg.setReadThrough(true);
ccfg.setWriteThrough(true);
ccfg.setWriteBehindEnabled(true);
ccfg.setWriteBehindFlushSize(13);
ccfg.setWriteBehindFlushThreadCount(1);
System.out.println(ccfg.getWriteBehindBatchSize());
IgniteCache<String, String> ic = i.getOrCreateCache(ccfg);
System.out.println("Cache Created");
for (int t = 1; t <= 20; t++) {
System.out.println("Loading key "+t);
ic.put("Key:" + t,"Value: "+t);
System.out.println("Key "+ t + " loaded ");
}
System.out.println("Cache Loaded");
i.close();
}
}
실행이 발생합니다
은 Ignite 서버
먼저 시작됩니다.데이터를로드하는 Ignite Client는 서버 다음에 시작됩니다.
writeAll() 메서드에 대해 60 초의 절전 모드가 정의되어 있기 때문에 Ignite Client는 20 번째 항목을 쓰는 동안 멈추게됩니다.
또한 서버 로그에서 writeAll() 메서드가 두 개의 스레드에 대해 호출 된 것을 볼 수 있습니다. 그 중 Flush 스레드는 저장소에 쓰기 위해 15 개의 항목을 받았으며 시스템 스레드는 1 개의 항목을 받았습니다. 상점에 씁니다. 다음은 Ignite 서버 로그는 다음과 같습니다
쓰기를 요구하는 모든 방법 [15] 변기-0- 스레드 # 66 % test_grid %의
쓰기 스레드 SYS에서 [1] 항목을 촉구 모든 방법의 항목 - # 22 % test_grid %
캐시 뒤에 쓰기가 가득하고 모든 플러시 스레드는 바쁜 데이터를 기록하기 때문에 나는 다음 Ignite 클라이언트 넣어이 항목 20를 기록에 갇혀 있음을 이해할 수있다.
다음을
가 포인트가 된 나는 명확한 이해가 필요하다 : 14 항목을 삽입하는 동안 클라이언트가 20 항목을 삽입에서 차단되는 이유는, 그것은 차단 했어야을 (를 기반으로 나는 512
에 그것을 기본값을 일괄 처리 크기를 설정하지 않은으로 13 개 항목의 최대 캐시 크기)에
- 이유는, 15 개 항목과 모든 19 개 항목이라는 세척 실이었다writeAll() 메서드로 호출 된 시스템 스레드가 Ignite Client에서 20 번째 항목을 넣으려는 요청을 처리하는 스레드와 같은 스레드인지 확인합니다.
캐시가 쓰기 가능 상태이고 쓰기 순서 모드가 PRIMARY_SYNC (기본값)이고 캐시에 백업이없는 경우 기본 노드가 쓰기를 커밋 할 수있을 때까지 캐시에 대한 모든 호출을 차단해야합니다. 이것은 또한 Write Behind 캐시에 항목을 넣을 수 있음을 의미합니다.
서버에 항목을 저장하는 경우 Ignite 서버는 저장소 및 캐시 쓰기 방지를 위해 항목의 복사본을 두 개 만듭니다. 또는 동일한 항목의 참조가 사용됩니다.
양해 해 주셔서 감사합니다. 질문이 너무 길면 사과드립니다. 그러나 내용은 관심있는 잠재 고객에게 상황을 설명하는 데 필수적입니다.
답장을 보내 주셔서 감사 드리며 (flushSize * 1.5)는 눈을 뜨게합니다. 항목의 참조가 쓰기 저장 캐시에 저장되므로 Ignite 서버의 메모리 요구 사항을 조정할 때 쓰기 저장 캐시의 메모리를 고려할 필요가 없습니다. 또한, 여전히 플러시 스레드가 플러시 할 항목이 19 개가 아닌 15 개의 항목 만 가져온 이유는 이해할 수 없습니다. 이는 캐시 뒤의 쓰기가 꽉 찼고 배치 크기가 512 였기 때문에 최적의 선택이었을 것입니다. 먼저 플러시 빈도가 있다고 생각했습니다. , 1 분으로 늘리면 효과가 없습니다. Flush Thread는 항상 15 개의 항목이 있습니다. –
자세한 내용을 제공하기 위해 답변을 업데이트했습니다. – sk0x50