저는 현재 약간의 힘이 있습니다. 비동기 스레딩에 대한 많은 게시물을 읽었지 만 아무도 내 상황에 맞지 않으며 완전한 그림을 얻지 못합니다..NET 비동기 대기열 몇 가지 추가 기능 처리
"가져 오기"응용 프로그램을 통해 지속적으로 증가하는 "작업"테이블이있는 DB가 있습니다. 이 테이블에는이 예제에서 세 개의 열 (Id, customer_id, DateOfEntry)이 있습니다. testdata로 :
1,A,Date
2,A,Date
3,B,Date
4,C,Date
5,B,Date
...
내가 작업하는 것이 임해야 두 번째 응용 프로그램 "JobWorker"가. 다음 제한 사항이 있습니다.
고객과 마찬가지로 "JobWorker"응용 프로그램에서 많은 비동기 작업을 시작하려고합니다.이 예제에서는 3입니다. 점점 어려워지고 있습니다. 이 비동기 작업자는 항상 하나의 작업에서 병렬 (동기식)으로 작업 할 수있는 자체 대기열을 가져야합니다. 1) 테이블에서 가장 오래된 작업을로드해야합니다. 2)가 3) 테이블 이제
까다로운 부분에서 다음 중 하나를로드에 100 명 고객이있는 현실에서 작동하지만 그들은 계속해서 작업을하지 보내 (하지만 나도 몰라),하지만 난 원하는 최대 10 개의 작업을 병렬로 처리 할 수 있습니다. (단, 고객 당 1 개를 잊지 마세요.)
어떻게하면됩니까?
public class FakeJob
{
public int Id { get; set; }
public string ProjectName { get; set; }
public int Duration { get; set; }
}
public class JobMaster
{
private IConfigurationRoot _configuration;
private BufferBlock<ActionBlock<FakeJob>> _mainQueue;
private Dictionary<string, ActionBlock<FakeJob>> _projectQueues;
private Dictionary<Guid, CancellationTokenSource> _projectCancellationTokens;
public JobMaster()
{
_mainQueue = new BufferBlock<ActionBlock<FakeJob>>(new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });
_projectQueues = new Dictionary<string, ActionBlock<FakeJob>>();
}
public async Task WorkOnJobs()
{
List<string> projectIds = new List<string>() { "Testkunde 1", "Testkunde 2", "Testkunde 3" };
List<Task> producerTasks = new List<Task>();
List<FakeJob> jobs = new List<FakeJob>();
jobs.Add(new FakeJob { Duration = 10, Id = 1, ProjectName = projectIds[0] });
jobs.Add(new FakeJob { Duration = 10, Id = 2, ProjectName = projectIds[0] });
jobs.Add(new FakeJob { Duration = 10, Id = 3, ProjectName = projectIds[0] });
jobs.Add(new FakeJob { Duration = 4, Id = 4, ProjectName = projectIds[1] });
jobs.Add(new FakeJob { Duration = 4, Id = 5, ProjectName = projectIds[1] });
jobs.Add(new FakeJob { Duration = 4, Id = 6, ProjectName = projectIds[1] });
jobs.Add(new FakeJob { Duration = 2, Id = 7, ProjectName = projectIds[2] });
jobs.Add(new FakeJob { Duration = 2, Id = 8, ProjectName = projectIds[2] });
jobs.Add(new FakeJob { Duration = 2, Id = 9, ProjectName = projectIds[2] });
foreach (var loopProjectId in projectIds)
{
producerTasks.Add(WorkOnJobsForForProject(loopProjectId, jobs));
}
await Task.WhenAll(producerTasks);
}
private async Task WorkOnJobsForForProject(string projectId, List<FakeJob> jobDB)
{
var consumerOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 };
while (true)
{
foreach (var loopJob in jobDB.Where(x => x.ProjectName == projectId))
{
var consumer = new ActionBlock<FakeJob>(StartJob, consumerOptions);
_projectQueues[projectId] = consumer;
await _mainQueue.SendAsync(_projectQueues[projectId]);
await _projectQueues[projectId].SendAsync(loopJob);
await Task.WhenAll(_projectQueues[projectId].Completion);
}
break;
}
}
private async Task StartJob(FakeJob job)
{
Log.Logger.Information("Start job [{A}] for [{B}]", job.Id, job.ProjectName);
await Task.Delay(job.Duration * 1000);
_projectQueues[job.ProjectName].Complete();
Log.Logger.Information("Finished job [{A}]", job.Id);
}
}
[수정 2] 나의 현재 시도 : (
[편집] 내 현재 시도, 나는 퍼즐 조각 (SemaphoreSlim, ActionBlock)을 알고 있지만 나는 완성 된 퍼즐에 그것을 얻을하지 않습니다 _mainQueue에 대한 MaxDegreeOfParallelism = 3 일 2 하지만,. 나는 2로 설정하면 작업 (9)가 실행되지 않습니다 ( (이것은 않습니다
public class FakeJob
{
public int Id { get; set; }
public string ProjectName { get; set; }
public int Duration { get; set; }
public bool IsComplete { get; set; }
}
public class JobMaster_BackUp
{
private ActionBlock<CustomerQueue> _mainQueue;
private Dictionary<string, ActionBlock<FakeJob>> _projectQueues;
public static List<FakeJob> FakeJobDB = new List<FakeJob>();
public JobMaster_BackUp()
{
_mainQueue = new ActionBlock<CustomerQueue>(MainQueueJob, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });
_projectQueues = new Dictionary<string, ActionBlock<FakeJob>>();
}
public async Task WorkOnJobs()
{
List<string> projectIds = new List<string>() { "Testkunde 1", "Testkunde 2", "Testkunde 3" };
List<Task> producerTasks = new List<Task>();
FakeJobDB.Add(new FakeJob { Duration = 3, Id = 1, ProjectName = projectIds[0] });
FakeJobDB.Add(new FakeJob { Duration = 3, Id = 2, ProjectName = projectIds[0] });
FakeJobDB.Add(new FakeJob { Duration = 3, Id = 3, ProjectName = projectIds[0] });
FakeJobDB.Add(new FakeJob { Duration = 3, Id = 4, ProjectName = projectIds[1] });
FakeJobDB.Add(new FakeJob { Duration = 3, Id = 5, ProjectName = projectIds[1] });
FakeJobDB.Add(new FakeJob { Duration = 3, Id = 6, ProjectName = projectIds[1] });
FakeJobDB.Add(new FakeJob { Duration = 3, Id = 7, ProjectName = projectIds[2] });
FakeJobDB.Add(new FakeJob { Duration = 3, Id = 8, ProjectName = projectIds[2] });
FakeJobDB.Add(new FakeJob { Duration = 3, Id = 9, ProjectName = projectIds[2] });
foreach (var loopProjectId in projectIds)
{
CancellationTokenHandler.ProjectCancellationTokens[loopProjectId] = new CancellationTokenSource();
producerTasks.Add(WorkOnJobsForForProject(loopProjectId, CancellationTokenHandler.ProjectCancellationTokens[loopProjectId].Token));
}
await Task.WhenAll(producerTasks);
}
private FakeJob GetNextJob(string projectId)
{
FakeJob nextJob = FakeJobDB.Where(x => x.ProjectName == projectId && x.IsComplete == false).OrderBy(x => x.Id).FirstOrDefault();
if (nextJob != null)
{
Log.Logger.Information("GetNextJob [" + nextJob.Id + "]");
}
return nextJob;
}
private async Task WorkOnJobsForForProject(string projectId, CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
FakeJob loopJob = GetNextJob(projectId);
if (loopJob != null)
{
CustomerQueue customerQueue = new CustomerQueue(loopJob);
await _mainQueue.SendAsync(customerQueue);
await customerQueue.WaitForCompletion();
}
}
}
private async Task MainQueueJob(CustomerQueue consumer)
{
consumer.Start();
await Task.WhenAll(consumer.WaitForCompletion());
}
}
public class CustomerQueue
{
private ActionBlock<FakeJob> _queue;
private FakeJob _job;
public CustomerQueue(FakeJob job)
{
_job = job;
var consumerOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 };
_queue = new ActionBlock<FakeJob>(StartJob, consumerOptions);
}
public void Start()
{
_queue.SendAsync(_job);
}
public async Task WaitForCompletion()
{
await Task.WhenAll(_queue.Completion);
}
private async Task StartJob(FakeJob job)
{
//Log.Logger.Information("Start job [{A}] for [{B}]", job.Id, job.ProjectName);
await Task.Delay(job.Duration * 1000);
JobMaster_BackUp.FakeJobDB.Single(x => x.Id == job.Id).IsComplete = true;
_queue.Complete();
Log.Logger.Information("Finished job [{A}]", job.Id);
}
}
'Parallel.ForEach()'를 살펴볼 수 있습니다. –
_something_을 시도해야합니다. "퍼즐 조각"은 구현을 위해 .NET API의 특정 부분을 이미 결정했음을 의미합니다. 동일한 작업을 수행 할 수있는 다른 메커니즘과 반대입니다. 자, 보자.당신이 얼마나 멀리 있는지를 보여주는 좋은 [mcve]를 제공하고, 현재 당신이 붙어있는 문제의 _ 특정적인 부분을 정확하게 설명하십시오. –
TPL Dataflow API가 사용자의 요구 사항을 충족시킬 수는 있지만 잘 알지 못합니다. 따라서 각 고객에 대해 개별 대기열을 유지 한 다음 해당 대기열의 단일 대기열을 유지 관리 할 것입니다. 가장 오래된 작업을 먼저 실행하려는 경우 대기열은 우선 순위 대기열 (예 : 정렬 됨)이 될 수 있으므로 가장 오래된 대기열을 항상 대기열에서 제외합니다. 그런 다음 한 번에 10 개 이상의 활성 대기열 해제 대기열을 실행하지 마십시오. 작업 처리가 끝나면 고객 대기열을 기본 대기열에 다시 대기열에 다시 넣고 대기열에서 다음 대기열에서 대기열에서 제외합니다. –