2012-03-28 1 views
9

저는 ThreadPoolExecutor를 JDK6과 함께 사용하여 스레드 풀링에 대한 다양한 전략을 고민하고 있습니다. 우선 순위 큐가 작동하지만 keepAliveTime (무제한 대기열을 사용하는 경우) 이후에 풀 크기가 어떻게되지 않았는지 확실하지 않았습니다. 그래서 LinkedBlockingQueue와 CallerRuns 정책을 사용하여 ThreadPoolExecutor를보고 있습니다.왜 ThreadPoolExecutor가 keepAliveTime 후에 corePoolSize 아래의 스레드를 줄입니까?

내가 지금 가지고있는 문제는 문서가 설명해야하므로 풀이 올라가는 것입니다.하지만 작업이 완료되고 keepAliveTime이 재생되면 getPoolSize가 풀이 0으로 줄어든 것을 보여줍니다. 예제 코드는 아래 내 질문의 기초를 볼 수 있도록해야합니다

public class ThreadPoolingDemo { 
    private final static Logger LOGGER = 
     Logger.getLogger(ThreadPoolingDemo.class.getName()); 

    public static void main(String[] args) throws Exception { 
     LOGGER.info("MAIN THREAD:starting"); 
     runCallerTestPlain(); 
    } 

    private static void runCallerTestPlain() throws InterruptedException { 
     //10 core threads, 
     //50 max pool size, 
     //100 tasks in queue, 
     //at max pool and full queue - caller runs task 
     ThreadPoolExecutor tpe = new ThreadPoolExecutor(10, 50, 
      5L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(100), 
      new ThreadPoolExecutor.CallerRunsPolicy()); 

     //dump 5000 tasks on the queue 
     for (int i = 0; i < 5000; i++) { 
      tpe.submit(new Runnable() { 
       @Override 
       public void run() { 
        //just to eat some time and give a little feedback 
        for (int j = 0; j < 20; j++) { 
         LOGGER.info("First-batch Task, looping:" + j + "[" 
           + Thread.currentThread().getId() + "]"); 
        } 
       } 
      }, null); 
     } 
     LOGGER.info("MAIN THREAD:!!Done queueing!!"); 

     //check tpe statistics forever 
     while (true) { 
      LOGGER.info("Active count: " + tpe.getActiveCount() + " Pool size: " 
       + tpe.getPoolSize() + " Largest Pool: " + tpe.getLargestPoolSize()); 
      Thread.sleep(1000); 
     } 
    } 
} 

나는이 문제가 될 것으로 보인다 오래된 버그를 발견하지만, 그것은 닫혔다 : http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6458662. 이것은 여전히 ​​1.6에 존재할 수 있습니까, 아니면 제가 빠진 것이 있습니까?

마치 고무로 찍힌 것 같습니다 (http://www.codinghorror.com/blog/2012/03/rubber-duck-problem-solving.html). 위의 링크 된 버그는이 문제와 관련이 있습니다. http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6576792,이 문제는 1.7에서 해결 된 것으로 보입니다 (1.7을로드하고 확인 됨 - 고정 ...). 내 주요 문제는이 근본적인 버그가 거의 10 년 동안 남아 있었다는 것입니다. 이 글을 쓰려면 너무 많은 시간을 쓰고 지금은 게시하지 말고 누군가가 도움이되기를 바랍니다.

+0

+1 멋진 발견은이 행동을 보니 놀랐습니다. –

+1

아마도 귀하의 게시물을 질문으로 구성하고 귀하가 배운 것을 답변으로 제공하는 것이 더 좋을까요? –

답변

6

... 작업 완료 후 keepAliveTime이 재생되면 getPoolSize는 풀이 0으로 감소 함을 나타냅니다.

이렇게 이것은 ThreadPoolExecutor에서 경쟁 조건으로 보입니다. 나는 그것이 예상치 못했지만 디자인에 따라 작동하고 있다고 생각한다. 차단 대기열에서 작업을 얻는을 통해 작업자 스레드 루프,이 코드를 참조 getTask() 방법은 다음 poolSize는 다음 corePoolSize 이상으로 증가하면

if (state == SHUTDOWN) // Help drain queue 
    r = workQueue.poll(); 
else if (poolSize > corePoolSize || allowCoreThreadTimeOut) 
    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); 
else 
    r = workQueue.take(); 
if (r != null) 
    return r; 
