2016-08-31 3 views
0

저는 현재 약간의 힘이 있습니다. 비동기 스레딩에 대한 많은 게시물을 읽었지 만 아무도 내 상황에 맞지 않으며 완전한 그림을 얻지 못합니다..NET 비동기 대기열 몇 가지 추가 기능 처리

"가져 오기"응용 프로그램을 통해 지속적으로 증가하는 "작업"테이블이있는 DB가 있습니다. 이 테이블에는이 예제에서 세 개의 열 (Id, customer_id, DateOfEntry)이 있습니다. testdata로 :

1,A,Date 
2,A,Date 
3,B,Date 
4,C,Date 
5,B,Date 
... 

내가 작업하는 것이 임해야 두 번째 응용 프로그램 "JobWorker"가. 다음 제한 사항이 있습니다.

고객과 마찬가지로 "JobWorker"응용 프로그램에서 많은 비동기 작업을 시작하려고합니다.이 예제에서는 3입니다. 점점 어려워지고 있습니다. 이 비동기 작업자는 항상 하나의 작업에서 병렬 (동기식)으로 작업 할 수있는 자체 대기열을 가져야합니다. 1) 테이블에서 가장 오래된 작업을로드해야합니다. 2)가 3) 테이블 이제

까다로운 부분에서 다음 중 하나를로드에 100 명 고객이있는 현실에서 작동하지만 그들은 계속해서 작업을하지 보내 (하지만 나도 몰라),하지만 난 원하는 최대 10 개의 작업을 병렬로 처리 할 수 ​​있습니다. (단, 고객 당 1 개를 잊지 마세요.)

어떻게하면됩니까?

public class FakeJob 
{ 
    public int Id { get; set; } 

    public string ProjectName { get; set; } 

    public int Duration { get; set; } 
} 


public class JobMaster 
{ 
    private IConfigurationRoot _configuration; 

    private BufferBlock<ActionBlock<FakeJob>> _mainQueue; 

    private Dictionary<string, ActionBlock<FakeJob>> _projectQueues; 

    private Dictionary<Guid, CancellationTokenSource> _projectCancellationTokens; 


    public JobMaster() 
    { 
     _mainQueue = new BufferBlock<ActionBlock<FakeJob>>(new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 }); 
     _projectQueues = new Dictionary<string, ActionBlock<FakeJob>>(); 
    } 


    public async Task WorkOnJobs() 
    { 
     List<string> projectIds = new List<string>() { "Testkunde 1", "Testkunde 2", "Testkunde 3" }; 
     List<Task> producerTasks = new List<Task>(); 
     List<FakeJob> jobs = new List<FakeJob>(); 


     jobs.Add(new FakeJob { Duration = 10, Id = 1, ProjectName = projectIds[0] }); 
     jobs.Add(new FakeJob { Duration = 10, Id = 2, ProjectName = projectIds[0] }); 
     jobs.Add(new FakeJob { Duration = 10, Id = 3, ProjectName = projectIds[0] }); 
     jobs.Add(new FakeJob { Duration = 4, Id = 4, ProjectName = projectIds[1] }); 
     jobs.Add(new FakeJob { Duration = 4, Id = 5, ProjectName = projectIds[1] }); 
     jobs.Add(new FakeJob { Duration = 4, Id = 6, ProjectName = projectIds[1] }); 
     jobs.Add(new FakeJob { Duration = 2, Id = 7, ProjectName = projectIds[2] }); 
     jobs.Add(new FakeJob { Duration = 2, Id = 8, ProjectName = projectIds[2] }); 
     jobs.Add(new FakeJob { Duration = 2, Id = 9, ProjectName = projectIds[2] }); 


     foreach (var loopProjectId in projectIds) 
     { 
      producerTasks.Add(WorkOnJobsForForProject(loopProjectId, jobs)); 
     } 


     await Task.WhenAll(producerTasks); 
    } 


    private async Task WorkOnJobsForForProject(string projectId, List<FakeJob> jobDB) 
    { 
     var consumerOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 }; 

     while (true) 
     { 
      foreach (var loopJob in jobDB.Where(x => x.ProjectName == projectId)) 
      { 
       var consumer = new ActionBlock<FakeJob>(StartJob, consumerOptions); 
       _projectQueues[projectId] = consumer; 

       await _mainQueue.SendAsync(_projectQueues[projectId]); 
       await _projectQueues[projectId].SendAsync(loopJob); 
       await Task.WhenAll(_projectQueues[projectId].Completion); 
      } 

      break; 
     } 
    } 


    private async Task StartJob(FakeJob job) 
    { 
     Log.Logger.Information("Start job [{A}] for [{B}]", job.Id, job.ProjectName); 
     await Task.Delay(job.Duration * 1000); 
     _projectQueues[job.ProjectName].Complete(); 
     Log.Logger.Information("Finished job [{A}]", job.Id); 
    } 
} 

