0

캡핑 된 버퍼 된 작업 실행 프로그램을 구현하고 싶습니다.캡핑되고 버퍼 된 작업 실행 프로그램 구현

는 단일 메소드를 갖

public class CappedBufferedExecutor { 
    public CappedBufferedExecutor(int bufferCapping, int fillTimeInMillisec); 
    public Task<bool> EnqueueAsync(string val); 
} 

아이디어 값 비동기 대기열되어 있으며, fillTimeInMillisec 밀리 통과 또는 버퍼 고유 값의 캡에 충전되면, 실행은 실제로 완료되고 비동기 작업이 모두 완료됩니다. 실행이 완료되면 (오랜 시간이 소요될 수 있음) 버퍼를 다시 채울 수 있고 새로운 비동기 실행을 수행 할 수 있습니다.

은 내가 fillTime가 한 번 경과, 통과 새 작업을 만들 때까지 기다려야하는 Timer를 사용하여 다음 의사 코드

  • 의 라인에 뭔가 생각, 일을 할 것이다 (아래 참조) .
  • 새 값으로 읽으려면 rwlock을 잠급니다. 버퍼가 가득 차 있는지 확인하십시오 (ManualResetEvent 또는 TaskCompletionSource).
  • 버퍼에 새로운 값을 추가하십시오 (HashSet<string>).
  • 버퍼가 꽉 찬 경우 rwlock을 쓰기 위해 잠그는 새 실행 작업을 만들고 모든 수집 된 값에 대해 작업을 수행하고 TaskCompletionSource을 사용하여 모든 보류중인 작업을 다시 시작하십시오.
  • TaskCompletionSource에서 버퍼 된 작업 (이전 단계에서 언급 했음)이 실행될 때까지 기다립니다.

내 문제 다음 Timer 및 채워진 버퍼 검사를 동기화하는 방법은, 대기하는 버퍼가 가득 찼을 때, 실행을 시작하고 새 값이 도착 할 수 있도록 할 때 TaskCompletionSource 인스턴스 사이를 전환하는 방법에 대해 설명합니다.

+0

:-) 많은 것을 기대하지 않습니다 왜 당신이 EnqueueAsync''에서'작업 '반환해야합니까? 버퍼가 꽉 찼을 때 작업 목록만으로 이벤트를 발생시킬 수 있습니까? – apocalypse

+0

항목이 아직 처리되고 버퍼가 완료되기 전에 가득 차 있다면 어떻게해야합니까? –

+0

@apocalypse bool은 작업이 성공적으로 수행되었음을 나타냅니다. 중요하지 않은, 당신의 길을 갈 수 있습니다. – Mugen

답변

2

이 그냥 개념은, 그래서

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Threading; 
using System.Threading.Tasks; 

namespace ConsoleApp 
{ 
    class Program 
    { 
     static void Main (string[] args) 
     { 
      var buffer = CreateBuffer(); 

      var executor = new Executor<string> (SomeWork, buffer); 
      executor.ProcessingStarted += Executor_ProcessingStarted; 

      string userInput = null; 

      do 
      { 
       userInput = Console.ReadLine(); 

       buffer.Enqueue (userInput); 
      } 
      while (!string.IsNullOrWhiteSpace (userInput)); 

      executor.Dispose(); 
     } 

     //---------------------------------------------------------------------------------------------------------------------------------- 

     private static IBuffer<string> CreateBuffer() 
     { 
      var buffer = new UniqueItemsBuffer<string> (3); 

      buffer.DataAvailable += (items) => Console.WriteLine ("BUFFER :: data available raised."); 

      var alert = new Alert(); 

      var bufferWithTimeout = new BufferWithTimeout<string> (buffer, alert, TimeSpan.FromSeconds (5)); 

      return bufferWithTimeout; 
     } 

     //---------------------------------------------------------------------------------------------------------------------------------- 

     static Random rnd = new Random(); // must be outside, to avoid creating Random too quick because it will use the same seed for all tasks 

     public static bool SomeWork (string x) 
     { 
      int delay = rnd.Next (1000, 8000); 

      Console.WriteLine ($" +++ Starting SomeWork for: {x}, delay: {delay} ms"); 

      Thread.Sleep (delay); 

      Console.WriteLine ($" --- SomeWork for: {x} - finished."); 

      return true; 
     } 

     //---------------------------------------------------------------------------------------------------------------------------------- 

     private static void Executor_ProcessingStarted (IReadOnlyList<Task<bool>> items) 
     { 
      Task.Run (() => 
      { 
       Task.WaitAll (items.ToArray()); 
       Console.WriteLine ("Finished processing tasks, count = " + items.Count); 
      }); 
     } 
    } 

    //====== actual code =================================================================================================================== 

