0

하늘색 서비스 버스 주제를 사용 중입니다. 나는 큰 메시지를 나누어서 큰 메시지를 나누어서 작은 - 작은 메시지를 보내고 sessionid와 분할 된 명령으로 보내고있다. 내 수신자가 이벤트 기반 아키텍처를 갖기를 바랍니다. 모든 메시지를 동일한 sessionid와 함께 받아야하고 적절한 분할 order.bellow 함께 집계해야 내 코드입니다. 하지만 처음으로 나는 벨로우즈 코드에서 메시지를 받고있다. 두 번째 메시지에서 시간 초과.Azure 서비스 버스 주제 이벤트 구동 아키텍처 모델과 세션이있는 메시지 수신

  public class CRMESBListener : RoleEntryPoint 
      { 
       private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); 
       private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false); 

      public override void Run() 
      { 
       Trace.TraceInformation("NPRO.Domain.CRMESBListener.WorkerRole is running"); 
       try 
       { 
        DBMessageListener dbMessageListener = DBMessageListener.GetDBMessageListner(); 
        dbMessageListener.Listen(); 
        runCompleteEvent.WaitOne(); 
        //this.RunAsync(this.cancellationTokenSource.Token).Wait(); 
       } 
       finally 
       { 
        this.runCompleteEvent.Set(); 
       } 
      } 

      public override bool OnStart() 
      { 
       // Set the maximum number of concurrent connections 
       ServicePointManager.DefaultConnectionLimit = 12; 

       // For information on handling configuration changes 
       // see the MSDN topic at http://go.microsoft.com/fwlink/?LinkId=166357. 


       bool result = base.OnStart(); 
       Bootstrapper.Init(); 

       Trace.TraceInformation("NPRO.Domain.CRMESBListener.WorkerRole has been started"); 

       return result; 
      } 

      public override void OnStop() 
      { 
       Trace.TraceInformation("NPRO.Domain.CRMESBListener.WorkerRole is stopping"); 

       this.cancellationTokenSource.Cancel(); 
       this.runCompleteEvent.WaitOne(); 

       base.OnStop(); 

       Trace.TraceInformation("NPRO.Domain.CRMESBListener.WorkerRole has stopped"); 
      } 

      private async Task RunAsync(CancellationToken cancellationToken) 
      { 
       // TODO: Replace the following with your own logic. 
       while (!cancellationToken.IsCancellationRequested) 
       { 
        Trace.TraceInformation("Working"); 
        await Task.Delay(1000); 
       } 
      } 
     } 













     public class DBMessageListener 
     { 
      #region Member Variables 

      private static DBMessageListener dbMessageListner; 
      private static object lockObject = new object(); 
      private TopicSubscribeClientWrapper accountTopicClient; 

      private NamespaceManager namespaceManager; 
      private OnMessageOptions eventDrivenMessagingOptions; 

      private int crmIntegrationUserID = Common.CrmCurrentUser.UserID; 

      #endregion Member Variables 

      #region Constructors 

      private DBMessageListener() 
      { 
       string subscriptionName = "AllMessages"; 
       namespaceManager = new NamespaceManager(ConfigurationSettings.ServiceBus_Pub_Sub_ConnectionString); 

       if (!namespaceManager.SubscriptionExists(ConfigurationSettings.ServiceBus_Pub_Sub_Internal_Account_ServicePath, subscriptionName)) 
       { 
        namespaceManager.CreateSubscription(ConfigurationSettings.ServiceBus_Pub_Sub_Internal_Account_ServicePath, subscriptionName); 
       } 
       accountTopicClient = new TopicSubscribeClientWrapper(ConfigurationSettings.ServiceBus_Pub_Sub_ConnectionString, ConfigurationSettings.ServiceBus_Pub_Sub_Internal_Account_ServicePath); 
       accountTopicClient.SubscriptionName = subscriptionName; 



       eventDrivenMessagingOptions = new OnMessageOptions 
       { 
        AutoComplete = true 
       }; 

       eventDrivenMessagingOptions.ExceptionReceived += OnExceptionReceived; 
       eventDrivenMessagingOptions.MaxConcurrentCalls = 5; 
      } 

      #endregion Constructors 

      #region Methods 

      private async System.Threading.Tasks.Task OnMessageArrived(BrokeredMessage message) 
      { 
       if (message != null) 
       { 
        try 
        { 
         await ProcessDBMessage(message.GetBody<ServiceBusMessage>()); 
        } 
        catch (Exception ex) 
        { 
         //log exception 
        } 
       } 

      } 

      private void OnExceptionReceived(object sender, ExceptionReceivedEventArgs e) 
      { 
       if (e != null && e.Exception != null) 
       { 

       } 
      } 

      private async System.Threading.Tasks.Task ProcessDBMessage(ServiceBusMessage message) 
      { 

    //process message   
      } 

      public static DBMessageListener GetDBMessageListner() 
      { 
       if (dbMessageListner == null) 
       { 
        lock (lockObject) 
        { 
         if (dbMessageListner == null) 
         { 
          dbMessageListner = new DBMessageListener(); 
         } 
        } 
       } 

       return dbMessageListner; 
      } 

      public void Listen() 
      { 
       accountTopicClient.OnMessageAsync(async message => await OnMessageArrived(message), eventDrivenMessagingOptions); 

      } 

      #endregion Methods 
     } 


