요청한대로 정확하게 수행하기위한 현재 구현 방법은 다음과 같습니다 (또는 생각해보십시오). 즉,이 코드는 약간 중량이 필요합니다. 여전히 정리 중입니다. 이것은 기능 버전 0.1입니다. 이와
public class WorkerRole : RoleEntryPoint
{
public override void Run()
{
var logic = new WorkerAgent();
logic.Go(false);
}
public override bool OnStart()
{
// Initialize our Cloud Storage Configuration.
AzureStorageObject.Initialize(AzureConfigurationLocation.AzureProjectConfiguration);
return base.OnStart();
}
}
public class WorkerAgent
{
private const int _resistance_to_scaling_larger_queues = 9;
private Dictionary<Type, int> _queueWeights = new Dictionary<Type, int>
{
{typeof (Queue1.Processor), 1},
{typeof (Queue2.Processor), 1},
{typeof (Queue3.Processor), 1},
{typeof (Queue4.Processor), 1},
};
private readonly TimeSpan _minDelay = TimeSpan.FromMinutes(Convert.ToDouble(RoleEnvironment.GetConfigurationSettingValue("MinDelay")));
private readonly TimeSpan _maxDelay = TimeSpan.FromMinutes(Convert.ToDouble(RoleEnvironment.GetConfigurationSettingValue("MaxDelay")));
protected TimeSpan CurrentDelay { get; set; }
public Func<string> GetSpecificQueueTypeToProcess { get; set; }
/// <summary>
/// This is a superset collection of all Queues that this WorkerAgent knows how to process, and the weight of focus it should receive.
/// </summary>
public Dictionary<Type, int> QueueWeights
{
get
{
return _queueWeights;
}
set
{
_queueWeights = value;
}
}
public static TimeSpan QueueWeightCalibrationDelay
{
get { return TimeSpan.FromMinutes(15); }
}
protected Dictionary<Type, DateTime> QueueDelays = new Dictionary<Type, DateTime>();
protected Dictionary<Type, AzureQueueMetaData> QueueMetaData { get; set; }
public WorkerAgent(Func<string> getSpecificQueueTypeToProcess = null)
{
CurrentDelay = _minDelay;
GetSpecificQueueTypeToProcess = getSpecificQueueTypeToProcess;
}
protected IProcessQueues CurrentProcessor { get; set; }
/// <summary>
/// Processes queue request(s).
/// </summary>
/// <param name="onlyProcessOnce">True to only process one time. False to process infinitely.</param>
public void Go(bool onlyProcessOnce)
{
if (onlyProcessOnce)
{
ProcessOnce(false);
}
else
{
ProcessContinuously();
}
}
public void ProcessContinuously()
{
while (true)
{
// temporary hack to get this started.
ProcessOnce(true);
}
}
/// <summary>
/// Attempts to fetch and process a single queued request.
/// </summary>
public void ProcessOnce(bool shouldDelay)
{
PopulateQueueMetaData(QueueWeightCalibrationDelay);
if (shouldDelay)
{
Thread.Sleep(CurrentDelay);
}
var typesToPickFrom = new List<Type>();
foreach(var item in QueueWeights)
{
for (var i = 0; i < item.Value; i++)
{
typesToPickFrom.Add(item.Key);
}
}
var randomIndex = (new Random()).Next()%typesToPickFrom.Count;
var typeToTryAndProcess = typesToPickFrom[randomIndex];
CurrentProcessor = ObjectFactory.GetInstance(typeToTryAndProcess) as IProcessQueues;
CleanQueueDelays();
if (CurrentProcessor != null && !QueueDelays.ContainsKey(typeToTryAndProcess))
{
var errors = CurrentProcessor.Go();
var amountToDelay = CurrentProcessor.NumberProcessed == 0 && !errors.Any()
? _maxDelay // the queue was empty
: _minDelay; // else
QueueDelays[CurrentProcessor.GetType()] = DateTime.Now + amountToDelay;
}
else
{
ProcessOnce(true);
}
}
/// <summary>
/// This method populates/refreshes the QueueMetaData collection.
/// </summary>
/// <param name="queueMetaDataCacheLimit">Specifies the length of time to cache the MetaData before refreshing it.</param>
private void PopulateQueueMetaData(TimeSpan queueMetaDataCacheLimit)
{
if (QueueMetaData == null)
{
QueueMetaData = new Dictionary<Type, AzureQueueMetaData>();
}
var queuesWithoutMetaData = QueueWeights.Keys.Except(QueueMetaData.Keys).ToList();
var expiredQueueMetaData = QueueMetaData.Where(qmd => qmd.Value.TimeMetaDataWasPopulated < (DateTime.Now - queueMetaDataCacheLimit)).Select(qmd => qmd.Key).ToList();
var validQueueData = QueueMetaData.Where(x => !expiredQueueMetaData.Contains(x.Key)).ToList();
var results = new Dictionary<Type, AzureQueueMetaData>();
foreach (var queueProcessorType in queuesWithoutMetaData)
{
if (!results.ContainsKey(queueProcessorType))
{
var queueProcessor = ObjectFactory.GetInstance(queueProcessorType) as IProcessQueues;
if (queueProcessor != null)
{
var queue = new AzureQueue(queueProcessor.PrimaryQueueName);
var metaData = queue.GetMetaData();
results.Add(queueProcessorType, metaData);
QueueWeights[queueProcessorType] = (metaData.ApproximateMessageCount) == 0
? 1
: (int)Math.Log(metaData.ApproximateMessageCount, _resistance_to_scaling_larger_queues) + 1;
}
}
}
foreach (var queueProcessorType in expiredQueueMetaData)
{
if (!results.ContainsKey(queueProcessorType))
{
var queueProcessor = ObjectFactory.GetInstance(queueProcessorType) as IProcessQueues;
if (queueProcessor != null)
{
var queue = new AzureQueue(queueProcessor.PrimaryQueueName);
var metaData = queue.GetMetaData();
results.Add(queueProcessorType, metaData);
}
}
}
QueueMetaData = results.Union(validQueueData).ToDictionary(data => data.Key, data => data.Value);
}
private void CleanQueueDelays()
{
QueueDelays = QueueDelays.Except(QueueDelays.Where(x => x.Value < DateTime.Now)).ToDictionary(x => x.Key, x => x.Value);
}
}
, 우리는 각 큐를 처리하는 방법을 알고있는 별도의 클래스를 가지고 있고, 그것은 IProcessQueues을 구현합니다. 우리가 처리하고자하는 유형으로 각각
_queueWeights
컬렉션을로드합니다.
_resistance_to_scaling_larger_queues
상수를 설정하여이 크기를 조정하는 방식을 제어합니다. 이 값은 로그 방식으로 조정됩니다 (
PopulateQueueMetaData
메서드 참조). 0 개의 항목이 있더라도 하나의 대기열에 1보다 작은 가중치가 없습니다.
PopulateQueueMetaData
을
10
으로 설정하면 크기가 10만큼 증가 할 때마다 해당 유형의 "가중치"가 1 씩 증가합니다. 예를 들어 QueueA에 0 개 항목, QueueB에 0 개 항목, QueueC에 10 개 항목이있는 경우 , 각각의 가중치는 1, 1 및 2입니다. 즉, QueueC는 다음에 처리 될 확률이 50 %이고 QueueA 및 QueueB 각각은 처리 할 확률이 25 %입니다. QueueC에 100 개의 항목이 있으면 가중치가 1, 1, 3이고 처리 할 확률은 20 %, 20 %, 60 %입니다. 이렇게하면 빈 대기열을 잊어 버리지 않습니다.
다른 점은 _minDelay
과 _maxDelay
입니다. 이 코드가 대기열에 항목이 하나 이상 있다고 생각하면 _minDelay
속도만큼 빠르게 처리합니다. 그러나 마지막으로 0 개의 항목이있는 경우 _maxDelay
보다 빠르게 처리 할 수 없습니다. 즉, 난수 생성기가 0 개의 항목이있는 대기열 (중량에 관계없이)을 당기면 처리를 건너 뛰고 다음 반복으로 넘어갑니다. (더 나은 스토리지 트랜잭션 효율성을 위해이 부분에 몇 가지 추가 최적화를 적용 할 수 있지만 이는 아주 간단합니다.하나는 본질적으로 CloudQueue
과 다른 상점 같은 큐의 대략적인 계산과 같은 일부 정보에 대한 래퍼입니다 - -)
우리는 (예) AzureQueue
로하고 AzureQueueMetaData
여기에 몇 가지 사용자 정의 클래스가없는 흥미로운 아무것도 거기에 (코드 단순화 방법).
다시 말하지만,이 "예쁜"코드는 아니지만이 코드에서는 일부 영리한 개념이 구현되고 작동합니다. 원하는 모든 이유로 그것을 사용하십시오. :)
마지막으로이 코드를 작성하면 훨씬 많은 대기열을 처리 할 수있는 단일 프로젝트를 가질 수 있습니다. 이 문제가 단순히 해결되지 않는 경우 많은 수의 인스턴스로 쉽게 확장 할 수 있으며 모든 대기열에 맞게 확장 할 수 있습니다. 최소한의 시나리오에서는이 인스턴스 하나를 배포하여 3 개의 대기열을 모니터링 할 수 있습니다. 그러나 네 번째 큐가 성능에 영향을 미치기 시작하면 (또는 고 가용성이 필요한 경우) 최대 2 개의 인스턴스를 늘리십시오. 15 개의 대기열에 도달하면 3 번째 대기열을 추가합니다. 25 개의 대기열에 4 번째 인스턴스를 추가합니다. 새로운 고객을 확보하고 시스템 전체에서 많은 대기열 요청을 처리해야합니다. 그것이 끝날 때까지 20 instanes이 하나의 역할을 스핀하고 그들을 돌려 돌려. 특히 지저분한 대기열이 있습니까? 해당 큐에 _queueWeights
콜렉션이 있음을 주석 처리하고 나머지 큐를 관리 한 다음 _queueWeights
콜렉션에서 주석 처리 된 다른 모든 큐와 함께 다시 배치 한 다음 다시 다른 인스턴스 세트에 배치하고 디버깅을 수행하십시오 a) 다른 QueueProcessors가 디버깅을 방해하고 b) 디버깅이 다른 QueueProcessor와 간섭하지 않습니다. 궁극적으로 이것은 많은 유연성과 효율성을 제공합니다.
@flybyte, 덧글을 남기지 않고 답을 수락하려면 체크 표시를 클릭하십시오. 귀하의 질문에 '대답 함'으로 표시되어 답이없는 목록에는 나타나지 않습니다. 또한 upvote (숫자로 위쪽 화살표를 클릭하십시오!)에 항상 좋습니다. – bdonlan