2011-08-27 1 views
1

우리는 직접 REST API 또는 우리가 제공하는 WCF 서비스로 채워지는 4 개의 Azure Queues를 가지고 있습니다.Azure 대기열 작업자 역할 다중 스레드 예제

  1. 이 4 개의 대기열을 모두 모니터하기 위해 하나의 작업자 역할을 원합니다.
  2. 나는 (처리를 큐에서 메시지를 읽고 않습니다) 설정에서 대기열 이름 등을 읽고 처리 방법을 회전 멀티 스레드를 사용하여 생각하고

은 누군가가 나에게 예를 제공시겠습니까 또는 Worker 역할에서이 목표를 달성하는 방법에 대한 지침을 제공해주십시오.

멀티 스레딩에 익숙하지 않은 이상 멀티 스레딩없이 달성 할 수 있는지 확실하지 않습니다.

감사합니다

답변

2
당신은 다른 작업에 대해 서로 다른 스레드를 해고뿐만 아니라 비 스레드 접근 방식을 고려할 수

(수행 할 수 좋든 나쁘 든 당신은 메시지로 수행 내용에 따라) :

while (true) 
{ 
    var msg = queue1.GetMessage(); 
    if (msg != null) 
    { 
     didSomething = true; 
     // do something with it 
     queue1.DeleteMessage(msg); 
    } 
    msg = queue2.GetMessage(); 
    if (msg != null) 
    { 
     didSomething = true; 
     // do something with it 
     queue2.DeleteMessage(msg); 
    } 
    // ... 
    if (!didSomething) Thread.Sleep(TimeSpan.FromSeconds(1)); // so I don't enter a tight loop with nothing to do 
} 
+0

@flybyte, 덧글을 남기지 않고 답을 수락하려면 체크 표시를 클릭하십시오. 귀하의 질문에 '대답 함'으로 표시되어 답이없는 목록에는 나타나지 않습니다. 또한 upvote (숫자로 위쪽 화살표를 클릭하십시오!)에 항상 좋습니다. – bdonlan

0

작업자 역할의 while 루프 내부에서 멀티 스레드 C# 응용 프로그램을 작성하는 것처럼 4 개의 스레드를 시작합니다. 물론 네 개의 서로 다른 스레드 함수가 정의되어 있어야하며, 이들 함수는 대기열을 폴링하기 위해 루프가 분리되어 있어야합니다. worker의 while 루프가 끝나면 쓰레드가 끝나기를 기다린다.

1

요청한대로 정확하게 수행하기위한 현재 구현 방법은 다음과 같습니다 (또는 생각해보십시오). 즉,이 코드는 약간 중량이 필요합니다. 여전히 정리 중입니다. 이것은 기능 버전 0.1입니다. 이와

public class WorkerRole : RoleEntryPoint 
{ 
    public override void Run() 
    { 
     var logic = new WorkerAgent(); 
     logic.Go(false); 
    } 

    public override bool OnStart() 
    { 
     // Initialize our Cloud Storage Configuration. 
     AzureStorageObject.Initialize(AzureConfigurationLocation.AzureProjectConfiguration); 

     return base.OnStart(); 
    } 
} 

public class WorkerAgent 
{ 
    private const int _resistance_to_scaling_larger_queues = 9; 
    private Dictionary<Type, int> _queueWeights = new Dictionary<Type, int> 
                 { 
                  {typeof (Queue1.Processor), 1}, 
                  {typeof (Queue2.Processor), 1}, 
                  {typeof (Queue3.Processor), 1}, 
                  {typeof (Queue4.Processor), 1}, 
                 }; 

    private readonly TimeSpan _minDelay = TimeSpan.FromMinutes(Convert.ToDouble(RoleEnvironment.GetConfigurationSettingValue("MinDelay"))); 
    private readonly TimeSpan _maxDelay = TimeSpan.FromMinutes(Convert.ToDouble(RoleEnvironment.GetConfigurationSettingValue("MaxDelay"))); 
    protected TimeSpan CurrentDelay { get; set; } 

    public Func<string> GetSpecificQueueTypeToProcess { get; set; } 

    /// <summary> 
    /// This is a superset collection of all Queues that this WorkerAgent knows how to process, and the weight of focus it should receive. 
    /// </summary> 
    public Dictionary<Type, int> QueueWeights 
    { 
     get 
     { 
      return _queueWeights; 
     } 
     set 
     { 
      _queueWeights = value; 
     } 
    } 

    public static TimeSpan QueueWeightCalibrationDelay 
    { 
     get { return TimeSpan.FromMinutes(15); } 
    } 


