역할이 성공적으로 시작되어 EventProcessor 클래스 구현 (EventProcessorA 및 EventProcessorB)을 등록하는 작업자 역할에 문제가 있습니다. 그러나 둘 중 아무 것도 선택하지 않습니다 이벤트. 이것에 의해, IEventProcessor.ProcessEventsAsync 메소드가 전혀 영향을 미치지 않는다는 것을 의미합니다.Azure 클라우드 서비스 - EventProcessor IEventProcessor.ProcessEventsAsync가 실행되지 않습니다.
각 EventProcessor 클래스에는 자체 이벤트 허브가 설정되어 있습니다.
내 로그에 생성자 및 OpenAsync 메서드가 EventProcessor 클래스에 대해 호출되는 것으로 나타납니다. 실제로, 그들은 정확히 다음과 같이 4 번 호출됩니다. 그러나이 후에는 추가적인 활동이 없습니다. 나는 4 개의 파티션이 있기 때문에 4 번 추측하고 있습니다.
SimpleEventProcessorA - Constructor
SimpleEventProcessorA - OpenAsync
SimpleEventProcessorA - Constructor
SimpleEventProcessorA - OpenAsync
SimpleEventProcessorA - Constructor
SimpleEventProcessorA - OpenAsync
SimpleEventProcessorB - Constructor
SimpleEventProcessorB - Open Async
SimpleEventProcessorB - Constructor
SimpleEventProcessorB - OpenAsync
SimpleEventProcessorB - Constructor
SimpleEventProcessorB - OpenAsync
SimpleEventProcessorB - Constructor
SimpleEventProcessorB - OpenAsync
SimpleEventProcessorA - Constructor
SimpleEventProcessorA - OpenAsync
내가 그들을 킥오프 할 때 이벤트를 통해 오는 볼 수있는 푸른 포털, 또한 어떤 하나 때문에 모든 이벤트가 홍수를 제공한다 작업자 역할의 RunAsync 방법에 EventProcessorOptions 제공 오프셋.
없습니다 . EventProcessor를 등록
작업자 역할 코드 :
public class WorkerRole : RoleEntryPoint
{
private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false);
private EventProcessorHost eventProcessorHostA;
private EventProcessorHost eventProcessorHostB;
public override void Run()
{
Trace.TraceInformation("ReportWorkerRole is running");
try
{
this.RunAsync(this.cancellationTokenSource.Token).Wait();
}
finally
{
this.runCompleteEvent.Set();
}
}
public override bool OnStart()
{
// Set the maximum number of concurrent connections
ServicePointManager.DefaultConnectionLimit = 12;
// For information on handling configuration changes
// see the MSDN topic at http://go.microsoft.com/fwlink/?LinkId=166357.
bool result = base.OnStart();
Trace.TraceInformation("ReportWorkerRole has been started");
//EventHub Processing
try
{
string eventHubNameA = CloudConfigurationManager.GetSetting("EventHubNameA");
string eventHubNameB = CloudConfigurationManager.GetSetting("EventHubNameB");
string eventHubConnectionString = CloudConfigurationManager.GetSetting("EventHubConnectionString");
string storageAccountName = CloudConfigurationManager.GetSetting("AzureStorageAccount");
string storageAccountKey = CloudConfigurationManager.GetSetting("AzureStorageAccountKey");
string storageConnectionString = CloudConfigurationManager.GetSetting("AzureStorageAccountConnectionString");
string eventProcessorHostNameA = Guid.NewGuid().ToString();
eventProcessorHostA = new EventProcessorHost(eventProcessorHostNameA, eventHubNameA, EventHubConsumerGroup.DefaultGroupName, eventHubConnectionString, storageConnectionString);
string eventProcessorHostNameB = Guid.NewGuid().ToString();
eventProcessorHostB = new EventProcessorHost(eventProcessorHostNameB, eventHubNameB, EventHubConsumerGroup.DefaultGroupName, eventHubConnectionString, storageConnectionString);
}
catch (Exception ex)
{
//Logging omitted
}
return result;
}
public override void OnStop()
{
Trace.TraceInformation("ReportWorkerRole is stopping");
this.eventProcessorHostA.UnregisterEventProcessorAsync().Wait();
this.eventProcessorHostB.UnregisterEventProcessorAsync().Wait();
this.cancellationTokenSource.Cancel();
this.runCompleteEvent.WaitOne();
base.OnStop();
Trace.TraceInformation("ReportWorkerRole has stopped");
}
private async Task RunAsync(CancellationToken cancellationToken)
{
var options = new EventProcessorOptions()
{
MaxBatchSize = 100,
PrefetchCount = 10,
ReceiveTimeOut = TimeSpan.FromSeconds(20),
//InitialOffsetProvider = (partitionId) => DateTime.Now
};
options.ExceptionReceived += (sender, e) =>
{
//Logging omitted
};
//Tried both using await and wait
eventProcessorHostA.RegisterEventProcessorAsync<SimpleEventProcessorA>(options).Wait();
eventProcessorHostB.RegisterEventProcessorAsync<SimpleEventProcessorB>(options).Wait();
//await eventProcessorHostA.RegisterEventProcessorAsync<SimpleEventProcessorA>(options);
//await eventProcessorHostB.RegisterEventProcessorAsync<SimpleEventProcessorB>(options);
// TODO: Replace the following with your own logic.
while (!cancellationToken.IsCancellationRequested)
{
Trace.TraceInformation("Working");
await Task.Delay(1000);
}
}
}
이벤트 프로세서 A (B와 같은 구성) :
class SimpleEventProcessorA : IEventProcessor
{
Stopwatch checkpointStopWatch;
//Non-relevant variables omitted
public SimpleEventProcessorA()
{
try
{
//Initializing variables using CloudConfigurationManager
//Logging omitted
}
catch (Exception ex)
{
//Logging omitted
}
}
async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason)
{
//Logging omitted
Console.WriteLine("Processor Shutting Down. Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason);
if (reason == CloseReason.Shutdown)
{
await context.CheckpointAsync();
}
}
Task IEventProcessor.OpenAsync(PartitionContext context)
{
try
{
//Logging omitted
Console.WriteLine("Initialized. Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset);
this.checkpointStopWatch = new Stopwatch();
this.checkpointStopWatch.Start();
return Task.FromResult<object>(null);
}
catch (Exception ex)
{
//Logging omitted
return Task.FromResult<object>(null);
}
}
async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
//Logging omitted
foreach (EventData eventData in messages)
{
try
{
//Logging omitted
Console.WriteLine(string.Format("Message received. Partition: '{0}', Data: '{1}'",
context.Lease.PartitionId, data));
}
catch (Exception ex)
{
//Logging omitted
throw;
}
}
//Call checkpoint every 5 minutes, so that worker can resume processing from 5 minutes back if it restarts.
if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(5))
{
await context.CheckpointAsync();
this.checkpointStopWatch.Restart();
}
}
}
어떤 도움을 주셔서 감사합니다겠습니까, 감사합니다!
UPDATE
모든 것이 잘처럼 ... 그것은 이벤트 허브에 물건을 밀어 때 사용 된 연결 문자열이었다 보인다. entityPath
이 잘못 설정되었다
Endpoint=sb://[myEventHubName].servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;entityPath=[old-eventhub-name];SharedAccessKey=[mykey]
:
이
내 이벤트 허브 연결 문자열이었다. 내가 설정했던 이전 이벤트 허브 이름을 사용하고있었습니다. eventHubNameA 또는 eventHubNameB에 대해 설정된 값이어야합니다.
당신이 호스트와 eventprocessor 등록 코드를 공유 할 수 있었다? –
@PeterBons 업데이트 됨, 감사합니다. – RizJa
'EventProcessorOptions'에서'GeneralProcessor_ExceptionReceived' 이벤트에 가입하고'InvokeProcessorAfterReceiveTimeout' 속성을 true로 설정하십시오. – cassandrad