1

역할이 성공적으로 시작되어 EventProcessor 클래스 구현 (EventProcessorA 및 EventProcessorB)을 등록하는 작업자 역할에 문제가 있습니다. 그러나 둘 중 아무 것도 선택하지 않습니다 이벤트. 이것에 의해, IEventProcessor.ProcessEventsAsync 메소드가 전혀 영향을 미치지 않는다는 것을 의미합니다.Azure 클라우드 서비스 - EventProcessor IEventProcessor.ProcessEventsAsync가 실행되지 않습니다.

각 EventProcessor 클래스에는 자체 이벤트 허브가 설정되어 있습니다.

내 로그에 생성자 및 OpenAsync 메서드가 EventProcessor 클래스에 대해 호출되는 것으로 나타납니다. 실제로, 그들은 정확히 다음과 같이 4 번 호출됩니다. 그러나이 후에는 추가적인 활동이 없습니다. 나는 4 개의 파티션이 있기 때문에 4 번 추측하고 있습니다.

SimpleEventProcessorA - Constructor 
SimpleEventProcessorA - OpenAsync 
SimpleEventProcessorA - Constructor 
SimpleEventProcessorA - OpenAsync 
SimpleEventProcessorA - Constructor 
SimpleEventProcessorA - OpenAsync 
SimpleEventProcessorB - Constructor 
SimpleEventProcessorB - Open Async 
SimpleEventProcessorB - Constructor 
SimpleEventProcessorB - OpenAsync 
SimpleEventProcessorB - Constructor 
SimpleEventProcessorB - OpenAsync 
SimpleEventProcessorB - Constructor 
SimpleEventProcessorB - OpenAsync 
SimpleEventProcessorA - Constructor 
SimpleEventProcessorA - OpenAsync 

내가 그들을 킥오프 할 때 이벤트를 통해 오는 볼 수있는 푸른 포털, 또한 어떤 하나 때문에 모든 이벤트가 홍수를 제공한다 작업자 역할의 RunAsync 방법에 EventProcessorOptions 제공 오프셋.

없습니다 . EventProcessor를 등록

작업자 역할 코드 :

public class WorkerRole : RoleEntryPoint 
{ 
    private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); 
    private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false); 

    private EventProcessorHost eventProcessorHostA; 
    private EventProcessorHost eventProcessorHostB; 

    public override void Run() 
    { 
     Trace.TraceInformation("ReportWorkerRole is running"); 

     try 
     { 
      this.RunAsync(this.cancellationTokenSource.Token).Wait(); 
     } 
     finally 
     { 
      this.runCompleteEvent.Set(); 
     } 
    } 

    public override bool OnStart() 
    { 
     // Set the maximum number of concurrent connections 
     ServicePointManager.DefaultConnectionLimit = 12; 

     // For information on handling configuration changes 
     // see the MSDN topic at http://go.microsoft.com/fwlink/?LinkId=166357. 

     bool result = base.OnStart(); 

     Trace.TraceInformation("ReportWorkerRole has been started"); 

     //EventHub Processing 
     try 
     { 
      string eventHubNameA = CloudConfigurationManager.GetSetting("EventHubNameA"); 
      string eventHubNameB = CloudConfigurationManager.GetSetting("EventHubNameB"); 
      string eventHubConnectionString = CloudConfigurationManager.GetSetting("EventHubConnectionString"); 

      string storageAccountName = CloudConfigurationManager.GetSetting("AzureStorageAccount"); 
      string storageAccountKey = CloudConfigurationManager.GetSetting("AzureStorageAccountKey"); 
      string storageConnectionString = CloudConfigurationManager.GetSetting("AzureStorageAccountConnectionString"); 

      string eventProcessorHostNameA = Guid.NewGuid().ToString(); 
      eventProcessorHostA = new EventProcessorHost(eventProcessorHostNameA, eventHubNameA, EventHubConsumerGroup.DefaultGroupName, eventHubConnectionString, storageConnectionString); 

      string eventProcessorHostNameB = Guid.NewGuid().ToString(); 
      eventProcessorHostB = new EventProcessorHost(eventProcessorHostNameB, eventHubNameB, EventHubConsumerGroup.DefaultGroupName, eventHubConnectionString, storageConnectionString); 

     } 
     catch (Exception ex) 
     { 
      //Logging omitted 
     } 

     return result; 
    } 

    public override void OnStop() 
    { 
     Trace.TraceInformation("ReportWorkerRole is stopping"); 

     this.eventProcessorHostA.UnregisterEventProcessorAsync().Wait(); 
     this.eventProcessorHostB.UnregisterEventProcessorAsync().Wait(); 

     this.cancellationTokenSource.Cancel(); 
     this.runCompleteEvent.WaitOne(); 

     base.OnStop(); 

     Trace.TraceInformation("ReportWorkerRole has stopped"); 
    } 

    private async Task RunAsync(CancellationToken cancellationToken) 
    { 

     var options = new EventProcessorOptions() 
     { 
      MaxBatchSize = 100, 
      PrefetchCount = 10, 
      ReceiveTimeOut = TimeSpan.FromSeconds(20), 
      //InitialOffsetProvider = (partitionId) => DateTime.Now 
     }; 

     options.ExceptionReceived += (sender, e) => 
     { 
      //Logging omitted 
     }; 

     //Tried both using await and wait 
     eventProcessorHostA.RegisterEventProcessorAsync<SimpleEventProcessorA>(options).Wait(); 
     eventProcessorHostB.RegisterEventProcessorAsync<SimpleEventProcessorB>(options).Wait(); 
     //await eventProcessorHostA.RegisterEventProcessorAsync<SimpleEventProcessorA>(options); 
     //await eventProcessorHostB.RegisterEventProcessorAsync<SimpleEventProcessorB>(options); 

     // TODO: Replace the following with your own logic. 
     while (!cancellationToken.IsCancellationRequested) 
     { 
      Trace.TraceInformation("Working"); 
      await Task.Delay(1000); 
     } 
    } 
} 

