2010-07-15 4 views
8

엄청난 양의 작업이 있습니다. 각 작업은 단일 그룹에 속합니다. 요구 사항은 각 스레드 그룹이 단일 스레드에서 실행되는 것과 같이 순차적으로 실행되어야하며 멀티 코어 (또는 멀티 CPU) 환경에서 처리량을 최대화해야합니다. 참고 : 작업 수에 비례하여 엄청난 양의 그룹이 있습니다.어떤 Java ThreadPool을 사용해야합니까?

순진한 해결책은 ThreadPoolExecutor를 사용하고 동기화 (또는 잠금)하는 것입니다. 그러나 스레드는 서로를 차단하고 처리량이 최대화되지 않습니다.

더 좋은 아이디어가 있습니까? 아니면 요구 사항을 만족시키는 타사 라이브러리가 있습니까?

+2

"하지만, 쓰레드는 서로를 차단하는 것입니다 및 처리량을 극대화하지 않습니다.". 개별 작업이 공유 된 데이터 구조 나 리소스에 액세스하고 이것이 경쟁의 원인이라는 것을 의미합니까? – Adamski

+0

그룹의 모든 작업을 미리 알고 있습니까? 이는 솔루션을 선택할 때 중요합니다 (대기열 대 대기열 없음) –

답변

3

입니다. 그러나 이것은 다른 그룹이 완전히 완료되지 않고 스레드 풀에서 약간의 공간을 만들지 않으면 시작되지 않는 다른 그룹에서 지연을 일으킬 수 있습니다.

다른 방법으로 그룹 작업을 연결하는 것을 고려해보십시오. 다음 코드를 보여

public class MultiSerialExecutor { 
    private final ExecutorService executor; 

    public MultiSerialExecutor(int maxNumThreads) { 
     executor = Executors.newFixedThreadPool(maxNumThreads); 
    } 

    public void addTaskSequence(List<Runnable> tasks) { 
     executor.execute(new TaskChain(tasks)); 
    } 

    private void shutdown() { 
     executor.shutdown(); 
    } 

    private class TaskChain implements Runnable { 
     private List<Runnable> seq; 
     private int ind; 

     public TaskChain(List<Runnable> seq) { 
      this.seq = seq; 
     } 

     @Override 
     public void run() { 
      seq.get(ind++).run(); //NOTE: No special error handling 
      if (ind < seq.size()) 
       executor.execute(this); 
     }  
    } 

장점은 불필요한 자원 (스레드/큐)를 사용중인 것입니다 및 작업의 단위는 순진 접근 것보다 낫다는 것을. 단점은 모든 그룹의 작업을 미리 알려야한다는 것입니다..

--edit--

당신이 오류 처리를 결정 할 수 있습니다,이 솔루션은 일반적이고 완전한 만들려면

(즉, 체인이 오류가 occures 경우에도 계속 여부), 또한 그것이 좋은 생각이 될 것입니다 ExecutorService를 구현하고 모든 호출을 기본 실행 프로그램에 위임합니다.

+0

영리한 해결책! +1 –

+0

나는이 해결책을 좋아한다. – James

+0

지도를 추가하여 지정된 작업의 TaskChain을 찾아 TaskChain에 추가 할 수도 있습니다. – James

2

나는 작업 큐를 사용하는 것이 좋습니다 것입니다 :

  • 을 당신은 큐를 만들고 그것으로 그 그룹에서 모든 작업을 삽입이 작업의 모든 그룹에 대해.
  • 이제는 하나의 대기열에있는 작업이 순차적으로 실행되는 동안 모든 대기열을 병렬로 실행할 수 있습니다.

빠른 google 검색은 자바 API 자체가 작업/스레드 대기열을 가지고 있지 않음을 나타냅니다. 그러나 코딩에 사용할 수있는 자습서가 많이 있습니다. 누구나 알고 있다면 좋은 자습서/구현 목록을 자유롭게 열어 볼 수 있습니다.

+0

감사합니다. Dave. 그룹 수가 많으면 스레드 수가 제한을 초과합니다. – James

+0

@James 반드시 그렇지는 않습니다. 당신이 n 개의 그룹을 가지고 있기 때문에 당신이 n 개의 쓰레드를 만들어서 실행시켜야한다는 것을 의미하지는 않습니다. 당신이 생각하는만큼 많은 스레드를 생성하면 라운드 로빈 방식이나 연속 방식으로 대기열을 관리 할 것입니다. –

1

대부분 Dave의 대답에 동의하지만 모든 "그룹"에서 CPU 시간을 슬라이스해야 할 경우 모든 작업 그룹이 병렬로 진행되어야합니다.

class TaskAllocator { 
    private final ConcurrentLinkedQueue<Queue<Runnable>> entireWork 
     = childQueuePerTaskGroup(); 

