2012-09-19 3 views
0

memcached 버전 1.4.7 및 spymemcached 2.8.4를 클라이언트로 사용하여 키 값을 설정하고 가져옵니다. 다중 스레드 및 고부하 환경에서 spymemcached 클라이언트를 사용하면 캐시 자체의 값을 설정할 수 없습니다.다중 스레드 및 고부하에서 Spymemcached 집합을 사용할 수 없습니다.

20 개의 작업자 스레드로 똑같이 나눠진 40M 길이의 키로 내로드 테스트 프로그램을 실행하고 있습니다. 각 작업자 스레드는 1M 키를 캐시에 설정하려고 시도합니다. 따라서 40 개의 작업자 스레드가 실행 중입니다.

내 DefaultCache.java 파일에 20 개의 spymemcached 클라이언트로 구성된 연결 풀을 만들었습니다. 작업자 스레드가 캐시 할 키를 설정하려고 할 때마다 DefaultCache.java는 getCache() 메소드에 표시된 것처럼 임의의 클라이언트를 리턴합니다.

은 내 프로그램 종료, 그것은

총 인쇄 할 때 키를 더는 = 40000000

를로드하지 내가 memcached를 텔넷 콘솔로 갈 때, 항상 몇 수천 기록이 골대를 벗어났습니다. 또한 무작위로 null을 출력하는 몇 개의 키를 가져 와서 확인했습니다. 추방은 없으며 cmd_set, curr_items, total_items는 각각 39.5M과 같습니다.

캐시에 누락 된 키의 원인은 무엇일까요?

다음은 참조 용 코드입니다.

public class TestCacheLoader { 
public static final Long TOTAL_RECORDS = 40000000L; 
public static final Long LIMIT = 1000000L; 

public static void main(String[] args) { 
    long keyCount = loadKeyCacheData(); 
    System.out.println("Total no of keys loaded = " + keyCount); 
} 

public static long loadKeyCacheData() { 
    DefaultCache cache = new DefaultCache(); 
    List<Future<Long>> futureList = new ArrayList<Future<Long>>(); 
    ExecutorService executorThread = Executors.newFixedThreadPool(40); 
    long offset = 0; 
    long keyCount = 0; 
    long workerCount = 0; 
    try { 
     do { 
      List<Long> keyList = new ArrayList<Long>(LIMIT.intValue()); 
      for (long counter = offset; counter < (offset + LIMIT) && counter < TOTAL_RECORDS; counter++) { 
       keyList.add(counter); 
      } 
      if (keyList.size() != 0) { 
       System.out.println("Initiating a new worker thread " + workerCount++); 
       KeyCacheThread keyCacheThread = new KeyCacheThread(keyList, cache); 
       futureList.add(executorThread.submit(keyCacheThread)); 
      } 
      offset += LIMIT; 
     } while (offset < TOTAL_RECORDS); 
     for (Future<Long> future : futureList) { 
      keyCount += (Long) future.get(); 
     } 
    } catch (Exception e) { 
     e.printStackTrace(); 
    } finally { 
     cache.shutdown(); 
    } 
    return keyCount; 
} 

}

class KeyCacheThread implements Callable<Long> { 
private List<Long> keyList; 
private DefaultCache cache; 

public KeyCacheThread(List<Long> keyList, DefaultCache cache) { 
    this.keyList = keyList; 
    this.cache = cache; 
} 

public Long call() { 
    return createKeyCache(); 
} 

public Long createKeyCache() { 
    String compoundKey = ""; 
    long keyCounter = 0; 
    System.out.println(Thread.currentThread() + " started to process " + keyList.size() + " keys"); 
    for (Long key : keyList) { 
     keyCounter++; 
     compoundKey = key.toString(); 
     cache.set(compoundKey, 0, key); 
    } 
    System.out.println(Thread.currentThread() + " processed = " + keyCounter + " keys"); 
    return keyCounter; 
} 

}

