2

간단한 예제에서 내 문제를 설명하고 더 가까운 문제를 설명합니다.그룹의 병렬 실행

상자 1에 [i1, i2, i3, i4, ...]가 있고 항목을 처리 할 수있는 상자 2가 있습니다 (일반적으로 m은 n보다 훨씬 적음). 각 항목에 필요한 시간이 다릅니다. 나는 모든 항목이 진행될 때까지 항상 m 직업 품목을하고 싶다.

훨씬 더 가까운 문제는 파일의 n 개의 문자열 (URL 주소) 목록 1을 가지고 있고 시스템이 m 개의 파일을 동시에 다운로드하도록하려는 경우 (예 : httpclient.getAsync() 메소드를 통해) . m 개의 항목 중 하나의 다운로드가 끝날 때마다 list1의 다른 나머지 항목을 최대한 빨리 대체해야하며 List1 항목이 모두 진행될 때까지이 항목을 계승해야합니다. (n 및 m의 번호는 런타임에 사용자 입력에 의해 지정됩니다)

어떻게 수행 할 수 있습니까? 동시 작업의 수를 제한 병렬

답변

1

프로세스 항목 :

string[] strings = GetStrings(); // Items to process. 
const int m = 2; // Max simultaneous jobs. 

Parallel.ForEach(strings, new ParallelOptions {MaxDegreeOfParallelism = m}, s => 
{ 
    DoWork(s); 
}); 
+2

을 할 수있는 또 다른 방법이며, Parallel.ForEach 때
편집
약간의 변경 후 코드는 이제 작업을 생성 비동기를 지원하지 않습니다. –

+0

이 방법은 내 문제에서 작동하지 않습니다. Parallel.ForEach를 비동기 메소드와 함께 사용할 수 없기 때문입니다. 비동기 메소드와 함께 Parallel.ForEach를 사용하는 경우 모든 작업이 즉시 시작됩니다 (비동기 작업 완료까지 기다리지 않습니다). 비동기 메서드 인 HttpClient.getAsync를 사용하고 있습니다. –

6

당신은 간단

private static HttpClient _client = new HttpClient(); 
public async Task<List<MyClass>> ProcessDownloads(IEnumerable<string> uris, 
                int concurrentDownloads) 
{ 
    var result = new List<MyClass>(); 

    var downloadData = new TransformBlock<string, string>(async uri => 
    { 
     return await _client.GetStringAsync(uri); //GetStringAsync is a thread safe method. 
    }, new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = concurrentDownloads}); 

    var processData = new TransformBlock<string, MyClass>(
      json => JsonConvert.DeserializeObject<MyClass>(json), 
      new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded}); 

    var collectData = new ActionBlock<MyClass>(
      data => result.Add(data)); //When you don't specifiy options dataflow processes items one at a time. 

    //Set up the chain of blocks, have it call `.Complete()` on the next block when the current block finishes processing it's last item. 
    downloadData.LinkTo(processData, new DataflowLinkOptions {PropagateCompletion = true}); 
    processData.LinkTo(collectData, new DataflowLinkOptions {PropagateCompletion = true}); 

    //Load the data in to the first transform block to start off the process. 
    foreach (var uri in uris) 
    { 
     await downloadData.SendAsync(uri).ConfigureAwait(false); 
    } 
    downloadData.Complete(); //Signal you are done adding data. 

    //Wait for the last object to be added to the list. 
    await collectData.Completion.ConfigureAwait(false); 

    return result; 
} 
입니다 원하는 다음 프로젝트에 System.Threading.Tasks.Dataflow NuGet 패키지를 추가, TPL Dataflow에 보일 것입니다

위의 코드에서 주어진 시간에만 HttpClient가 활성화되고, 무제한 스레드는 수신 된 문자열을 처리하여 객체로 변환하고 단일 스레드는 thos를 사용합니다. e 개체를 만들고 목록에 추가합니다.

업데이트 : 여기 만이 여기에 해당

private static HttpClient _client = new HttpClient(); 
public void ProcessDownloads(IEnumerable<string> uris, int concurrentDownloads) 
{ 
    var downloadData = new ActionBlock<string>(async uri => 
    { 
     var response = await _client.GetAsync(uri); //GetAsync is a thread safe method. 
     //do something with response here. 
    }, new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = concurrentDownloads}); 


    foreach (var uri in uris) 
    { 
     downloadData.Post(uri); 
    } 
    downloadData.Complete(); 

    downloadData.Completion.Wait(); 
} 
+0

감사합니다.내 문제에 대한 답변을 찾기 위해 TPL이나 Reactive Extension에 대해 많이 들었지만 그 중 일부는 나를 위해 얼마나 복잡하고 어떻게 사용 하는지를 알지 못합니다. 이 작업을 실제로 처리 할 수있는 간단한 솔루션이 없습니까? :) –