이벤트 프로세서 A (B와 같은 구성) :

class SimpleEventProcessorA : IEventProcessor 
{ 

    Stopwatch checkpointStopWatch; 

    //Non-relevant variables omitted 

    public SimpleEventProcessorA() 
    { 
     try 
     { 
      //Initializing variables using CloudConfigurationManager 

      //Logging omitted 
     } 
     catch (Exception ex) 
     { 
      //Logging omitted 
     } 
    } 

    async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason) 
    { 
     //Logging omitted 

     Console.WriteLine("Processor Shutting Down. Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason); 
     if (reason == CloseReason.Shutdown) 
     { 
      await context.CheckpointAsync(); 
     } 
    } 

    Task IEventProcessor.OpenAsync(PartitionContext context) 
    { 
     try 
     { 
      //Logging omitted 

      Console.WriteLine("Initialized. Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset); 
      this.checkpointStopWatch = new Stopwatch(); 
      this.checkpointStopWatch.Start(); 

      return Task.FromResult<object>(null); 
     } 
     catch (Exception ex) 
     { 
      //Logging omitted 

      return Task.FromResult<object>(null); 
     } 
    } 

    async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages) 
    { 
     //Logging omitted 

     foreach (EventData eventData in messages) 
     { 
      try 
      { 
       //Logging omitted 

       Console.WriteLine(string.Format("Message received. Partition: '{0}', Data: '{1}'", 
        context.Lease.PartitionId, data)); 
      } 
      catch (Exception ex) 
      { 
       //Logging omitted 

       throw; 
      } 
     } 

     //Call checkpoint every 5 minutes, so that worker can resume processing from 5 minutes back if it restarts. 
     if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(5)) 
     { 
      await context.CheckpointAsync(); 
      this.checkpointStopWatch.Restart(); 
     } 
    } 

} 

어떤 도움을 주셔서 감사합니다겠습니까, 감사합니다!

UPDATE

모든 것이 잘처럼 ... 그것은 이벤트 허브에 물건을 밀어 때 사용 된 연결 문자열이었다 보인다. entityPath이 잘못 설정되었다

Endpoint=sb://[myEventHubName].servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;entityPath=[old-eventhub-name];SharedAccessKey=[mykey] 

:

내 이벤트 허브 연결 문자열이었다. 내가 설정했던 이전 이벤트 허브 이름을 사용하고있었습니다. eventHubNameA 또는 eventHubNameB에 대해 설정된 값이어야합니다.

+0

당신이 호스트와 eventprocessor 등록 코드를 공유 할 수 있었다? –

+0

@PeterBons 업데이트 됨, 감사합니다. – RizJa

+0

'EventProcessorOptions'에서'GeneralProcessor_ExceptionReceived' 이벤트에 가입하고'InvokeProcessorAfterReceiveTimeout' 속성을 true로 설정하십시오. – cassandrad

답변

0

다른 사람들이 혜택을 누릴 수 있도록 질문에 대답하십시오. 대답은 "업데이트"섹션의 질문에 자세히 설명되어 있지만 여기에서 다시 알려 드리겠습니다.

entityPath이 잘못 설정되었습니다. 내가 설정했던 이전 이벤트 허브 이름을 사용하고있었습니다. eventHubNameA 또는 eventHubNameB에 대해 설정된 값이어야합니다. 대신 Endpoint=sb://[myEventHubName].servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;entityPath=[old-eventhub-name];SharedAccessKey=[mykey]

그것은해야이 Endpoint=sb://[myEventHubName].servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;entityPath=[eventHubNameA];SharedAccessKey=[mykey]