2010-06-08 3 views
2

.NET 4 애플리케이션에서 병렬 데이터 구조를 사용하고 있고 처리하는 동안 추가되는 ConcurrentQueue이 있습니다.ConcurrentQueue를 사용하는 동안 병렬로 루핑하는 동안 큐에서 빼내려고 시도합니다.

내가 좋아하는 뭔가를하고 싶지 :

personqueue.AsParallel().WithDegreeOfParallelism(20).ForAll(i => ...);

나는 데이터베이스에 데이터를 저장하기 위해 호출, 그래서 동시 스레드의 수를 제한하고 만들기로

.

는하지만, 나는 ForAll가 큐에서 제거하지 않을 것으로 예상하고, 내가 올바른 일을 진열하고 보장이 없기 때문에 단지

ForAll(i => { 
    personqueue.personqueue.TryDequeue(...); 
    ... 
}); 

을하는 것에 대해 걱정.

그래서 병렬로 컬렉션 및 큐에서 반복을 수행 할 수 있습니까?

또는이 처리를 병렬로 수행하려면 PLINQ을 사용하는 것이 더 좋을까요?

답변

4

그럼 여기에 보관하려는 내용을 100 % 확신하지 못했습니다. 아무것도 남지 않을 때까지 모든 항목을 큐에서 제거하려고합니까? 또는 한 번에 많은 항목을 큐에서 제거 할 수 있습니까?

첫 번째 아마 예기치 않은 동작이 문을 시작하십시오 ConcurrentQueue를 들어

theQueue.AsParallel() 

, 당신은 'Snapshot'-열거를 얻을. 따라서 동시 스택을 반복 할 때 스냅 샷을 반복하고 '라이브'큐는 반복하지 않습니다.

일반적으로 반복하는 동안 변경중인 내용을 반복하는 것은 좋지 않다고 생각합니다.

그래서 다른 해결책은 다음과 같이 보일 것이다 : 그것은 반복하면서

 // this way it's more clear, that we only deque for theQueue.Count items 
     // However after this, the queue is probably not empty 
     // or maybe the queue is also empty earlier 
     Parallel.For(0, theQueue.Count, 
        new ParallelOptions() {MaxDegreeOfParallelism = 20}, 
        () => { 
         theQueue.TryDequeue(); //and stuff 
        }); 

이 조작 뭔가를 피할 수 있습니다. 그러나이 명령문 다음에 큐에는 여전히 for 루프 중에 추가 된 데이터가 포함될 수 있습니다.

잠시 동안 대기열을 비우려면 좀 더 많은 작업이 필요할 수 있습니다. 여기에 정말 못생긴 해결책이 있습니다. 대기열에 항목이있는 동안 새 작업을 만듭니다. 각 작업 시작은 가능한 한 큐에서 큐를 대기열에서 제외합니다. 결국 모든 작업이 끝나기를 기다립니다. 병렬 처리를 제한하기 위해 결코 20 개 이상의 태스크를 생성하지 않습니다. 당신이 진짜 사이트 전반에 걸쳐 높은 목표로하고 즉시 DB 업데이트를 할 필요가없는 경우

 // Probably a kitty died because of this ugly code ;) 
     // However, this code tries to get the queue empty in a very aggressive way 
     Action consumeFromQueue =() => 
             { 
              while (tt.TryDequeue()) 
              { 
               ; // do your stuff 
              } 
             }; 
     var allRunningTasks = new Task[MaxParallism]; 
     for(int i=0;i<MaxParallism && tt.Count>0;i++) 
     { 
      allRunningTasks[i] = Task.Factory.StartNew(consumeFromQueue); 
     } 
     Task.WaitAll(allRunningTasks); 
+0

중간 아이디어를 시도해 볼 수 있습니다. 이것은 매우 짧은 시간에 한 번에 1 만 건의 업데이트를 얻을 수있는 웹 서비스의 일부이지만, 각 업데이트를 수행하면서 데이터베이스를 해머하고 싶지는 않습니다. 싱글 톤에 정적 대기열, 그리고 기능을 처리 할 수 ​​있습니다. 내 솔루션이 완벽하지는 않지만 우선 데이터베이스를 보호해야합니다. –

0

, 당신은 매우 보수적 인 솔루션이 아닌 별도의 레이어 라이브러리를가는 더 나은 수 있습니다.

요청이 데이터를 슬롯에 넣고 돌아 오도록 고정 크기 배열 (초대 크기 - 예 : 1000 항목 또는 N 초 상당 요청)과 연동 인덱스를 만듭니다. 하나의 블록이 채워질 때 (카운트를 계속 체크), 또 하나의 블록을 만들고 비동기 델리게이트를 생성하여 방금 채워진 블록을 처리하고 SQL로 보냅니다. 데이터의 구조에 따라 위임자는 모든 데이터를 쉼표로 구분 된 배열로 묶을 수 있으며, 간단한 XML (물론 그 중 하나의 성능을 테스트해야 함)을 SQL sproc에 보내면 레코드를 처리하는 것이 가장 좋습니다 기록에 의해 - 결코 큰 자물쇠를 들고.만약 무거워지면 블록을 여러 개의 작은 블록으로 나눌 수 있습니다. 핵심은 SQL에 대한 요청 수를 최소화하고 항상 분리 수준을 유지하며 스레드 풀의 가격을 지불하지 않아도된다는 것입니다. 아마도 2 개의 비동기 스레드를 더 이상 사용할 필요가 없을 것입니다 .

Parallel-s를 만지면 훨씬 빨라질 것입니다.

+0

내 대기열이 가득 차면 대기열을 비우는 것이 좋을 것입니다. 그래서 한 웹 서비스 호출에서 수백 가지 항목을 얻은 다음 다른 큰 배치를 보내면 거기에 뭔가있는 동안 계속 진행됩니다. 그 순간 나는 10 초를 자고 대기열을 처리 한 다음 10 초를 더 자게됩니다. –