2017-10-24 24 views
0

생산자 소비자 패턴, 집행자 프레임 워크 - 그것은 섹션 5.3.1에 <a href="http://jcip.net/" rel="nofollow noreferrer">Java_author</a>에 의해 언급

는 ... 많은 생산자 - 소비자 설계는 Executor 작업 실행 프레임 워크를 사용하여 표현 될 수있다, 에서 사용하는 자체 생산자 - 소비자 패턴.

은 ... 생산자 - 소비자 패턴은 스레드 친화적 인 수단 (가능한 경우) 간단한 구성 요소로 문제을 분해을 제공합니다.


Executor 프레임 워크 구현이 내부적으로 생산자 - 소비자 패턴을 따라합니까?

그렇다면 생산자 - 소비자 패턴 아이디어가 Executor 프레임 워크 구현에 어떻게 도움이됩니까?

+0

ThreadPoolExecutor 클래스에서 살펴볼 수 있습니다. – atom

+0

@ BongCon 현재 1 부, 마지막 장.'ThreadPoolExecutor'에 대해 이야기하는 Part 2를 아직 시작하지 않았습니다. Part 2로 바로 넘어 가기가 어렵습니다. – overexchange

+0

실제로 생산자 패턴이 아닙니다. .. 메인 클래스는 실행될 작업을 생성하고 Executor는 그것을 실행하여 작업을 소비합니다 .. –

답변

1

확인 구현 지금 Worker

private boolean addWorker(Runnable firstTask, boolean core) { 
    // After some checks, it creates Worker and start the thread 
    Worker w = new Worker(firstTask); 
    Thread t = w.thread; 

    // After some checks, thread has been started 
    t.start(); 
} 

구현을 확인합니다. 요약

/** 
* Performs blocking or timed wait for a task, depending on 
* current configuration settings, or returns null if this worker 
* must exit because of any of: 
* 1. There are more than maximumPoolSize workers (due to 
* a call to setMaximumPoolSize). 
* 2. The pool is stopped. 
* 3. The pool is shutdown and the queue is empty. 
* 4. This worker timed out waiting for a task, and timed-out 
* workers are subject to termination (that is, 
* {@code allowCoreThreadTimeOut || workerCount > corePoolSize}) 
* both before and after the timed wait. 
* 
* @return task, or null if the worker must exit, in which case 
*   workerCount is decremented 
*/ 
private Runnable getTask() { 
    // After some checks, below code returns Runnable 

     try { 
      Runnable r = timed ? 
       workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : 
       workQueue.take(); 
      if (r != null) 
       return r; 
      timedOut = true; 
     } catch (InterruptedException retry) { 
      timedOut = false; 
     } 
} 

