2012-12-27 4 views
2

스레드 풀을 사용하여 동시 작업을 실행하는 다중 스레드 라이브러리를 구현하려고합니다. 기본적으로 수신 한 컬렉션 매개 변수에서 스레드 풀에 작업을 추가 한 다음 처리중인 마지막 작업이 펄스 신호를 보낼 때까지 대기합니다. 이전 테스트에서 성공 했었지만 프로세스가 매우 부족한 작업으로 테스트하고 싶을 때 이상한 문제가 발생했습니다. 여하튼 펄스 신호는 대기 명령이 주 스레드에서 자리를 잡기 전에 보내지거나 다른 어떤 일이 일어나고 있습니다. 나는 단순히 동기화에 대한 나의 노력과 상관없이 볼 수 없습니다.다중 스레드 환경에서 Interlocked.Decrement와 monitor.wait 및 monitor.pulse를 사용할 때 잘못된 동작이 발생했습니다.

내 문제를 해결하기 위해 잠재적 인 성능 이점 때문에 다른 "덜 바람직한"솔루션을 구현했습니다. 현재로서는 잘 작동하고 있지만 처음 접근 방식이 작동하지 않는 이유를 알고 싶었습니다. 실적이 현저하게 차이가 나더라도 첫 번째 사례는 그러한 경우입니다.

설명하기 위해 아래 프로세스를 단순화 한 후에 두 솔루션을 모두 추가하려고합니다. 누군가 내가 잘못되고있는 것을 지적하도록 도와 줄 수 있습니까?

미리 감사드립니다.

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Text; 
using System.Threading; 
using System.Diagnostics; 

namespace TestcodeBenchmark 
{ 
    class Program 
    { 
     static int remainingTasks = 10000000; 
     static Stopwatch casioF91W = new Stopwatch(); 
     static Random rg = new Random(); 
     static readonly object waitObject = new object(); 


     static void Main(string[] args) 
     { 
      TestLoop(30, remainingTasks); 
      Console.ReadKey(); 
     } 

     private static void TestLoop(int loopCount, int remainingCountResetNumber) 
     { 
      for (int i = 0; i < loopCount; i++) 
      { 
       remainingTasks = remainingCountResetNumber; 
       //When this method is called it eventualy stuck at Monitor.Wait line 
       TestInterlocked(); 

       remainingTasks = remainingCountResetNumber; 
       //When this method is called it processes stuff w/o any issues. 
       TestManualLock(); 
       Console.WriteLine(); 
      } 
     } 

     private static void TestInterlocked() 
     { 
      casioF91W.Restart(); 
      //for (int i = 0; i < remainingTasks; i++) 
      //{ 
      // ThreadPool.QueueUserWorkItem(delegate { TestInterlockedDecrement(); }); 
      //} 
      int toStart = remainingTasks; 
      //for (int i = 0; i < remainingTasks; i++) 
      for (int i = 0; i < toStart; i++) 
      { 
       if (!ThreadPool.QueueUserWorkItem(delegate { TestInterlockedDecrement(); })) 
        Console.WriteLine("Queue failed"); 
      } 
      //lock waitObject to be able to call Monitor.Wait 
      lock (waitObject) 
      { 
       //if waitObject is locked then no worker thread should be able to send a pulse signal 
       //however, if pulse signal was sent before locking here remainingTasks should be 
       //zero so don't wait if all tasks are processed already 
       if (remainingTasks != 0) 
       { 
        //release the lock on waitObject and wait pulse signal from the worker thread that 
        //finishes last task 
        Monitor.Wait(waitObject); 
       } 
      } 
      casioF91W.Stop(); 
      Console.Write("Interlocked:{0}ms ", casioF91W.ElapsedMilliseconds); 
     } 

     private static void TestInterlockedDecrement() 
     { 
      //process task 
      //TestWork(); 
      //Once processing finishes decrement 1 from remainingTasks using Interlocked.Decrement 
      //to make sure it is atomic and therefore thread safe. If resulting value is zero 
      //send pulse signal to wake main thread.    
      if (Interlocked.Decrement(ref remainingTasks) == 0) 
      { 
       //Acquire a lock on waitObject to be able to send pulse signal to main thread. If main 
       //thread acquired the lock earlier, this will wait until main thread releases it 
       lock (waitObject) 
       { 
        //send a pulse signal to main thread to continue 
        Monitor.PulseAll(waitObject); 
       } 
      } 
     } 

     private static void TestManualLock() 
     { 
      casioF91W.Restart(); 

      //Acquire the lock on waitObject and don't release it until all items are added and 
      //Wait method is called. This will ensure wait method is called in main thread 
      //before any worker thread can send pulse signal by requiring worker threads to 
      //lock waitObject to be able to modify remainingTasks    
      lock (waitObject) 
      { 
       for (int i = 0; i < remainingTasks; i++) 
       { 
        ThreadPool.QueueUserWorkItem(delegate { TestManualDecrement(); }); 
       } 
       Monitor.Wait(waitObject); 
      } 
      casioF91W.Stop(); 
      Console.Write("ManualLock:{0}ms ", casioF91W.ElapsedMilliseconds); 
     } 

     private static void TestManualDecrement() 
     { 
      //TestWork(); 
      //try to acquire lock on wait object. 
      lock (waitObject) 
      { 
       //if lock is acquired, decrement remaining tasks by and then check 
       //whether resulting value is zero. 
       if (--remainingTasks == 0) 
       { 
        //send a pulse signal to main thread to continue 
        Monitor.PulseAll(waitObject); 
       } 
      } 
     } 

     private static void TestWork() 
     { 
      //Uncomment following to simulate some work. 
      //int i = rg.Next(100, 110); 
      //for (int j = 0; j < i; j++) 
      //{ 

      //} 
     } 
    } 
} 
+0

한 ... –

+0

귀하의 예는, 그래서 첫 번째 실행 후'비 제로 remainingTasks'를 재설정하지 30 번에서 2 번 시도하면 항상 멈 춥니 다.이 문제를 해결하면 여러 번 실행될 수 있지만 계속 멈추는 것을 볼 수 있습니다. –

+0

@DarkFalcon 시도 사이에 remainingTasks를 올바르게 재설정하는 코드가 수정되었습니다. – Ferhat

답변

3

작업을 시작하면 루프가 remainingTasks 작업을 시작합니다. 그러나 10000에 가까워지면 일부 작업이 완료되고 10000보다 작아 지므로 적절한 수의 작업을 시작하지 마십시오. 루프를 수정하여 시작해야하는 작업의 수를 저장하면 코드가 성공적으로 실행됩니다. (당신은 또한 QueueUserWorkItem의 반환 값을 확인해야합니다.) 잘 부탁 질문에 대한

 int toStart = remainingTasks; 
     for (int i = 0; i < toStart; i++) 
     { 
      if (!ThreadPool.QueueUserWorkItem(delegate { TestInterlockedDecrement(); })) 
       Console.WriteLine("Queue failed"); 
     } 
+0

내 제안을 포함하도록 코드를 수정했으며 언급 한대로 해결되었습니다. 사실 내가 위에 붙여 넣은 코드는 실제 라이브러리에서 테스트 솔루션으로 복사되고 내가 수정 한 동안 언급 한 실수를 저질렀습니다. 원래 라이브러리가 왜 이런 식으로 행동하지 않는지 궁금 해서요 (위와 같이 taskprocessing 섹션을 제거했을 때) 미스테리가 해결되었습니다. 빠른 도움말과 ThreadPool에 작업을 추가하는 데 대한 대기열 작업 결과를 경고하는 데 많은 도움을 주셔서 감사합니다. – Ferhat