public class DefaultCache { 
private static final Logger LOGGER = Logger.getLogger(DefaultCache.class); 

private MemcachedClient[] clients; 

public DefaultCache() { 
    this.cacheNamespace = ""; 
    this.cacheName = "keyCache"; 
    this.addresses = "127.0.0.1:11211"; 
    this.cacheLookupTimeout = 3000; 
    this.numberOfClients = 20; 

    try { 
     LOGGER.debug("Cache initialization started for the cache : " + cacheName); 
     ConnectionFactory connectionFactory = new DefaultConnectionFactory(DefaultConnectionFactory.DEFAULT_OP_QUEUE_LEN, 
       DefaultConnectionFactory.DEFAULT_READ_BUFFER_SIZE, DefaultHashAlgorithm.KETAMA_HASH) { 
      public NodeLocator createLocator(List<MemcachedNode> list) { 
       KetamaNodeLocator locator = new KetamaNodeLocator(list, DefaultHashAlgorithm.KETAMA_HASH); 
       return locator; 
      } 
     }; 

     clients = new MemcachedClient[numberOfClients]; 

     for (int i = 0; i < numberOfClients; i++) { 
      MemcachedClient client = new MemcachedClient(connectionFactory, AddrUtil.getAddresses(getServerAddresses(addresses))); 
      clients[i] = client; 
     } 
     LOGGER.debug("Cache initialization ended for the cache : " + cacheName); 
    } catch (IOException e) { 
     LOGGER.error("Exception occured while initializing cache : " + cacheName, e); 
     throw new CacheException("Exception occured while initializing cache : " + cacheName, e); 
    } 
} 

public Object get(String key) { 
    try { 
     return getCache().get(cacheNamespace + key); 
    } catch (Exception e) { 
     return null; 
    } 
} 

public void set(String key, Integer expiryTime, final Object value) { 
    getCache().set(cacheNamespace + key, expiryTime, value); 
} 

public Object delete(String key) { 
    return getCache().delete(cacheNamespace + key); 
} 

public void shutdown() { 
    for (MemcachedClient client : clients) { 
     client.shutdown(); 
    } 
} 

public void flush() { 
    for (MemcachedClient client : clients) { 
     client.flush(); 
    } 
} 

private MemcachedClient getCache() { 
    MemcachedClient client = null; 
    int i = (int) (Math.random() * numberOfClients); 
    client = clients[i]; 
    return client; 
} 

private String getServerAddresses(List<Address> addresses) { 
    StringBuilder addressStr = new StringBuilder(); 
    for (Address address : addresses) { 
     addressStr.append(address.getHost()).append(":").append(address.getPort()).append(" "); 
    } 
    return addressStr.toString().trim(); 
} 

}

+0

이러한 키가 캐시에서 제거되지 않았습니까? – mikewied

+0

그래, 나는 telnet stats 명령에서 내게 evictions = 0이라는 것을 볼 수있다. –

답변

0

나는 확실하지 오전하지만 spymemcached 라이브러리 자체의 문제 보인다. xmemcached를 사용하기 위해 DefaultCache.java 파일의 구현을 변경했으며 모든 것이 잘 작동하기 시작했습니다. 이제 나는 어떤 기록도 놓치지 않고있다. 텔넷 통계가 일치하는 수의 set 명령을 보여줍니다.

양해 해 주셔서 감사합니다.

1

나는 똑같이 보았다. 그 이유는 비동기 작업에 사용하는 원자로 패턴 때문입니다. 그것은 1 연결 당 1 개의 작업자 스레드를 의미합니다. 이 1 스레드는 고부하 및 다중 스레드 시스템에서 부트 로크입니다. 1 스레드는 1 CPU 만로드 할 수 있고 나머지 23 스레드는 유휴 상태 일 수 있습니다.

우리는 작업자 스레드를 증가시키고 더 많은 하드웨어 전력을 사용할 수있는 연결 풀을 생각해 냈습니다. 프로젝트 3levelmemcachegithub에서 프로젝트를 확인하십시오.