2014-12-19 3 views
0

메시지를 푸른 색 이벤트 허브에 보냅니다. 그러나 이벤트 허브에서 메시지를 다운로드 할 수 없습니다.구독자만으로 푸른 색 이벤트 허브에서 메시지를 소비하는 방법은 무엇입니까?

enter code here 
string eventHubConnectionString = "<connection string>"; 
string eventHubName = "<event Hub name>"; 
string storageAccountName = "<event hub storage>"; 
string storageAccountKey = "<storage Key>"; 
string storageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}",storageAccountName, storageAccountKey); 


EventProcessorHost eventProcessorHost = new EventProcessorHost("message", eventHubName, EventHubConsumerGroup.DefaultGroupName, eventHubConnectionString, storageConnectionString); 
      eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>().Wait(); 


IEventProcessor: 
enter code here 
class SimpleEventProcessor : IEventProcessor 

{ 


    Stopwatch checkpointStopWatch; 

    async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason) 
    { 
     Console.WriteLine(string.Format("Processor Shuting Down. Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason.ToString())); 
     if (reason == CloseReason.Shutdown) 
     { 
      await context.CheckpointAsync(); 
     } 
    } 

    Task IEventProcessor.OpenAsync(PartitionContext context) 
    { 
     Console.WriteLine(string.Format("SimpleEventProcessor initialize. Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset)); 
     this.checkpointStopWatch = new Stopwatch(); 
     this.checkpointStopWatch.Start(); 
     return Task.FromResult<object>(null); 
    } 

    async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages) 
    { 
     foreach (EventData eventData in messages) 
     { 
      string data = Encoding.UTF8.GetString(eventData.GetBytes()); 

      Console.WriteLine(string.Format("Message received. Partition: '{0}', Data: '{1}'", 
       context.Lease.PartitionId, data)); 
     } 

     //Call checkpoint every 5 minutes, so that worker can resume processing from the 5 minutes back if it restarts. 
     if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(5)) 
     { 
      await context.CheckpointAsync(); 
      lock (this) 
      { 
       this.checkpointStopWatch.Reset(); 
      } 
     } 
    } 

} 

다음 오류가 표시됩니다. 누적 예외 처리. 하나 이상의 오류가 발생했습니다. 메시지 세부 정보 : 해당 호스트가 없습니다. EventProcessor 호스트 이름은 무엇입니까?

다음 줄에는 오류가 표시됩니다. eventProcessorHost.RegisterEventProcessorAsync(). Wait();

IEventprocessor를 호출하지 않습니다. 이벤트 허브에서 메시지를 소비하는 다른 방법이 있습니까?

답변

2

당신은 예외를 추적하고 디버깅하는 동안 내부 예외를 찾을 수 있으므로 실제 이유는 무엇인지 실마리를 제공해야합니다. 나는이 바보 같은 예외도 있었고 EventProcessorHost와 함께 eventHubName 변수를 사용할 때 소문자에서, 심지어 경우

를 (단지 문자/숫자와 포함 된 '-'- '가'지원되지 않습니다 eventHubName은 문자로 시작해야합니다 문자 또는 번호로 따라야한다는 것을 의미한다.) 이벤트 허브 이름은 "myEventHub123"이고 변수는 다음과 같습니다.

string eventHubName = "myeventhub123"; 

희망이 누군가를 도울 것입니다 ..

0

here에 위치한 샘플 코드를 사용하여 이벤트 프로세서를 성공적으로 구축했습니다.

제공되지 않았기 때문에 연결 문자열/eventhub/storage 계정 이름의 오타와 관련 될 수 있기 때문에 오류가 무엇인지 샘플 코드에서 알기가 어렵습니다. 민감한 데이터가있는 연결 문자열).

예제가 연결 문자열에서 이벤트 허브 정보를로드하는 방식과 사용자가 제공 한 코드가 Evenhub Client를 통해 정보가 제공되는 방식이 다른 점이 다릅니다. 아래 예와 같이 EventProcessorHost 빌드 방법을 업데이트하십시오.

EventHubClient eventHubClient = EventHubClient.CreateFromConnectionString(eventHubConnectionString, this.eventHubName); 

    // Get the default Consumer Group 
    defaultConsumerGroup = eventHubClient.GetDefaultConsumerGroup(); 
    string blobConnectionString = ConfigurationManager.AppSettings["AzureStorageConnectionString"]; // Required for checkpoint/state 
    eventProcessorHost = new EventProcessorHost("singleworker", eventHubClient.Path, defaultConsumerGroup.GroupName, this.eventHubConnectionString, blobConnectionString); 
    eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>().Wait();