2017-11-21 5 views
5

PLYNQ 전문가의 도움을 받아 주시면 감사하겠습니다! 답변을 검토하는 데 시간이 걸릴 것입니다. math.SE에 대한보다 일반적인 프로필이 있습니다.ParallelQuery.Aggregate가 병렬로 실행되지 않는 이유는 무엇입니까

저는 ParallelQuery<List<string>> 유형의 객체가 있습니다.이 객체에는 병렬로 처리하려는 44 개의 목록이 있습니다 (한 번에 5 개씩). 내 프로세스는 아래와 같이, 부울 값의 쌍의 결과를 반환

private ProcessResult Process(List<string> input) 

처리 등 서명을 갖는다.

private struct ProcessResult 
    { 
     public ProcessResult(bool initialised, bool successful) 
     { 
      ProcessInitialised = initialised; 
      ProcessSuccessful = successful; 
     } 

     public bool ProcessInitialised { get; } 
     public bool ProcessSuccessful { get; } 
    } 

문제가 있습니다.IEnumerable<List<string>> processMe이 주어지면 내 PLYNQ 쿼리는이 메서드를 구현하려고 시도합니다 : https://msdn.microsoft.com/en-us/library/dd384151(v=vs.110).aspx.

processMe.AsParallel() 
     .Aggregate<List<string>, ConcurrentStack<ProcessResult>, ProcessResult> 
      (
       new ConcurrentStack<ProcessResult>, //aggregator seed 
       (agg, input) => 
       {       //updating the aggregate result 
        var res = Process(input); 
        agg.Push(res); 
        return agg; 
       }, 
       agg => 
       {       //obtain the result from the aggregator agg 
        ProcessResult res; // (in this case just the most recent result**) 
        agg.TryPop(out res); 
        return res; 
       } 
      ); 

불행히도 그것은 병렬로, 순차적으로 실행되지 않습니다. (** 난 그냥 지금은 일할 수있는 parallelisation를 얻기 위해 노력하고,이 구현은하지 않습니다 "의미"를 참고하세요.) 병렬 실행 한 약간 다른 구현을 시도


, 그러나 집계가 없었다. ([A1, A2], [B1, B2]) ≡ [A1 & & B1, A2 & & B2]의 두 부분에 본질적으로 부울 AND를 정의했습니다.

private static ProcessResult AggregateProcessResults 
     (ProcessResult aggregate, ProcessResult latest) 
    { 
     bool ini = false, suc = false; 
     if (aggregate.ProcessInitialised && latest.ProcessInitialised) 
      ini = true; 
     if (aggregate.ProcessSuccessful && latest.ProcessSuccessful) 
      suc = true; 


     return new ProcessResult(ini, suc); 
    } 

그리고는 PLYNQ 쿼리 https://msdn.microsoft.com/en-us/library/dd383667(v=vs.110).aspx

.Aggregate<List<string>, ProcessResult, ProcessResult>(
    new ProcessResult(true, true), 
    (res, input) => Process(input), 
    (agg, latest) => AggregateProcessResults(agg, latest), 
    agg   => agg 
을 사용

문제 ... 여기 AggregateProcessResults 코드가 어떤 이유로-I는 결과가 가고 있었던 곳 우둔하니, 공격하지 않았다이었다

읽어 주셔서 감사합니다, 어떤 도움을 주셨습니다 :)

+0

:

대신, 다른 오버로드를 사용합니다. 시도하고있는 작업에 대해 올바른 작업을 사용하면 시스템이 훨씬 더 효과적으로 작업을 수행 할 수 있습니다. – Servy

+0

컬렉션에 몇 개의 아이템이 있습니까? (오직 44?) 몇 개의 CPU 코어가 있습니까? 다중 트레드에서 쿼리를 실행하고 CPU 코어를 늘리려면 복잡한 준비가 필요합니다. 컬렉션은 사용 가능한 많은 CPU 코어만큼 많은 부분으로 분할되어야하며 스레드에서 작업을 실행하고 마지막으로 결과를 집계해야합니다. .NET을 사용하면 모든 것을 훨씬 느리게 만들 수있는 많은 작업을하지 않아도됩니다. – Major

+0

@ Major 나는 2200 개의 문자열을 가지고 있는데, 500 개의 문자열로 묶여 44 개의리스트 을 제공합니다. 나는 5 개의 프로세스를 동시에 실행하는 것으로 제한되어있다. – Szmagpie

답변

2

Aggregate의 과부하는 실제로 p에서 실행되지 않습니다 디자인 상 병렬. 당신은 씨앗을 전달한 다음 단계 함수를 전달하지만, 단계 함수 (agg)에 대한 인수는 이전에 단계에서받은 누적 기입니다. 이러한 이유로, 본질적으로 순차적이며 (이전 단계의 결과가 다음 단계로 입력 됨) 병렬 처리가 불가능합니다. 이 과부하가 ParallelEnumerable에 포함 된 이유를 모르지만 이유가있을 수 있습니다. 당신은`Select`가 아닌`Aggregate`를 사용한다 순서의 각 항목에 대한 새 값을 계산하려면

var result = processMe 
.AsParallel() 
.Aggregate 
(
    // seed factory. Each partition will call this to get its own seed 
    () => new ConcurrentStack<ProcessResult>(), 
    // process element and update accumulator 
    (agg, input) => 
    {           
     var res = Process(input); 
     agg.Push(res); 
     return agg; 
    }, 
    // combine accumulators from different partitions 
    (agg1, agg2) => { 
     agg1.PushRange(agg2.ToArray()); 
     return agg1; 
    }, 
    // reduce 
    agg => 
    { 
     ProcessResult res; 
     agg.TryPop(out res); 
     return res; 
    } 
);