: 필요한 경우가 workQueue.offer(command)

  • execute()execute API에 Runnable 또는 Callable 추가

    1. 프로듀서 방법 Worker 스레드를 생성

    2. Worker 스레드는 무한 루프로 실행됩니다. 그것은 BlockingQueue<Runnable> workQueue)getTask()

    3. getTask() 풀에서 작업 (예를 들어, Runnable)를 얻고 Runnable을. 소비자BlockingQueue입니다.

    합니까 집행자 프레임 워크 구현은 내부적으로 생산자 - 소비자 패턴을 따른다?

    예 위에서 설명한대로.

    그렇다면 생산자 - 소비자 패턴 아이디어가 어떻게 Executor 프레임 워크를 구현하는 데 도움이됩니까? ArrayBlockingQueueExecutorService 구현 ThreadPoolExecutor 같은

    BlockingQueue 구현은 스레드로부터 안전합니다. 프로그래머가 오버 헤드를 구현하여 명시 적으로 구현 된 동기화, 대기 및 알림 호출을 구현하기 위해 동일한 기능이 축소되었습니다.

  • +0

    그래서 'Worker' thread는 처음에'execute()'에 전달 된'command' 때문에 초기에 생성되었습니다 (<'corePoolSize'>). 내 명령이이 'Worker' 스레드에서 실행을 완료 한 후 동일한'Worker' 스레드가 대기열에서 새 작업을 선택합니다. 그렇지 않니? – overexchange

    +0

    예. corePoolSize는 나중에 증가합니다. –

    +0

    그래서이 [code] (https://www.ntu.edu.sg/home/ehchua/programming/java/J5e_multithreading.html#zz-7.4)에서'java ThreadPoolTest 5 2'를 실행하면'workQueue'가 생성됩니다. ThreadPoolExecutor 인스턴스의 멤버로서 사이즈 5와 사이즈 2의'workers'가됩니다. 그 맞습니까? – overexchange

    1

    Executor frameworkproducer-consumer 패턴을 사용합니다. 위키

    , 컴퓨팅

    는 생산자 - 소비자 문제 (도 바운스 버퍼 문제로 알려진) 다중 프로세스 동기화 문제의 전형적인 예이다. 이 문제는 큐로 사용되는 공통 고정 크기 버퍼를 공유하는 두 개의 프로세스, 즉 생산자와 소비자 인 을 설명합니다. 제작자의 임무는 데이터를 생성하여 버퍼 에 넣고 다시 시작하는 것입니다. 동시에, 소비자는 데이터를 버퍼 (즉, 버퍼로부터 제거)로 한번에 하나씩 소비한다. 문제는 생산자가 가득 채워져 있고 소비자가 빈 버퍼에서 데이터 을 제거하려고 시도하지 않을 경우 버퍼에 데이터를 추가하려고 시도하지 않는지 확인하는 것입니다. 우리가 다른 ExecutorService framework 구현에 대한보고가있는 경우

    는, 더 구체적으로 ThreadPoolExecutor 클래스, 그것은 기본적으로 가지고있는 다음

    1. 작업이 제출 된 스레드의
    2. 번호를 개최 큐, 대기열에 제출 된 태스크를 사용합니다.실행 프로그램 서비스의 타입에 기반

    , 이들 파라미터는, 예를 들어

    ,

    • 고정 스레드 풀은 LinkedBlockingQueue 사용 변경하지 않고 사용자 스레드
    • 캐시 스레드 풀 용도 더 구성된 a SynchronousQueue 및 제출 된 작업 수를 기준으로 0에서 Integer.MAX_VALUE 사이의 스레드 없음
    실행 Runnable

    /** 
        * Class Worker mainly maintains interrupt control state for 
        * threads running tasks, along with other minor bookkeeping. 
        * This class opportunistically extends AbstractQueuedSynchronizer 
        * to simplify acquiring and releasing a lock surrounding each 
        * task execution. This protects against interrupts that are 
        * intended to wake up a worker thread waiting for a task from 
        * instead interrupting a task being run. We implement a simple 
        * non-reentrant mutual exclusion lock rather than use ReentrantLock 
        * because we do not want worker tasks to be able to reacquire the 
        * lock when they invoke pool control methods like setCorePoolSize. 
        */ 
        private final class Worker 
         extends AbstractQueuedSynchronizer 
         implements Runnable 
        { 
    
         /** Delegates main run loop to outer runWorker */ 
         public void run() { 
          runWorker(this); 
         } 
    
        final void runWorker(Worker w) { 
          Runnable task = w.firstTask; 
          w.firstTask = null; 
          boolean completedAbruptly = true; 
        try { 
         while (task != null || (task = getTask()) != null) { 
          w.lock(); 
          clearInterruptsForTaskRun(); 
          try { 
           beforeExecute(w.thread, task); 
           Throwable thrown = null; 
           try { 
            task.run(); 
           } catch (RuntimeException x) { 
            thrown = x; throw x; 
           } catch (Error x) { 
            thrown = x; throw x; 
           } catch (Throwable x) { 
            thrown = x; throw new Error(x); 
           } finally { 
            afterExecute(task, thrown); 
           } 
          } finally { 
           task = null; 
           w.completedTasks++; 
           w.unlock(); 
          } 
         } 
         completedAbruptly = false; 
        } finally { 
         processWorkerExit(w, completedAbruptly); 
        } 
    

    는 논리 아래에 의존 : ThreadPoolExecutor

    public void execute(Runnable command) { 
        int c = ctl.get(); 
        if (workerCountOf(c) < corePoolSize) { 
         if (addWorker(command, true)) 
          return; 
         c = ctl.get(); 
        } 
        if (isRunning(c) && workQueue.offer(command)) { 
         int recheck = ctl.get(); 
         if (! isRunning(recheck) && remove(command)) 
          reject(command); 
         else if (workerCountOf(recheck) == 0) 
          addWorker(null, false); 
        } 
        else if (!addWorker(command, false)) 
         reject(command); 
    }