+0

데이터 흐름 파이프 라인에서 단계를 설정하는 것만 알면 쉽게 처리 할 수 ​​있습니다. TPL DataFlow의 모든 기능을 보여줄 수 있도록 예제를 지나치게 복잡하게 만들었으므로 필자는 요구 사항의 예를 업데이트했습니다. –

+0

'HttpClient'는 여러 요청에 대해 동시 적으로 재사용되도록 설계되었습니다. 하나의 인스턴스를 만들고 그것을 사용하고, 매번 새로운 인스턴스를 만들지 마십시오. –

6

요청 사용할 수있는 일반적인 방법이 무엇인지한다는 것을 단순화 된 예입니다.

이 TIn을 문자열 (URL 주소)이라고하면 asyncProcessor는 URL 주소를 입력으로 사용하여 작업을 반환하는 비동기 메서드가됩니다.

이 방법으로 사용되는 SlimSemaphore는 다른 요청이 완료되는 즉시 n 개의 동시 비동기 I/O 요청을 실시간으로 허용합니다. 슬라이딩 윈도우 패턴과 같은 것.

public static Task ForEachAsync<TIn>(
      IEnumerable<TIn> inputEnumerable, 
      Func<TIn, Task> asyncProcessor, 
      int? maxDegreeOfParallelism = null) 
     { 
      int maxAsyncThreadCount = maxDegreeOfParallelism ?? DefaultMaxDegreeOfParallelism; 
      SemaphoreSlim throttler = new SemaphoreSlim(maxAsyncThreadCount, maxAsyncThreadCount); 

      IEnumerable<Task> tasks = inputEnumerable.Select(async input => 
      { 
       await throttler.WaitAsync().ConfigureAwait(false); 
       try 
       { 
        await asyncProcessor(input).ConfigureAwait(false); 
       } 
       finally 
       { 
        throttler.Release(); 
       } 
      }); 

      return Task.WhenAll(tasks); 
     } 
+0

감사. 귀엽다. 나는 그것을 시험하고 그 일을 다시보고해야한다. –

+0

죄송합니다. 질문이 있습니다. 즉시 모든 작업을 생성하고 각 작업의 순서가 타임 라인에 표시되기를 기다리거나 시간이 필요할 때마다 작업을 생성합니까? –

+0

'Task.WhenAll'은 모든 작업에 대한 목록을 내부적으로 생성하므로 모두 즉시 생성됩니다. –

2

간단한 조절 솔루션은 SemaphoreSlim입니다. 그들은

var client = new HttpClient(); 
SemaphoreSlim semaphore = new SemaphoreSlim(m, m); //set the max here 
var tasks = new List<Task>(); 

foreach(var url in urls) 
{ 
    // moving the wait here throttles the foreach loop 
    await semaphore.WaitAsync(); 
    tasks.Add(((Func<Task>)(async() => 
    { 
     //await semaphore.WaitAsync(); 
     var response = await client.GetAsync(url); // possibly ConfigureAwait(false) here 
     // do something with response 
     semaphore.Release(); 
    }))()); 
} 

await Task.WhenAll(tasks); 

필요 이것은 그의 DoWork는 비동기입니다 그것을

var client = new HttpClient(); 
var tasks = new HashSet<Task>(); 

foreach(var url in urls) 
{ 
    if(tasks.Count == m) 
    { 
     tasks.Remove(await Task.WhenAny(tasks));    
    } 

    tasks.Add(((Func<Task>)(async() => 
    { 
     var response = await client.GetAsync(url); // possibly ConfigureAwait(false) here 
     // do something with response    
    }))()); 
} 

await Task.WhenAll(tasks); 
+0

그것은 일을하는 것처럼 보입니다. 모토로라에서 모질라를 다운로드하지만, 문제가 있습니다. 예를 들어, 1 백만 개의 URL 목록이있는 경우 짧은 시간에 1 백만 개의 작업을 생성 한 다음 각 작업의 순서를 기다리고 있습니다. 내가 잘못? –

+0

맞습니다. 짧은 시간에 모든 작업을 생성합니다. 또한 같은 스레드에서 모든 것을 수행하지만'ConfigureAwait (false)'를 사용하여 변경하거나 스레드 풀에서 실행할 수 있습니다. 더 많은 정보를 가지고 답변을 업데이트하겠습니다. –

+0

내 URL 목록이 너무 길어서 수백만 명이 될 수 있습니다.이 수백만 건의 작업이 모두 작성되면 메모리 부족, 다른 예외 또는 오류가 발생할 수 있습니다. :) 낮은 메모리 사용량으로 필요할 때마다 각 부분에 대한 솔루션을 제공합니다. –