BatchBlock
대신 BufferBlock
에 Task
을 입력하여 해당 항목을 수신하고 논리에 따라 대상에 대해 일괄 적으로 다시 보낼 수 있습니다. 일괄 처리가 포함 된 메시지를 보내야하고 다른 항목이 들어오는 경우이를 취소해야합니다. 대상 블록 (샘플의 actionBlock
)은 1로 설정된 BoundedCapacity
이어야합니다.
그래서, 먼저 뭔가를받습니다. 그렇게하면 비동기 적으로 전송되기 시작하며 더 많은 항목을 받으려고합니다. 전송이 먼저 완료되면 다시 시작합니다. 수신이 먼저 완료되면 전송을 취소하고 수신 된 항목을 일괄 처리에 추가 한 다음 두 비동기 작업을 다시 시작합니다.
실제 코드는 약간의 코너 사례 (수신 및 송신은 동시에 완료, 송신은 취소 할 수 없음, 수신 완료, 전체가 완료 되었기 때문에)를 처리해야하기 때문에 약간 더 복잡합니다. :
public static ITargetBlock<T> CreateBatchingWrapper<T>(
ITargetBlock<IReadOnlyList<T>> target)
{
// target should have BoundedCapacity == 1,
// but there is no way to check for that
var source = new BufferBlock<T>();
Task.Run(() => BatchItems(source, target));
return source;
}
private static async Task BatchItems<T>(
IReceivableSourceBlock<T> source, ITargetBlock<IReadOnlyList<T>> target)
{
try
{
while (true)
{
var messages = new List<T>();
// wait for first message in batch
if (!await source.OutputAvailableAsync())
{
// source was completed, complete target and return
target.Complete();
return;
}
// receive all there is right now
source.ReceiveAllInto(messages);
// try sending what we've got
var sendCancellation = new CancellationTokenSource();
var sendTask = target.SendAsync(messages, sendCancellation.Token);
var outputAvailableTask = source.OutputAvailableAsync();
while (true)
{
await Task.WhenAny(sendTask, outputAvailableTask);
// got another message, try cancelling send
if (outputAvailableTask.IsCompleted
&& outputAvailableTask.Result)
{
sendCancellation.Cancel();
// cancellation wasn't successful
// and the message was received, start another batch
if (!await sendTask.EnsureCancelled() && sendTask.Result)
break;
// send was cancelled, receive messages
source.ReceiveAllInto(messages);
// restart both Tasks
sendCancellation = new CancellationTokenSource();
sendTask = target.SendAsync(
messages, sendCancellation.Token);
outputAvailableTask = source.OutputAvailableAsync();
}
else
{
// we get here in three situations:
// 1. send was completed succesfully
// 2. send failed
// 3. input has completed
// in cases 2 and 3, this await is necessary
// in case 1, it's harmless
await sendTask;
break;
}
}
}
}
catch (Exception e)
{
source.Fault(e);
target.Fault(e);
}
}
/// <summary>
/// Returns a Task that completes when the given Task completes.
/// The Result is true if the Task was cancelled,
/// and false if it completed successfully.
/// If the Task was faulted, the returned Task is faulted too.
/// </summary>
public static Task<bool> EnsureCancelled(this Task task)
{
return task.ContinueWith(t =>
{
if (t.IsCanceled)
return true;
if (t.IsFaulted)
{
// rethrow the exception
ExceptionDispatchInfo.Capture(task.Exception.InnerException)
.Throw();
}
// completed successfully
return false;
});
}
public static void ReceiveAllInto<T>(
this IReceivableSourceBlock<T> source, List<T> targetCollection)
{
// TryReceiveAll would be best suited for this, except it's bugged
// (see http://connect.microsoft.com/VisualStudio/feedback/details/785185)
T item;
while (source.TryReceive(out item))
targetCollection.Add(item);
}
이 동작이 필요한 이유를 설명해 주시겠습니까? – svick
@svick 데이터베이스 업데이트를 배치하여 데이터베이스 라운드 트립 횟수를 줄이고 싶습니다. 변경 사항이 스트림으로 제공됩니다. – Brownie