    public delegate void ItemsAvailable<T> (IReadOnlyList<T> items); // new type to simplify code 

    public delegate bool ProcessItem<T> (T item); // processes the given item and returns true if job is done with success 

    //====================================================================================================================================== 

    public interface IDataAvailableEvent<T> 
    { 
     event ItemsAvailable<T> DataAvailable; // occurs when buffer need to be processed (also before raising this event, buffer should be cleared) 
    } 

    //====================================================================================================================================== 

    public interface IProcessingStartedEvent<T> 
    { 
     event ItemsAvailable<Task<bool>> ProcessingStarted; // executor raises this event when all tasks are created and started 
    } 

    //====================================================================================================================================== 

    public interface IBuffer<T> : IDataAvailableEvent<T> 
    { 
     bool Enqueue (T item); // adds new item to buffer (but sometimes it can ignore item, for example if we need only unique items in list) 
           // returns: true = buffer is not empty, false = is emtpy 

     void FlushBuffer(); // should clear buffer and raise event (or not raise if buffer was already empty) 
    } 

    //====================================================================================================================================== 

    // raises DataAvailable event when buffer cap is reached 
    // ignores duplicates 

    // you can only use this class from one thread 

    public class UniqueItemsBuffer<T> : IBuffer<T> 
    { 
     public event ItemsAvailable<T> DataAvailable; 

     readonly int capacity; 
     HashSet<T> items = new HashSet<T>(); 

     public UniqueItemsBuffer (int capacity = 10) 
     { 
      this.capacity = capacity; 
     } 

     public bool Enqueue (T item) 
     { 
      if (items.Add (item) && items.Count == capacity) 
      { 
       FlushBuffer(); 
      } 

      return items.Count > 0; 
     } 

     public void FlushBuffer() 
     { 
      Console.WriteLine ("BUFFER :: flush, item count = " + items.Count); 

      if (items.Count > 0) 
      { 
       var itemsCopy = items.ToList(); 
       items.Clear(); 

       DataAvailable?.Invoke (itemsCopy); 
      } 
     } 
    } 

    //====================================================================================================================================== 

    public class Executor<T> : IProcessingStartedEvent<T>, IDisposable 
    { 
     public event ItemsAvailable<Task<bool>> ProcessingStarted; 

     readonly ProcessItem<T> work; 
     readonly IDataAvailableEvent<T> dataEvent; 

     public Executor (ProcessItem<T> work, IDataAvailableEvent<T> dataEvent) 
     { 
      this.work = work; 
      this.dataEvent = dataEvent; 

      dataEvent.DataAvailable += DataEvent_DataAvailable; 
     } 

     private void DataEvent_DataAvailable (IReadOnlyList<T> items) 
     { 
      Console.WriteLine ("EXECUTOR :: new items to process available, count = " + items.Count); 

      var list = new List<Task<bool>>(); 

      foreach (var item in items) 
      { 
       var task = Task.Run (() => work (item)); 

       list.Add (task); 
      } 

      Console.WriteLine ("EXECUTOR :: raising processing started event (this msg can appear later than messages from SomeWork)"); 

      ProcessingStarted?.Invoke (list); 
     } 

     public void Dispose() 
     { 
      dataEvent.DataAvailable -= DataEvent_DataAvailable; 
     } 
    } 

    //====================================================================================================================================== 

    // if you want to fill buffer using many threads - use this decorator 

    public sealed class ThreadSafeBuffer<T> : IBuffer<T> 
    { 
     public event ItemsAvailable<T> DataAvailable; 

     readonly IBuffer<T> target; 
     readonly object sync = new object(); 

     private ThreadSafeBuffer (IBuffer<T> target) 
     { 
      this.target = target; 
      this.target.DataAvailable += (items) => DataAvailable?.Invoke (items); // TODO: unpin event :P 
     } 

     public bool Enqueue (T item) 
     { 
      lock (sync) return target.Enqueue (item); 
     } 

     public void FlushBuffer() 
     { 
      lock (sync) target.FlushBuffer(); 
     } 

     public static IBuffer<T> MakeThreadSafe (IBuffer<T> target) 
     { 
      if (target is ThreadSafeBuffer<T>) return target; 

      return new ThreadSafeBuffer<T> (target); 
     } 
    } 

    //====================================================================================================================================== 

    // and now if you want to process buffer after elapsed time 

    public interface IAlert 
    { 
     CancellationTokenSource CreateAlert (TimeSpan delay, Action action); // will execute 'action' after given delay (non blocking) 
    } 

    // I didn't use much timers, so idk is this code good 

    public class Alert : IAlert 
    { 
     List<System.Timers.Timer> timers = new List<System.Timers.Timer>(); // we need to keep reference to timer to avoid dispose 