public class TopicSubscribeClientWrapper : IServiceBusClientWrapper 
    { 
     #region Member Variables 

     private readonly string _connectionString; 
     private readonly string _topicName; 
     private readonly TopicClient _topicClient; 
     private SubscriptionClient _subscriptionClient; 

     #endregion Member Variables 

     #region Properties 

     public string SubscriptionName { get; set; } 

     #endregion Properties 

     #region Constructors 

     public TopicSubscribeClientWrapper(string connectionString, string topicName) 
     { 
      _connectionString = connectionString; 
      _topicName = topicName; 
      _topicClient = TopicClient.CreateFromConnectionString(connectionString, topicName); 
     } 

     #endregion Constructors 

     #region Event Handlers 

     public void OnMessageAsync(Func<BrokeredMessage, Task> onMessageCallback, OnMessageOptions onMessageOptions) 
     { 

      _subscriptionClient = SubscriptionClient.CreateFromConnectionString(_connectionString, _topicName, SubscriptionName); 

      // _subscriptionClient.OnMessageAsync(onMessageCallback, onMessageOptions); 

      MemoryStream largeMessageStream = new MemoryStream(); 
      MessageSession session = _subscriptionClient.AcceptMessageSession(); 

      while (true) 
      { 
       BrokeredMessage subMessage = session.Receive(TimeSpan.FromSeconds(5)); 

       if (subMessage != null) 
       { 
        Stream subMessageStream = subMessage.GetBody<Stream>(); 
        subMessageStream.CopyTo(largeMessageStream); 

        subMessage.Complete(); 
        //Console.Write("."); 
       } 
       else 
       { 
        //Console.WriteLine("Done!"); 
        break; 
       } 
      } 

      BrokeredMessage largeMessage = new BrokeredMessage(largeMessageStream, true); 
      var message = onMessageCallback.Method.GetParameters(); 
      message.SetValue(largeMessage, 1); 
      _subscriptionClient.OnMessageAsync(onMessageCallback, onMessageOptions); 

     } 

     #endregion Event Handlers 

     #region Methods 

     public Task SendAsync(BrokeredMessage message) 
     { 
      return _topicClient.SendAsync(message); 
     } 

     public void Close() 
     { 
      if (_subscriptionClient != null) 
      { 
       _subscriptionClient.Close(); 
      } 

      _topicClient.Close(); 
     } 

     #endregion Methods 
    } 

답변

1

다른 경로를 제안합니다. 큰 메시지를 전달하기 위해 메시지 세션을 만들지 말고 특별히이 문제를 다루는 claim check pattern (대용량 첨부 파일)을 사용하십시오. 저장소 BLOB에 데이터를 작성하고 URI와 함께 메시지를 보냅니다. 페이로드를 청크로 보내려고 시도하는 것보다 blob을 저장/복원하는 것이 훨씬 간단합니다. 또한 시스템을 모니터링하는 것이 더 쉽습니다 (하나 이상의 모양과 관련된 성공/실패 메시지 하나가 실패한 경우) 세션이나 특별한 것을 사용해야합니다.