if (workerCanExit()) { 
    if (runState >= SHUTDOWN) // Wake up others 
     interruptIdleWorkers(); 
    return null; 
} 

keepAliveTime 아웃 후 투표 시간은, 코드는 아래로 떨어지면 rnull이므로 workerCanExit()으로 변경하십시오.

mainLock.lock(); 
    boolean canExit; 
    try { 
     canExit = runState >= STOP || 
      workQueue.isEmpty() || 
      (allowCoreThreadTimeOut && 
      poolSize > Math.max(1, corePoolSize)); << test poolSize here 
    } finally { 
     mainLock.unlock();       << race to workerDone() begins 
    } 

을 즉 poolSize이 감소되어 다음true 다음 작업자 스레드가 종료 및 를 반환하면 : 그냥 poolSize의 상태를 테스트하기 때문에 모든 스레드는 그 방법에서 true을 반환 할 수 있습니다. 모든 작업자 스레드가 동시에 해당 테스트를 수행하면 poolSize 테스트와 --poolSize이 발생할 때 작업자 중지 사이의 경쟁으로 인해 모두 종료됩니다.

경쟁 조건이 얼마나 일관성 있는지는 놀랍습니다. 아래 run() 안에있는 sleep()에 무작위 화를 추가하면 종료하지 말아야 할 몇 가지 핵심 스레드를 얻을 수 있지만 경쟁 조건이 적중했을 것이라고 생각했을 것입니다.


다음과 같은 시험에서이 동작을 볼 수 있습니다

private final Random random = new Random(); 
... 
    Thread.sleep(100 + random.nextInt(100)); 

것은이 것 :이 같은에 run() 방법의 내부 sleep 라인을 변경하는 경우

@Test 
public void test() throws Exception { 
    int before = Thread.activeCount(); 
    int core = 10; 
    int max = 50; 
    int queueSize = 100; 
    ThreadPoolExecutor tpe = 
      new ThreadPoolExecutor(core, max, 1L, TimeUnit.SECONDS, 
        new LinkedBlockingQueue<Runnable>(queueSize), 
        new ThreadPoolExecutor.CallerRunsPolicy()); 
    tpe.allowCoreThreadTimeOut(false); 
    assertEquals(0, tpe.getActiveCount()); 
    // if we start 1 more than can go into core or queue, poolSize goes to 0 
    int startN = core + queueSize + 1; 
    // if we only start jobs the core can take care of, then it won't go to 0 
    // int startN = core + queueSize; 
    for (int i = 0; i < startN; i++) { 
     tpe.submit(new Runnable() { 
      @Override 
      public void run() { 
       try { 
        Thread.sleep(100); 
       } catch (InterruptedException e) { 
        e.printStackTrace(); 
       } 
      } 
     }); 
    } 
    while (true) { 
     System.out.println("active = " + tpe.getActiveCount() + ", poolSize = " + tpe.getPoolSize() 
       + ", largest = " + tpe.getLargestPoolSize() + ", threads = " + (Thread.activeCount() - before)); 
     Thread.sleep(1000); 
    } 
} 

을 경주 조건이 타격을 받기에 더 어려워 지므로 일부 핵심 스레드가 여전히 주위에있게됩니다.

+0

OP는'getActiveCount()'가 아니라'getPoolSize()'에 더 관심이 있습니다. 모든 작업을 완료하고 keepAliveTime이 만료되면 TPE가 종료하지 않고 모든 스레드가 실제로 종료됩니다. –

+0

@Gray 내 TPE 매개 변수가 중요합니다. 항상 실패하지는 않습니다. 사용했던 매개 변수를 사용하여 예상대로 작동합니다. 내가 지정한 매개 변수는 유일하지만 현실적인 방식으로 TPE 규칙 (즉, 대기열에 버려지는 거대한 배치 작업)과 함께 재생됩니다. 매개 변수의 주요 문제점은 스레드가 코어 크기 이상으로 풀에 추가되는 상황을 유발하지 않는다는 것입니다. 코어 크기를 초과하여 초과/추가하려면 풀의 크기가 코어 크기 (10) 여야하며 대기열이 한계 (100)를 초과해야합니다. 'for (int i = 0; i andematt

+0

@andematt 재미있는 여행. 이것은'ThreadPoolExecutor'의 경합 조건입니다. 내 대답을 편집했습니다. – Gray