2017-04-20 6 views
0

작업을 병렬로 처리하는 메모리 사용량이 많은 응용 프로그램과 함께 작업하고 있습니다. 문제는 많은 메모리를 생성 한 다음 GC가 멈출 때까지 16GByte 시스템에 과부하가 걸린다는 것입니다. 이 시점에서 성능은 끔찍하며 끝내려면 며칠이 걸릴 수 있습니다. 원래 응용 프로그램을 실행하는 데 일반적으로 30 분이 걸립니다.메모리가 많은 멀티 스레드 응용 프로그램

class Program 
{ 
    static void Main(string[] args) 
    { 
     var tasks = new List<Task<string[]>>(); 
     var report = new List<string>(); 

     for (int i = 0; i < 2000; i++) 
     { 
      tasks.Add(Task<string[]>.Factory.StartNew(DummyProcess.Process)); 
     } 

     foreach (var task in tasks) 
     { 
      report.AddRange(task.Result); 
     } 

     Console.WriteLine("Press RETURN..."); 
     Console.ReadLine(); 
    } 
} 

여기에 '프로세서'입니다 :

public static class DummyProcess 
{ 
    public static string[] Process() 
    { 
     var result = new List<string>(); 

     for (int i = 1; i < 10000000; i++) 
     { 
      result.Add($"This is a dummy string of some length [{i}]"); 
     } 

     var random = new Random(); 
     var delay = random.Next(100, 300); 

     Thread.Sleep(delay); 

     return result.ToArray(); 
    } 
} 

저는 믿습니다 문제는 여기에 있습니다 :

foreach (var task in tasks) 
{ 
    report.AddRange(task.Result); 
} 

하는하는 작업은 배치하지 않는 이것은이의 버전을 박탈하다 작업이 끝나면 결과 (문자열 [])를 작업에서 가져 와서 작업을 처리하는 가장 좋은 방법은 무엇입니까?

foreach (var task in tasks) 
{ 
    report.AddRange(task.Result); 
    task.Dispose(); 
} 

그러나 많은하지 차이 :

나는 이것을 시도했다. 내가 시도 할 수도있는 것은 반환되는 결과를 단순히 멈추는 것이다. 즉, 문자열의 거대한 10 - 50 MBytes가 원래 응용 프로그램에서 유지되지 않는다.

편집 : 나는 다음과 같이 결과를 읽을 수있는 코드를 교체 시도 :

나는 두 시간 후에 중단했다
while (tasks.Any()) 
{ 
    var listCopy = tasks.ToList(); 

    foreach (var task in listCopy) 
    { 
     if (task.Wait(0)) 
     { 
      report.AddRange(task.Result); 
      tasks.Remove(task); 
      task.Dispose(); 
     } 
    } 

    Thread.Sleep(300); 
} 

- 나는 그것을 하루 아침에 오늘 밤을 실행 드리겠습니다과이 완료되는지 확인합니다. 메모리 사용량은 달렸지 만 느린 속도로 나아졌습니다.

답변

0

당신 말이 맞아, 문제가있다

foreach (var task in tasks) 
{ 
    report.AddRange(task.Result); 
} 

입니다 그러나 문제는 당신이 생각하는 것보다 훨씬 더 큰 것입니다. Result 블럭을 호출 할 때마다 코드를 효과적으로 처리 할 수있는 직렬 버전으로 코드를 변환하는 호출은 너무 심합니다. 나는 모든 작업에 continuation을 추가하여 예를 들어, 제 1 병렬 버전에 코드를 돌려 제안

일단

task.ContinueWith(t => { 
    //NOTE1 that t.Result is already ready here 
    //NOTE2 you need synchronization for your data structure, mutex or synchronized collection 
    report.AddRange(t.Result); 
}); 

가 나는 또한 작업 목록에서 자신을 제거하는 모든 작업을 제안 할 이잖아,이 GC가 수집하게됩니다 그것을 빨리가, 내가 합계에서, 여기에 마지막 수단으로 만 명시 적으로 폐기를 사용하는 것이 좋습니다 보유하고 무거운 결과와 함께 : 요, 대안

task.ContinueWith(t => { 
    //NOTE1 that t.Result is already ready here 
    //NOTE2 you need synchronization for your data structure, mutex or synchronized collection 
    report.AddRange(t.Result); 
    //NOTE3 Synchronize access to task list! 
    tasks.Remove(t); 
}); 

또는 u는 한 단계 작업 기반 병렬 위에 가서 처음부터 Parallel 방법을 적용 할 수 :

ParallelLoopResult result = Parallel.For(0, 2000, ctr => { 
    // NOTE you still need to synchronize access to report 
    report.Add(/*get your result*/); 
}); 

this answer 의역하려면 : 결과는 동일합니다 있지만,이 특히 같은 대형 컬렉션, 작업보다 훨씬 적은 오버 헤드를 소개합니다 너의 것 (2000의 품목), 그리고 전반적인 런타임을 더 빨리 일으키는 원인이 된.

0

task.Result은 모든 루트에서 작업에 더 이상 액세스 할 수 없을 때까지 결과 배열에 대한 참조를 유지합니다. 즉, tasks 목록이 범위를 벗어날 때까지 모든 결과 배열이 존재 함을 의미합니다.

또한 2000 개의 스레드를 만들면 최대 2000 개의 결과 데이터 집합을 동시에 대기시킬 수 있습니다. 소비자 생산자 모델로 변경하고 Environment.ProcessorCount 스레드가 2000 개의 작업을 보유한 작업 대기열을 수행하면 메모리를 사용하여 "비행 중"의 작업이 줄어 듭니다. TPL Dataflow과 같은 도구를 사용하면 제한된 수의 직원이있는 파이프 라인을 만들 수 있으며 이전 작업자가 체인의 다음 링크에서 작업을 처리 할 때까지 작업자가 새 작업을 시작하지 않습니다.

static void Main(string[] args) 
    { 
     var report = new List<string>(); 

                 //We don't use i because you did not have Process accept a parameter of any kind. 
     var producer = new TransformBlock<int, string[]>((i) => DummyProcess.Process(), new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = Environment.ProcessorCount}); 

     //Only 20 processed items can be in flight at once, if the queue is full it will block the producer threads which there only is Environment.ProcessorCount of them. 
     //Only 1 thread is used for the consumer. 
     var consumer = new ActionBlock<string[]>((result) => report.AddRange(result), new ExecutionDataflowBlockOptions{BoundedCapacity = 20}); 
     producer.LinkTo(consumer, new DataflowLinkOptions {PropagateCompletion = true}); 

     for (int i = 0; i < 2000; i++) 
     { 
      //We just add dummy values to queue up 2000 items to be processed. 
      producer.Post(i); 
     } 
     //Signals we are done adding to the producer. 
     producer.Complete(); 

     //Waits for the consumer to finish processing all pending items. 
     consumer.Completion.Wait(); 

     Console.WriteLine("Press RETURN..."); 
     Console.ReadLine(); 
    } 
}