2017-11-29 17 views
1

Ignite 1.7.0을 사용 중이며 Apache Ignite의 쓰기 기능을 테스트하고있었습니다. 이 질문을하는 동기는 Apache Ignite에서 쓰기 저장 기능을 사용할 수있는 상황에서 발생하는 상황을보다 잘 이해하는 것입니다.내부에서 쓰기 무시 쓰기

테스트 캐시에 20 개의 항목을 삽입하는 Ignite Client Program이 있습니다 ("test_cache"라고 함).

Ignite 서버는 동일한 시스템에서 실행 중이지만 다른 JVM에서 실행 중입니다.

  1. 을 통해 읽기를 통해 쓰기 활성화 뒤에 쓰기 :

    은 Ignite 캐시는 다음과 같은 구성 설정이 있습니다.

  2. 플러시 크기는 13
  3. 플러시 스레드 수는 다른 모든 속성이 기본값으로 설정되어 1

입니다.

더하여, 이것에 캐시하도록 구성된 캐시 저장소가 상기 코드는 다음과 같다 :

I가 의도적 Thread.sleep를 writeAll이라는의() 메소드() 메소드 위해 호출 한
package com.ignite.genericpoc; 

import java.util.Collection; 
import java.util.Map; 

import javax.cache.Cache.Entry; 
import javax.cache.integration.CacheLoaderException; 
import javax.cache.integration.CacheWriterException; 

import org.apache.ignite.Ignite; 
import org.apache.ignite.IgniteCache; 
import org.apache.ignite.cache.store.CacheStore; 
import org.apache.ignite.lang.IgniteBiInClosure; 
import org.apache.ignite.resources.CacheNameResource; 
import org.apache.ignite.resources.IgniteInstanceResource; 

public class IgniteStoreTest implements CacheStore<String, String> { 

@IgniteInstanceResource 
Ignite gridReference; 

@CacheNameResource 
String cacheName; 

@Override 
public String load(String key) throws CacheLoaderException { 
    System.out.println("load method called for the key [ " + key + " ] and cache [ " + cacheName + " ] "); 
    return null; 
} 

@Override 
public Map<String, String> loadAll(Iterable<? extends String> keys) throws CacheLoaderException { 

    IgniteCache<String, String> ic = gridReference.cache(cacheName); 

    int currentKeyNo = 0; 

    for (String key : keys) { 
     ic.put(key, "Value:" + currentKeyNo); 
     currentKeyNo++; 
    } 

    System.out.println("Got " + currentKeyNo + " entries"); 

    return null; 
} 

@Override 
public void write(Entry<? extends String, ? extends String> entry) throws CacheWriterException { 
    System.out.println("Write method called"); 
} 

@Override 
public void writeAll(Collection<Entry<? extends String, ? extends String>> entries) throws CacheWriterException { 
    System.out.println("Write all method called for [ " + entries.size() + " ] entries in the thread " 
      + Thread.currentThread().getName()); 

    System.out.println("Entries recieved by " + Thread.currentThread().getName() + " : " + entries.toString()); 

    try { 
     Thread.sleep(60000); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 

} 

@Override 
public void delete(Object key) throws CacheWriterException { 
    System.out.println("Delete method called"); 
} 

@Override 
public void deleteAll(Collection<?> keys) throws CacheWriterException { 
    System.out.println("Delete All method called"); 
} 

@Override 
public void loadCache(IgniteBiInClosure<String, String> clo, Object... args) throws CacheLoaderException { 
    System.out.println("Load cache method called with " + args[0].toString()); 
} 

@Override 
public void sessionEnd(boolean commit) throws CacheWriterException { 
    System.out.println("Session End called"); 
} 

} 

, 느린 데이터베이스 쓰기를 시뮬레이트합니다. 다음과 같이

캐시에 데이터를로드에서 Ignite 클라이언트에 대한 코드는 다음과 같이

package com.ignite.genericpoc; 

import java.util.ArrayList; 
import java.util.List; 

import javax.cache.configuration.FactoryBuilder; 

import org.apache.ignite.Ignite; 
import org.apache.ignite.IgniteCache; 
import org.apache.ignite.Ignition; 
import org.apache.ignite.configuration.CacheConfiguration; 

public class IgnitePersistentStoreClientTest { 

public static void main(String[] args) throws InterruptedException { 

    List<String> addressess = new ArrayList<>(); 

    addressess.add("*.*.*.*:47500"); // Hiding the IP 

    Ignition.setClientMode(true); 

    Ignite i = IgniteConfigurationUtil.startIgniteServer(
      IgniteConfigurationUtil.getIgniteConfiguration(false, IgniteTestConstants.GRID_NAME, addressess)); 

    System.out.println("Client Started"); 

    CacheConfiguration<String, String> ccfg = new CacheConfiguration<>(); 

    ccfg.setName("Persistent_Store_Test_Cache"); 

    ccfg.setCacheStoreFactory(FactoryBuilder.factoryOf(IgniteStoreTest.class)); 

    ccfg.setReadThrough(true); 

    ccfg.setWriteThrough(true); 

    ccfg.setWriteBehindEnabled(true); 

    ccfg.setWriteBehindFlushSize(13); 

    ccfg.setWriteBehindFlushThreadCount(1); 

    System.out.println(ccfg.getWriteBehindBatchSize()); 

    IgniteCache<String, String> ic = i.getOrCreateCache(ccfg); 

    System.out.println("Cache Created"); 

    for (int t = 1; t <= 20; t++) { 
     System.out.println("Loading key "+t); 
     ic.put("Key:" + t,"Value: "+t); 
     System.out.println("Key "+ t + " loaded "); 
    } 

    System.out.println("Cache Loaded"); 

    i.close(); 

} 

} 

실행이 발생합니다