[수정 2] 나의 현재 시도 : (

[편집] 내 현재 시도, 나는 퍼즐 조각 (SemaphoreSlim, ActionBlock)을 알고 있지만 나는 완성 된 퍼즐에 그것을 얻을하지 않습니다 _mainQueue에 대한 MaxDegreeOfParallelism = 3 일 2 하지만,. 나는 2로 설정하면 작업 (9)가 실행되지 않습니다 ( (이것은 않습니다

public class FakeJob 
{ 
    public int Id { get; set; } 

    public string ProjectName { get; set; } 

    public int Duration { get; set; } 

    public bool IsComplete { get; set; } 
} 


public class JobMaster_BackUp 
{ 
    private ActionBlock<CustomerQueue> _mainQueue; 

    private Dictionary<string, ActionBlock<FakeJob>> _projectQueues; 

    public static List<FakeJob> FakeJobDB = new List<FakeJob>(); 


    public JobMaster_BackUp() 
    { 
     _mainQueue = new ActionBlock<CustomerQueue>(MainQueueJob, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 }); 
     _projectQueues = new Dictionary<string, ActionBlock<FakeJob>>(); 
    } 


    public async Task WorkOnJobs() 
    { 
     List<string> projectIds = new List<string>() { "Testkunde 1", "Testkunde 2", "Testkunde 3" }; 
     List<Task> producerTasks = new List<Task>(); 



     FakeJobDB.Add(new FakeJob { Duration = 3, Id = 1, ProjectName = projectIds[0] }); 
     FakeJobDB.Add(new FakeJob { Duration = 3, Id = 2, ProjectName = projectIds[0] }); 
     FakeJobDB.Add(new FakeJob { Duration = 3, Id = 3, ProjectName = projectIds[0] }); 
     FakeJobDB.Add(new FakeJob { Duration = 3, Id = 4, ProjectName = projectIds[1] }); 
     FakeJobDB.Add(new FakeJob { Duration = 3, Id = 5, ProjectName = projectIds[1] }); 
     FakeJobDB.Add(new FakeJob { Duration = 3, Id = 6, ProjectName = projectIds[1] }); 
     FakeJobDB.Add(new FakeJob { Duration = 3, Id = 7, ProjectName = projectIds[2] }); 
     FakeJobDB.Add(new FakeJob { Duration = 3, Id = 8, ProjectName = projectIds[2] }); 
     FakeJobDB.Add(new FakeJob { Duration = 3, Id = 9, ProjectName = projectIds[2] }); 


     foreach (var loopProjectId in projectIds) 
     { 
      CancellationTokenHandler.ProjectCancellationTokens[loopProjectId] = new CancellationTokenSource(); 
      producerTasks.Add(WorkOnJobsForForProject(loopProjectId, CancellationTokenHandler.ProjectCancellationTokens[loopProjectId].Token)); 
     } 


     await Task.WhenAll(producerTasks); 
    } 


    private FakeJob GetNextJob(string projectId) 
    { 
     FakeJob nextJob = FakeJobDB.Where(x => x.ProjectName == projectId && x.IsComplete == false).OrderBy(x => x.Id).FirstOrDefault(); 

     if (nextJob != null) 
     { 
      Log.Logger.Information("GetNextJob [" + nextJob.Id + "]"); 
     } 

     return nextJob; 
    } 


    private async Task WorkOnJobsForForProject(string projectId, CancellationToken cancellationToken) 
    { 
     while (!cancellationToken.IsCancellationRequested) 
     { 
      FakeJob loopJob = GetNextJob(projectId); 

      if (loopJob != null) 
      { 
       CustomerQueue customerQueue = new CustomerQueue(loopJob); 

       await _mainQueue.SendAsync(customerQueue); 

       await customerQueue.WaitForCompletion(); 
      } 
     } 
    } 





    private async Task MainQueueJob(CustomerQueue consumer) 
    { 
     consumer.Start(); 
     await Task.WhenAll(consumer.WaitForCompletion()); 
    } 
} 


public class CustomerQueue 
{ 
    private ActionBlock<FakeJob> _queue; 

    private FakeJob _job; 


    public CustomerQueue(FakeJob job) 
    { 
     _job = job; 

     var consumerOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 }; 
     _queue = new ActionBlock<FakeJob>(StartJob, consumerOptions); 
    } 


    public void Start() 
    { 
     _queue.SendAsync(_job); 
    } 


    public async Task WaitForCompletion() 
    { 
     await Task.WhenAll(_queue.Completion); 
    } 


    private async Task StartJob(FakeJob job) 
    { 
     //Log.Logger.Information("Start job [{A}] for [{B}]", job.Id, job.ProjectName); 
     await Task.Delay(job.Duration * 1000); 

     JobMaster_BackUp.FakeJobDB.Single(x => x.Id == job.Id).IsComplete = true; 

     _queue.Complete(); 

     Log.Logger.Information("Finished job [{A}]", job.Id); 
    } 
} 
+0

'Parallel.ForEach()'를 살펴볼 수 있습니다. –

+1

_something_을 시도해야합니다. "퍼즐 조각"은 구현을 위해 .NET API의 특정 부분을 이미 결정했음을 의미합니다. 동일한 작업을 수행 할 수있는 다른 메커니즘과 반대입니다. 자, 보자.당신이 얼마나 멀리 있는지를 보여주는 좋은 [mcve]를 제공하고, 현재 당신이 붙어있는 문제의 _ 특정적인 부분을 정확하게 설명하십시오. –

+0

TPL Dataflow API가 사용자의 요구 사항을 충족시킬 수는 있지만 잘 알지 못합니다. 따라서 각 고객에 대해 개별 대기열을 유지 한 다음 해당 대기열의 단일 대기열을 유지 관리 할 것입니다. 가장 오래된 작업을 먼저 실행하려는 경우 대기열은 우선 순위 대기열 (예 : 정렬 됨)이 될 수 있으므로 가장 오래된 대기열을 항상 대기열에서 제외합니다. 그런 다음 한 번에 10 개 이상의 활성 대기열 해제 대기열을 실행하지 마십시오. 작업 처리가 끝나면 고객 대기열을 기본 대기열에 다시 대기열에 다시 넣고 대기열에서 다음 대기열에서 대기열에서 제외합니다. –

답변

1

한 가지 권리. 고객 당 하나의 작업 만 병렬로 실행합니다. 하지만 최대 2 개의 Jobs는 작동하지 않습니다.

나는 당신이 달성하기 위해 노력하고 정확하게의 명확한 그림이없는,하지만 난 당신이 놓치고있는 것은 다른 흐름 블록의 조화라고 생각 .

이렇게하려면 몇 가지 방법이 있습니다. 하나는 이미 언급했듯이 SemaphoreSlim을 사용하는 것입니다. 최대 개수 10 인 SemaphoreSlim을 하나 생성하고 CustomerQueue 생성자로 전달하고 StartJob을 시작 부분에 await WaitAsync, 끝 부분에 Release으로 만듭니다. 특히, 10으로 설정 동시성과 ConcurrentExclusiveSchedulerPairConcurrent 반 당신은 ConsumerQueue 생성자에이를 전달하고 블록 옵션을 설정할 것 -

다른 방법은 스케줄러 동작 블록을 제공하는 것입니다.

+0

정말 고맙습니다! 그것은 내가 필요로하는 아이디어 였고, 나의 시도는 복잡했다. :) 남자, 그 시간과 신경을 비용 : 나는 다시 감사드립니다. 종류는 안부 – SharpNoiZy