2008-09-17 11 views
13

Ruby 용 ThreadPool 구현을 찾을 수 없으므로 여기에서 코드를 기반으로 부분적으로는 http://snippets.dzone.com/posts/show/3276을 작성했지만 ThreadPool 종료에 대한 신호 및 기타 구현이 변경되었습니다. (100 개 스레드를 가진 약 1,300 작업을 처리)를 실행, 그것은 라인 (25)에 교착 상태로 죽는다 -. 그것은이 새 작업을 기다리는 모든 아이디어는 왜 일어날 수ThreadPool에서 교착 상태가 발생했습니다

require 'thread' 
begin 
    require 'fastthread' 
rescue LoadError 
    $stderr.puts "Using the ruby-core thread implementation" 
end 

class ThreadPool 
    class Worker 
    def initialize(callback) 
     @mutex = Mutex.new 
     @cv = ConditionVariable.new 
     @callback = callback 
     @mutex.synchronize {@running = true} 
     @thread = Thread.new do 
     while @mutex.synchronize {@running} 
      block = get_block 
      if block 
      block.call 
      reset_block 
      # Signal the ThreadPool that this worker is ready for another job 
      @callback.signal 
      else 
      # Wait for a new job 
      @mutex.synchronize {@cv.wait(@mutex)} # <=== Is this line 25? 
      end 
     end 
     end 
    end 

    def name 
     @thread.inspect 
    end 

    def get_block 
     @mutex.synchronize {@block} 
    end 

    def set_block(block) 
     @mutex.synchronize do 
     raise RuntimeError, "Thread already busy." if @block 
     @block = block 
     # Signal the thread in this class, that there's a job to be done 
     @cv.signal 
     end 
    end 

    def reset_block 
     @mutex.synchronize {@block = nil} 
    end 

    def busy? 
     @mutex.synchronize {[email protected]?} 
    end 

    def stop 
     @mutex.synchronize {@running = false} 
     # Signal the thread not to wait for a new job 
     @cv.signal 
     @thread.join 
    end 
    end 

    attr_accessor :max_size 

    def initialize(max_size = 10) 
    @max_size = max_size 
    @workers = [] 
    @mutex = Mutex.new 
    @cv = ConditionVariable.new 
    end 

    def size 
    @mutex.synchronize {@workers.size} 
    end 

    def busy? 
    @mutex.synchronize {@workers.any? {|w| w.busy?}} 
    end 

    def shutdown 
    @mutex.synchronize {@workers.each {|w| w.stop}} 
    end 
    alias :join :shutdown 

    def process(block=nil,&blk) 
    block = blk if block_given? 
    while true 
     @mutex.synchronize do 
     worker = get_worker 
     if worker 
      return worker.set_block(block) 
     else 
      # Wait for a free worker 
      @cv.wait(@mutex) 
     end 
     end 
    end 
    end 

    # Used by workers to report ready status 
    def signal 
    @cv.signal 
    end 

    private 
    def get_worker 
    free_worker || create_worker 
    end 

    def free_worker 
    @workers.each {|w| return w unless w.busy?}; nil 
    end 

    def create_worker 
    return nil if @workers.size >= @max_size 
    worker = Worker.new(self) 
    @workers << worker 
    worker 
    end 
end 

답변

10

그렇다면 구현시 주요 문제는 신호가 손실되지 않도록하고 데드 록을 피하는 방법입니다.

내 경험에 의하면, 이것은 조건 변수와 뮤텍스에서는 달성하기가 어렵지만 세마포어에서는 쉽게 달성 할 수 있습니다. 그래서 루비는 Queue (또는 SizedQueue)라는 객체를 구현하여 문제를 해결해야합니다.여기

require 'thread' 
begin 
    require 'fasttread' 
rescue LoadError 
    $stderr.puts "Using the ruby-core thread implementation" 
end 

class ThreadPool 
    class Worker 
    def initialize(thread_queue) 
     @mutex = Mutex.new 
     @cv = ConditionVariable.new 
     @queue = thread_queue 
     @running = true 
     @thread = Thread.new do 
     @mutex.synchronize do 
      while @running 
      @cv.wait(@mutex) 
      block = get_block 
      if block 
       @mutex.unlock 
       block.call 
       @mutex.lock 
       reset_block 
      end 
      @queue << self 
      end 
     end 
     end 
    end 

    def name 
     @thread.inspect 
    end 

    def get_block 
     @block 
    end 

    def set_block(block) 
     @mutex.synchronize do 
     raise RuntimeError, "Thread already busy." if @block 
     @block = block 
     # Signal the thread in this class, that there's a job to be done 
     @cv.signal 
     end 
    end 

    def reset_block 
     @block = nil 
    end 

    def busy? 
     @mutex.synchronize { [email protected]? } 
    end 

    def stop 
     @mutex.synchronize do 
     @running = false 
     @cv.signal 
     end 
     @thread.join 
    end 
    end 

    attr_accessor :max_size 

    def initialize(max_size = 10) 
    @max_size = max_size 
    @queue = Queue.new 
    @workers = [] 
    end 

    def size 
    @workers.size 
    end 

    def busy? 
    @queue.size < @workers.size 
    end 

    def shutdown 
    @workers.each { |w| w.stop } 
    @workers = [] 
    end 

    alias :join :shutdown 

    def process(block=nil,&blk) 
    block = blk if block_given? 
    worker = get_worker 
    worker.set_block(block) 
    end 

    private 

    def get_worker 
    if [email protected]? or @workers.size == @max_size 
     return @queue.pop 
    else 
     worker = Worker.new(@queue) 
     @workers << worker 
     worker 
    end 
    end 