  1. 은 Ignite 서버

    먼저 시작됩니다.

  2. 데이터를로드하는 Ignite Client는 서버 다음에 시작됩니다.

  3. writeAll() 메서드에 대해 60 초의 절전 모드가 정의되어 있기 때문에 Ignite Client는 20 번째 항목을 쓰는 동안 멈추게됩니다.

  4. 또한 서버 로그에서 writeAll() 메서드가 두 개의 스레드에 대해 호출 된 것을 볼 수 있습니다. 그 중 Flush 스레드는 저장소에 쓰기 위해 15 개의 항목을 받았으며 시스템 스레드는 1 개의 항목을 받았습니다. 상점에 씁니다. 다음은 Ignite 서버 로그는 다음과 같습니다

    쓰기를 요구하는 모든 방법 [15] 변기-0- 스레드 # 66 % test_grid %의

    쓰기 스레드 SYS에서 [1] 항목을 촉구 모든 방법의 항목 - # 22 % test_grid %

캐시 뒤에 쓰기가 가득하고 모든 플러시 스레드는 바쁜 데이터를 기록하기 때문에 나는 다음 Ignite 클라이언트 넣어이 항목 20를 기록에 갇혀 있음을 이해할 수있다.

다음을

가 포인트가 된 나는 명확한 이해가 필요하다 : 14 항목을 삽입하는 동안 클라이언트가 20 항목을 삽입에서 차단되는 이유는

  • , 그것은 차단 했어야을 (를 기반으로 나는 512

  • 에 그것을 기본값을 일괄 처리 크기를 설정하지 않은으로 13 개 항목의 최대 캐시 크기)에

  • 이유는, 15 개 항목과 모든 19 개 항목이라는 세척 실이었다writeAll() 메서드로 호출 된 시스템 스레드가 Ignite Client에서 20 번째 항목을 넣으려는 요청을 처리하는 스레드와 같은 스레드인지 확인합니다.

  • 캐시가 쓰기 가능 상태이고 쓰기 순서 모드가 PRIMARY_SYNC (기본값)이고 캐시에 백업이없는 경우 기본 노드가 쓰기를 커밋 할 수있을 때까지 캐시에 대한 모든 호출을 차단해야합니다. 이것은 또한 Write Behind 캐시에 항목을 넣을 수 있음을 의미합니다.

  • 서버에 항목을 저장하는 경우 Ignite 서버는 저장소 및 캐시 쓰기 방지를 위해 항목의 복사본을 두 개 만듭니다. 또는 동일한 항목의 참조가 사용됩니다.

양해 해 주셔서 감사합니다. 질문이 너무 길면 사과드립니다. 그러나 내용은 관심있는 잠재 고객에게 상황을 설명하는 데 필수적입니다.

답변

2

후기 저장 저장소는 후드 아래에 배압 제어 장치가 있습니다. 시스템이 비동기 작업을 처리 할 수없는 경우 비동기 작업을 즉시 동기화로 변환 할 수 있습니다.
기본 write-behind 캐시의 크기가 임계 크기 (flushSize * 1.5)를 초과하는 경우 쓰기 작업을 실행하는 스레드가 flusherThread 대신에 사용됩니다.