    public Queue<Runnable> lockTaskGroup(){ 
     return entireWork.poll(); 
    } 

    public void release(Queue<Runnable> taskGroup){ 
     entireWork.offer(taskGroup); 
    } 
} 

class DoWork implmements Runnable { 
    private final TaskAllocator allocator; 

    public DoWork(TaskAllocator allocator){ 
     this.allocator = allocator; 
    } 

    pubic void run(){ 
     for(;;){ 
      Queue<Runnable> taskGroup = allocator.lockTaskGroup(); 
      if(task==null){ 
       //No more work 
       return; 
      } 
      Runnable work = taskGroup.poll(); 
      if(work == null){ 
       //This group is done 
       continue; 
      } 

      //Do work, but never forget to release the group to 
      // the allocator. 
      try { 
       work.run(); 
      } finally { 
       allocator.release(taskGroup); 
      } 
     }//for 
    } 
} 

그런 다음 사용할 수 있습니다 유용한 구조의이 종류를 찾을 수 있습니다. ("잠금"으로 제거를 사용하여이 나는 그것이 더 많은 메모리를 사용하는 경향이 상상하지만 내 경우에는 괜찮 았는데) 실행할 최적 스레드 수 DoWork 태스크. 그것은

심지어

ConcurrentSkipListSet<MyQueue<Runnable>> sophisticatedQueue = 
    new ConcurrentSkipListSet(new SophisticatedComparator()); 
(실행 얻을하는 경향이 남아있는 많은 작업과 작업 그룹) TaskAllocator 대신 간단한 큐의이를 이용하여,보다 정교한 뭔가를 할 수 .. 라운드 로빈로드 균형 종류의

SophisticatedComparator이 간단한 방법은, 따라서 하위 작업이 순차적으로 실행하고, 하나 개의 슈퍼 작업에 모든 그룹의 작업을 "연결할"하는 것입니다

class SophisticatedComparator implements Comparator<MyQueue<Runnable>> { 
    public int compare(MyQueue<Runnable> o1, MyQueue<Runnable> o2){ 
     int diff = o2.size() - o1.size(); 
     if(diff==0){ 
      //This is crucial. You must assign unique ids to your 
      //Subqueue and break the equality if they happen to have same size. 
      //Otherwise your queues will disappear... 
      return o1.id - o2.id; 
     } 
     return diff; 
    } 
} 
+1

+1 작업 대기열을 사용하면 필요에 맞는 모든 예약 알고리즘을 사용할 수 있습니다. –

+0

스레드 풀을 다시 구현하는 것 같습니다. 내 솔루션 에서처럼 표준 ThreadPoolExecutor와 일부 추가 기능을 함께 사용하지 않는 이유는 무엇입니까? 내 솔루션에는 큐가 필요없고 동기화가 필요 없습니다. –

+0

@Eyal : 작업 그룹을 순차적으로 소비해도 괜찮을 경우 동의합니다. 그러나 병렬로 소비해야하는 경우에는 이것이 필요합니다. –

0

액터는이 지정된 유형의 문제에 대한 또 다른 솔루션입니다. 스칼라에는 AKKA에서 제공 한 배우와 Java가 있습니다.

-2

나는 당신과 비슷한 문제가있어서 을 사용하여 Executor과 함께 작업 모음을 완성했습니다.

는 각각 어떤 종류의 결과 값을 반환 당신이 특정 문제에 대한 해법의 집합이 있다고 가정하고, 동시에 실행할 싶습니다 : 여기 는 Java7 이후 java.util.concurrent의 API에서 추출한다 , 일부 메소드 use (Result r)에서 널이 아닌 값을 리턴하는 각각의 결과를 처리합니다. 당신은이를 작성할 수

void solve(Executor e, Collection<Callable<Result>> solvers) 
     throws InterruptedException, ExecutionException { 
    CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e); 
    for (Callable<Result> s : solvers) 
     ecs.submit(s); 
    int n = solvers.size(); 
    for (int i = 0; i < n; ++i) { 
     Result r = ecs.take().get(); 
     if (r != null) 
      use(r); 
    } 
} 

그래서, 당신의 시나리오에서 모든 작업이 될 것입니다 단일 Callable<Result> 및 작업은 Collection<Callable<Result>>에 그룹화됩니다.

참조 : http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorCompletionService.html