2013-07-05 5 views
52

티저 :이 질문은 재시도 정책을 구현하는 방법이 아닙니다. 이는 TPL Dataflow 블록을 올바르게 완료 한 것입니다.재시도 가능 블록의 올바른 완료 구현

이 질문은 대부분 내 이전 질문 Retry policy within ITargetBlock의 연속입니다. 이 질문에 대한 답은 TransformBlock (소스) 및 TransformManyBlock (대상)을 사용하는 @ svick의 스마트 솔루션이었습니다. 남아있는 유일한 문제는이 블록을 올바른 방법으로 완료하는 것입니다. : 먼저 모든 재시도가 완료 될 때까지 기다린 다음 대상 블록을 완료하십시오.

var retries = new HashSet<RetryingMessage<TInput>>(); 

TransformManyBlock<RetryableMessage<TInput>, TOutput> target = null; 
target = new TransformManyBlock<RetryableMessage<TInput>, TOutput>(
    async message => 
    { 
     try 
     { 
      var result = new[] { await transform(message.Data) }; 
      retries.Remove(message); 
      return result; 
     } 
     catch (Exception ex) 
     { 
      message.Exceptions.Add(ex); 
      if (message.RetriesRemaining == 0) 
      { 
       if (failureHandler != null) 
        failureHandler(message.Exceptions); 

       retries.Remove(message); 
      } 
      else 
      { 
       retries.Add(message); 
       message.RetriesRemaining--; 

       Task.Delay(retryDelay) 
        .ContinueWith(_ => target.Post(message)); 
      } 
      return null; 
     } 
    }, dataflowBlockOptions); 

source.LinkTo(target); 

source.Completion.ContinueWith(async _ => 
{ 
    while (target.InputCount > 0 || retries.Any()) 
     await Task.Delay(100); 

    target.Complete(); 
}); 

아이디어는 폴링의 일종을 수행하고 있는지 여부를 확인 : 여기 (가 아닌 스레드 retries 세트에 너무 많은 관심을 지불하지 않는 스 니펫의)와 함께 결국 무엇인가 처리 대기중인 메시지와 재 시도가 필요한 메시지가 없습니다. 그러나이 해결책에서 나는 폴링에 대한 생각을 좋아하지 않는다.

예, 별도의 클래스에 재시도 추가/삭제 로직을 캡슐화 할 수 있습니다. 재 시도 세트가 비게되었을 때 어떤 조치를 취하십시오. 그러나 어떻게 target.InputCount > 0 조건을 처리합니까? 블록에 대해 보류중인 메시지가 없을 때 호출되는 콜백이 없으므로 작은 지연이있는 루프에서 target.ItemCount을 확인하는 것이 유일한 옵션 인 것으로 보입니다.

누구나 이것을 달성하는 더 똑똑한 방법을 알고 있습니까?

+1

과 같은 동작을 차별화하여 응답 할 수 있습니다. ITargetBlock은 AsObserver Extension 메서드에서 반환 한 옵저버를 통해 푸시 기반 알림을 지원하는 것으로 보입니다. http://msdn.microsoft.com/en-us/library/hh160359.aspx 및 http://msdn.microsoft.com/en-us/library/ee850490.aspx를 참조하십시오. – JamieSee

+0

정상적인 프로그램 흐름으로 예외를 사용하려고하는 것처럼 보입니다. 이는 나쁜 습관입니다. 검색 구글이나 SO에 다음과 같은 항목을 보면 : http://stackoverflow.com/questions/729379/why-not-use-exceptions-as-regular-flow-of-control 모든 재시도 논리가해야 예외 블록이 아니라 try 블록에 있어야합니다. 당신의 질문에 대한 대답이 아니지만 당신이 알아야한다고 생각한 어떤 것. – Nullius

+4

@Nullius, 재시도 논리는 * 예외를 기반으로합니다 * - 일시적인 오류가 발생하면 재 시도하십시오. 나는'try' 블록에서 로직을 재 시도하는 것이 좋은 생각이라고 생각하지 않습니다. 왜냐하면 여러분은 에러 타입을 알지 못하며 이런 종류의 에러가 일시적인지 아닌지를 알지 못하기 때문입니다. – Alex