  • - 변기 0- # 66 % test_grid의 % (일반 변기 실)
  • sys- # 22 % test_grid %의 (백 압력 :
    당신이 당신의 로그에 thread를 볼 이유 즉 제어, 조작이) 뒤에 활성화 및 주문 모드 PRIMARY_SYNC (기본 입니다 쓰기 쓰기 권한이 내 캐시을 고려)

클라이언트 스레드를 사용하여 실행하고 캐시에는 백업은 어떤 넣어 호출이 없습니다 캐시 e는 기본 노드가 이 쓰기를 커밋 할 수있을 때까지 차단되어야합니다. 이것은 또한 캐시 뒤에 쓰기 항목을 넣을 수 있음을 의미합니까?

예. 서버에서 항목을 저장하는 경우

는 발화 서버는 엔트리 스토리지에 대한 하나 캐시 뒤에 쓰기위한 하나의 두 개의 복사본을 만드는 않습니다. 또는 동일한 항목의 참조가 사용됩니다.

동일한 항목의 참조를 사용해야합니다.

은의 단계로이 시나리오 단계를 살펴 보자 :

  • 클라이언트 스레드 (14 개) 항목을 업로드하셨습니다. GridCacheWriteBehindStore은 플러시 크기를 초과하는 기본 캐시로의 항목 수가 감지되고 플러시 스레드를 깨우는 신호를 보냅니다. 참조하십시오 GridCacheWriteBehindStore#updateCache()

  • 변기 스레드가 깨어나서 write-behind-cache.entrySet().iterator()를 통해 (ConcurrentLinkedHashMap 인) 쓰기 숨김 캐시에서 데이터를 수집하려고합니다. 이 반복자는 약하게 일관된 순회를 제공합니다. 즉, 반복 이후의 수정 사항을 반영하지는 않습니다. 중요한 점은 클라이언트 스레드가 새로운 항목을 병렬로 배치한다는 것입니다.

  • 클라이언트 스레드는 마지막 값 [key=Key:20, val=Value: 20]을 넣습니다. 동시에, 플러 셔 스레드는 방법에서 Thread.sleep()에 의해 차단됩니다. GridCacheWriteBehindStore은 쓰기 저장 캐시의 현재 크기가 임계 크기 (플러시 크기 * 1.5)를 초과했음을 감지하므로 배압 메커니즘을 사용해야합니다. GridCacheWriteBehindStore은 write-behind 캐시에서 가장 오래된 값을 플러시하기 위해 flushSingleValue() 메서드를 호출합니다 (물론이 값은 플러 셔 스레드에 의해 이전에 획득되어서는 안됩니다). flushSingleValue() 메서드는 클라이언트 스레드의 컨텍스트에서 호출됩니다.

  • 이후 플러시 스레드가 깨어나고 나머지 항목을 처리합니다.

필기체 저장 구현을 이해하는 것이 도움이되기를 바랍니다.

감사합니다.

+0

답장을 보내 주셔서 감사 드리며 (flushSize * 1.5)는 눈을 뜨게합니다. 항목의 참조가 쓰기 저장 캐시에 저장되므로 Ignite 서버의 메모리 요구 사항을 조정할 때 쓰기 저장 캐시의 메모리를 고려할 필요가 없습니다. 또한, 여전히 플러시 스레드가 플러시 할 항목이 19 개가 아닌 15 개의 항목 만 가져온 이유는 이해할 수 없습니다. 이는 캐시 뒤의 쓰기가 꽉 찼고 배치 크기가 512 였기 때문에 최적의 선택이었을 것입니다. 먼저 플러시 빈도가 있다고 생각했습니다. , 1 분으로 늘리면 효과가 없습니다. Flush Thread는 항상 15 개의 항목이 있습니다. –

+0

자세한 내용을 제공하기 위해 답변을 업데이트했습니다. – sk0x50