end 

그리고 간단한 테스트 코드입니다 :

tp = ThreadPool.new 500 
(1..1000).each { |i| tp.process { (2..10).inject(1) { |memo,val| sleep(0.1); memo*val }; print "Computation #{i} done. Nb of tasks: #{tp.size}\n" } } 
tp.shutdown 
+0

1. @workers에 대한 액세스가 동기화되어서는 안됩니까? 2. 작업자 스레드에서 여전히 잠금 및 잠금 해제가 필요한 이유는 무엇입니까? – Roman

+0

작업자에 대한 액세스는 항상 동일한 스레드에서 수행되므로 동기화가 필요하지 않습니다. 작업자 스레드의 잠금은 스레드를 안전하게 깨우기 위해 필요합니다. – PierreBdR

+0

여전히 문제가 있습니다. 교착 상태가 발생할 수 있습니다. 작업자 스레드가 자신을 큐에 추가하면 ThreadPool은 큐에서 큐를 가져 와서 작업을 할당 할 수 있습니다. 이 경우 신호가 전송됩니다. 그러나 작업자 스레드가 cv에서 대기 중이 아니면 신호가 손실됩니다. – Roman

1

나는 약간 여기 바이어스있어를? (ACP 기반 언어 사용), Mobility Workbench (pi-calculus) 및 Spin (PROMELA)과 같은 무료 도구를 사용할 수 있습니다.

그렇지 않으면 문제에 필수적이지 않은 모든 비트의 코드를 제거하고 교착 상태가 발생하는 최소한의 경우를 찾는 것이 좋습니다. 교착 상태가 발생하기 위해서는 100 개의 스레드와 1300 개의 작업이 필수적이라고 생각합니다. 작은 경우에는 문제를 해결하는 데 충분한 정보를 제공하는 디버그 인쇄물을 추가 할 수 있습니다.

+0

문제의 코드가 180000 개 중 1300 개의 작업 만 처리하는 데 실패했습니다. 불행히도 작은 세트로 다시 생성하지 못했습니다 ... – Roman

1

좋아, 문제는 ThreadPool # 신호 방법에있는 것 같습니다. 무엇 발생할 수 있습니다 것은 :

1 - 귀하의 모든 직원이 바쁜 당신은 새로운 일

이 처리하려고 - 라인 (90)가 전무 노동자

3 얻는다 - 작업자가 해제 및 신호를 얻을를, 그러나 ThreadPool이 그것을 기다리지 않기 때문에 신호가 손실됩니다.

4 - 당신은 무료 작업자가 있더라도 기다리고 있습니다.

여기서 오류는 아무도 듣지 않아도 무료 직원에게 신호를 보낼 수 있다는 것입니다. 이 스레드 풀 # 신호 메서드는 다음과 같아야합니다.

def signal 
    @mutex.synchronize { @cv.signal } 
end 

그리고 Worker 개체의 문제는 동일합니다. 어떤 발생할 수 것은 :

1 - 그것은 검사 (17 행) 작업 대기가있는 경우 - 작업자는 작업

이 완료 :

3이없는 - 스레드 풀 보내기 새 작업 및 신호를 ...하지만 신호는

4

손실 - 그것은 중이라는 표시하더라도, 신호에 대한 작업자의 대기를

당신은 당신의 초기화 방법을 넣어해야합니다

다음으로 Worker # get_block 및 Worker # reset_block 메소드는 더 이상 동기화되지 않아야합니다. 이렇게하면 블록 테스트와 신호 대기 사이에 작업자에게 할당 된 블록을 가질 수 없습니다.

+0

나는 당신이 옳다고 생각합니다! 나는 바로 이것을 시험 할 것이다, 고마워! – Roman

+0

음 .. 이제 스레드가 완료 될 때까지 대기 중일 때 교착 상태가 발생합니다 (예 : ThreadPool에 대한 호출 참여). 나는 이유를 알아 내려고 노력하고있어. – Roman

8

당신은 생산자와 작업자 스레드 풀 사이의 작업을 조정하기 위해 설계된 work_queue 보석을 시도 할 수 있습니다 여기에 내 제안 구현입니다.