     public CancellationTokenSource CreateAlert (TimeSpan delay, Action action) 
     { 
      var cts = new CancellationTokenSource(); 

      var timer = new System.Timers.Timer (delay.TotalMilliseconds); 
      timers.Add (timer); 

      timer.Elapsed += (sender, e) => 
      { 
       timers.Remove (timer); 

       timer.Dispose(); 

       if (cts.Token.IsCancellationRequested) return; 

       action.Invoke(); 
      }; 

      timer.AutoReset = false; // just one tick 
      timer.Enabled = true; 

      return cts; 
     } 
    } 

    // thread safe (maybe :-D) 

    public class BufferWithTimeout<T> : IBuffer<T> 
    { 
     public event ItemsAvailable<T> DataAvailable; 

     readonly IBuffer<T> target; 
     readonly IAlert  alert; 
     readonly TimeSpan timeout; 

     CancellationTokenSource cts; 

     readonly object sync = new object(); 

     public BufferWithTimeout (IBuffer<T> target, IAlert alert, TimeSpan timeout) 
     { 
      this.target = ThreadSafeBuffer<T>.MakeThreadSafe (target); // alert can be raised from different thread 
      this.alert = alert; 
      this.timeout = timeout; 

      target.DataAvailable += Target_DataAvailable; // TODO: unpin event 
     } 

     private void Target_DataAvailable (IReadOnlyList<T> items) 
     { 
      lock (sync) 
      { 
       DisableTimer(); 
      } 

      DataAvailable?.Invoke (items); 
     } 

     public bool Enqueue (T item) 
     { 
      lock (sync) 
      { 
       bool hasItems = target.Enqueue (item); // can raise underlying flush -> dataAvailable event (will disable timer) 

       // and now if buffer is empty, we cannot start timer 

       if (hasItems && cts == null) // if timer is not enabled 
       { 
        Console.WriteLine ("TIMER :: created alert"); 
        cts = alert.CreateAlert (timeout, HandleAlert); 
       } 

       return hasItems; 
      } 
     } 

     public void FlushBuffer() 
     { 
      lock (sync) 
      { 
       DisableTimer(); 
       target.FlushBuffer(); 
      } 
     } 

     private void HandleAlert() 
     { 
      lock (sync) 
      { 
       Console.WriteLine ("TIMER :: handler, will call buffer flush"); 
       target.FlushBuffer(); 
      } 
     } 

     private void DisableTimer() 
     { 
      cts?.Cancel(); 
      cts = null; 

      Console.WriteLine ("TIMER :: disable"); 
     } 
    } 
} 
+0

감사합니다. 내 코드에서이 작업을 시도 할 것입니다! 작은 메모 :'System.Timers.Timer'는 참조를 유지할 필요가 없습니다 - 프레임 워크에 의해 유지 관리 됨 : – Mugen

+0

@Mugen : 아 ... 사실. 팁 주셔서 감사. Net 2.0에서 수정 된 것처럼 보입니다. – apocalypse

1

Reactive Extensions을 사용하여 쉽게 할 수 있습니다.

void Main() 
{ 
    var c = new Processor(); 
    c.SetupBufferedProcessor(2, TimeSpan.FromMilliseconds(1000)); 

    c.Enqueue("A"); 
    c.Enqueue("B"); 
    c.Enqueue("C"); 

    Console.ReadLine(); 

    // When application has ended, flush the buffer 
    c.Dispose(); 
} 


public sealed class Processor : IDisposable 
{ 
    private IDisposable subscription; 
    private Subject<string> subject = new Subject<string>(); 

    public void Enqueue(string item) 
    { 
     subject.OnNext(item);  
    } 

    public void SetupBufferedProcessor(int bufferSize, TimeSpan bufferCloseTimespan) 
    { 
     // Create a subscription that will produce a set of strings every second 
     // or when buffer has 2 items, whatever comes first 
     subscription = subject.AsObservable() 
      .Buffer(bufferCloseTimespan, bufferSize) 
      .Where(list => list.Any()) // suppress empty list (no items enqueued for 1 second) 
      .Subscribe(async list => 
      { 
       await Task.Run(() => 
       { 
        Console.WriteLine(string.Join(",", list)); 
        Thread.Sleep(2000); // For demo purposes, to demonstrate processing takes place parallel with other batches. 
       }); 
      }); 
    } 

    public void Dispose() 
    { 
     subscription?.Dispose(); 
    } 
} 

이 출력 할 RX의 코드 수신에 at GitHub 더 많이

C 

일초 후에

A,B 

하고, : http://www.introtorx.com/Buffer 방법을 이용하여 기본적인 예

이 예 생성 된 Task 개체에 대한 참조를 유지하도록 향상 될 수 있으므로 응용 프로그램을 끝내기 전에 제대로 기다릴 수 있지만 일반적인 생각을 줄 것입니다.