답변

1

이상적인 해결책이 될 수있다 : 그래서 장소에서 당신은 당신이 다음 코드를 추가 할 수 있습니다 target.InputCount을 변경합니다. 관련 이벤트가 발생할 때 통지되도록

var signal = new ManualResetEvent(false); 
var completedEvent = new ManualResetEvent(false); 

그런 다음, 당신은, 관찰자를 만들고 TransformManyBlock에 가입해야한다 : 첫째

, 당신은 하나 개 이상의 이벤트를 만들 필요가

private class RetryingBlockObserver<T> : IObserver<T> { 
     private ManualResetEvent completedEvent; 

     public RetryingBlockObserver(ManualResetEvent completedEvent) {     
      this.completedEvent = completedEvent; 
     } 

     public void OnCompleted() { 
      completedEvent.Set(); 
     } 

     public void OnError(Exception error) { 
      //TODO 
     } 

     public void OnNext(T value) { 
      //TODO 
     } 
    } 

그리고 당신은 CA :

var observer = new RetryingBlockObserver<TOutput>(completedEvent); 
var observable = target.AsObservable(); 
observable.Subscribe(observer); 

관찰 가능한 아주 쉽게 할 수 있습니다 사용자가 설정 한 이벤트 이해 WaitAll의 결과 값을 검사하고 그에 따라 반응 할 수

source.Completion.ContinueWith(async _ => { 

      WaitHandle.WaitAll(completedEvent, signal); 
      // Or WaitHandle.WaitAny, depending on your needs! 

      target.Complete(); 
     }); 

신호 또는 종료 (모든 소스 항목의 고갈), 또는 둘 다에 대한 N 기다린다. 다른 이벤트를 코드에 추가하여 관찰자에게 전달할 수 있으므로 필요할 때 설정할 수 있습니다.

2

아마도 ManualResetEvent이 당신을 속일 수 있습니다.

TransformManyBlock

private ManualResetEvent _signal = new ManualResetEvent(false); 
public ManualResetEvent Signal { get { return _signal; } } 

에 공용 속성을 추가 그리고 여기 당신은 간다 :

var retries = new HashSet<RetryingMessage<TInput>>(); 

TransformManyBlock<RetryableMessage<TInput>, TOutput> target = null; 
target = new TransformManyBlock<RetryableMessage<TInput>, TOutput>(
    async message => 
    { 
     try 
     { 
      var result = new[] { await transform(message.Data) }; 
      retries.Remove(message); 

      // Sets the state of the event to signaled, allowing one or more waiting threads to proceed 
      if(!retries.Any()) Signal.Set(); 
      return result; 
     } 
     catch (Exception ex) 
     { 
      message.Exceptions.Add(ex); 
      if (message.RetriesRemaining == 0) 
      { 
       if (failureHandler != null) 
        failureHandler(message.Exceptions); 

       retries.Remove(message); 

       // Sets the state of the event to signaled, allowing one or more waiting threads to proceed 
       if(!retries.Any()) Signal.Set(); 
      } 
      else 
      { 
       retries.Add(message); 
       message.RetriesRemaining--; 

       Task.Delay(retryDelay) 
        .ContinueWith(_ => target.Post(message)); 
      } 
      return null; 
     } 
    }, dataflowBlockOptions); 

source.LinkTo(target); 

source.Completion.ContinueWith(async _ => 
{ 
    //Blocks the current thread until the current WaitHandle receives a signal. 
    target.Signal.WaitOne(); 

    target.Complete(); 
}); 

나는 당신의 target.InputCount가 설정되는 경우 확인이 아닙니다. hwcverwe 응답 및 JamieSee 주석을 결합

if(InputCount == 0) Signal.Set(); 
+0

것은 :'target.InputCount'는 * 블랙 박스 *입니다 - TPL Dataflow의'TransformManyBlock'의 읽기 전용 속성입니다. – Alex