    protected Dictionary<Type, DateTime> QueueDelays = new Dictionary<Type, DateTime>(); 


    protected Dictionary<Type, AzureQueueMetaData> QueueMetaData { get; set; } 

    public WorkerAgent(Func<string> getSpecificQueueTypeToProcess = null) 
    { 
     CurrentDelay = _minDelay; 
     GetSpecificQueueTypeToProcess = getSpecificQueueTypeToProcess; 
    } 

    protected IProcessQueues CurrentProcessor { get; set; } 

    /// <summary> 
    /// Processes queue request(s). 
    /// </summary> 
    /// <param name="onlyProcessOnce">True to only process one time. False to process infinitely.</param> 
    public void Go(bool onlyProcessOnce) 
    { 
     if (onlyProcessOnce) 
     { 
      ProcessOnce(false); 
     } 
     else 
     { 
      ProcessContinuously(); 
     } 
    } 

    public void ProcessContinuously() 
    { 
     while (true) 
     { 
      // temporary hack to get this started. 
      ProcessOnce(true); 
     } 
    } 

    /// <summary> 
    /// Attempts to fetch and process a single queued request. 
    /// </summary> 
    public void ProcessOnce(bool shouldDelay) 
    { 
     PopulateQueueMetaData(QueueWeightCalibrationDelay); 

     if (shouldDelay) 
     { 
      Thread.Sleep(CurrentDelay); 
     } 

     var typesToPickFrom = new List<Type>(); 
     foreach(var item in QueueWeights) 
     { 
      for (var i = 0; i < item.Value; i++) 
      { 
       typesToPickFrom.Add(item.Key); 
      } 
     } 

     var randomIndex = (new Random()).Next()%typesToPickFrom.Count; 
     var typeToTryAndProcess = typesToPickFrom[randomIndex]; 

     CurrentProcessor = ObjectFactory.GetInstance(typeToTryAndProcess) as IProcessQueues; 
     CleanQueueDelays(); 

     if (CurrentProcessor != null && !QueueDelays.ContainsKey(typeToTryAndProcess)) 
     { 
      var errors = CurrentProcessor.Go(); 

      var amountToDelay = CurrentProcessor.NumberProcessed == 0 && !errors.Any() 
           ? _maxDelay // the queue was empty 
           : _minDelay; // else 

      QueueDelays[CurrentProcessor.GetType()] = DateTime.Now + amountToDelay; 
     } 
     else 
     { 
      ProcessOnce(true); 
     } 
    } 

    /// <summary> 
    /// This method populates/refreshes the QueueMetaData collection. 
    /// </summary> 
    /// <param name="queueMetaDataCacheLimit">Specifies the length of time to cache the MetaData before refreshing it.</param> 
    private void PopulateQueueMetaData(TimeSpan queueMetaDataCacheLimit) 
    { 
     if (QueueMetaData == null) 
     { 
      QueueMetaData = new Dictionary<Type, AzureQueueMetaData>(); 
     } 

     var queuesWithoutMetaData = QueueWeights.Keys.Except(QueueMetaData.Keys).ToList(); 
     var expiredQueueMetaData = QueueMetaData.Where(qmd => qmd.Value.TimeMetaDataWasPopulated < (DateTime.Now - queueMetaDataCacheLimit)).Select(qmd => qmd.Key).ToList(); 
     var validQueueData = QueueMetaData.Where(x => !expiredQueueMetaData.Contains(x.Key)).ToList(); 
     var results = new Dictionary<Type, AzureQueueMetaData>(); 

     foreach (var queueProcessorType in queuesWithoutMetaData) 
     { 
      if (!results.ContainsKey(queueProcessorType)) 
      { 
       var queueProcessor = ObjectFactory.GetInstance(queueProcessorType) as IProcessQueues; 
       if (queueProcessor != null) 
       { 
        var queue = new AzureQueue(queueProcessor.PrimaryQueueName); 
        var metaData = queue.GetMetaData(); 
        results.Add(queueProcessorType, metaData); 

        QueueWeights[queueProcessorType] = (metaData.ApproximateMessageCount) == 0 
                ? 1 
                : (int)Math.Log(metaData.ApproximateMessageCount, _resistance_to_scaling_larger_queues) + 1; 
       } 
      } 
     } 

     foreach (var queueProcessorType in expiredQueueMetaData) 
     { 
      if (!results.ContainsKey(queueProcessorType)) 
      { 
       var queueProcessor = ObjectFactory.GetInstance(queueProcessorType) as IProcessQueues; 
       if (queueProcessor != null) 
       { 
        var queue = new AzureQueue(queueProcessor.PrimaryQueueName); 
        var metaData = queue.GetMetaData(); 
        results.Add(queueProcessorType, metaData); 
       } 
      } 
     } 

     QueueMetaData = results.Union(validQueueData).ToDictionary(data => data.Key, data => data.Value); 
    } 

    private void CleanQueueDelays() 
    { 
     QueueDelays = QueueDelays.Except(QueueDelays.Where(x => x.Value < DateTime.Now)).ToDictionary(x => x.Key, x => x.Value); 
    } 
} 

, 우리는 각 큐를 처리하는 방법을 알고있는 별도의 클래스를 가지고 있고, 그것은 IProcessQueues을 구현합니다. 우리가 처리하고자하는 유형으로 각각 _queueWeights 컬렉션을로드합니다. _resistance_to_scaling_larger_queues 상수를 설정하여이 크기를 조정하는 방식을 제어합니다. 이 값은 로그 방식으로 조정됩니다 ( PopulateQueueMetaData 메서드 참조). 0 개의 항목이 있더라도 하나의 대기열에 1보다 작은 가중치가 없습니다. PopulateQueueMetaData10으로 설정하면 크기가 10만큼 증가 할 때마다 해당 유형의 "가중치"가 1 씩 증가합니다. 예를 들어 QueueA에 0 개 항목, QueueB에 0 개 항목, QueueC에 10 개 항목이있는 경우 , 각각의 가중치는 1, 1 및 2입니다. 즉, QueueC는 다음에 처리 될 확률이 50 %이고 QueueA 및 QueueB 각각은 처리 할 확률이 25 %입니다. QueueC에 100 개의 항목이 있으면 가중치가 1, 1, 3이고 처리 할 확률은 20 %, 20 %, 60 %입니다. 이렇게하면 빈 대기열을 잊어 버리지 않습니다.

다른 점은 _minDelay_maxDelay입니다. 이 코드가 대기열에 항목이 하나 이상 있다고 생각하면 _minDelay 속도만큼 빠르게 처리합니다. 그러나 마지막으로 0 개의 항목이있는 경우 _maxDelay보다 빠르게 처리 할 수 ​​없습니다. 즉, 난수 생성기가 0 개의 항목이있는 대기열 (중량에 관계없이)을 당기면 처리를 건너 뛰고 다음 반복으로 넘어갑니다. (더 나은 스토리지 트랜잭션 효율성을 위해이 부분에 몇 가지 추가 최적화를 적용 할 수 있지만 이는 아주 간단합니다.하나는 본질적으로 CloudQueue과 다른 상점 같은 큐의 대략적인 계산과 같은 일부 정보에 대한 래퍼입니다 - -)

우리는 (예) AzureQueue로하고 AzureQueueMetaData 여기에 몇 가지 사용자 정의 클래스가없는 흥미로운 아무것도 거기에 (코드 단순화 방법).

다시 말하지만,이 "예쁜"코드는 아니지만이 코드에서는 일부 영리한 개념이 구현되고 작동합니다. 원하는 모든 이유로 그것을 사용하십시오. :)

마지막으로이 코드를 작성하면 훨씬 많은 대기열을 처리 할 수있는 단일 프로젝트를 가질 수 있습니다. 이 문제가 단순히 해결되지 않는 경우 많은 수의 인스턴스로 쉽게 확장 할 수 있으며 모든 대기열에 맞게 확장 할 수 있습니다. 최소한의 시나리오에서는이 인스턴스 하나를 배포하여 3 개의 대기열을 모니터링 할 수 있습니다. 그러나 네 번째 큐가 성능에 영향을 미치기 시작하면 (또는 고 가용성이 필요한 경우) 최대 2 개의 인스턴스를 늘리십시오. 15 개의 대기열에 도달하면 3 번째 대기열을 추가합니다. 25 개의 대기열에 4 번째 인스턴스를 추가합니다. 새로운 고객을 확보하고 시스템 전체에서 많은 대기열 요청을 처리해야합니다. 그것이 끝날 때까지 20 instanes이 하나의 역할을 스핀하고 그들을 돌려 돌려. 특히 지저분한 대기열이 있습니까? 해당 큐에 _queueWeights 콜렉션이 있음을 주석 처리하고 나머지 큐를 관리 한 다음 _queueWeights 콜렉션에서 주석 처리 된 다른 모든 큐와 함께 다시 배치 한 다음 다시 다른 인스턴스 세트에 배치하고 디버깅을 수행하십시오 a) 다른 QueueProcessors가 디버깅을 방해하고 b) 디버깅이 다른 QueueProcessor와 간섭하지 않습니다. 궁극적으로 이것은 많은 유연성과 효율성을 제공합니다.