1

amazon sqs 대기열의 메시지와 지금까지의 작업 벌금을 소모하도록 통합 플로우를 설정하려고합니다. 하지만 분당 초당 메시지 수를 줄이려고합니다. 예 : 분당 20 개의 메시지. 여기 스프링 통합을 사용하여 sqs 대기열 사용량을 조절하는 방법

내 SQL 리스너 콩

@Bean 
    public MessageProducer mySqsMessageDrivenChannelAdapter() { 
     SqsMessageDrivenChannelAdapter adapter = new SqsMessageDrivenChannelAdapter(this.amazonSqs, queueName); 
     adapter.setMessageDeletionPolicy(SqsMessageDeletionPolicy.ON_SUCCESS); 

     adapter.setVisibilityTimeout(TIMEOUT_VISIBILITY); 
     adapter.setWaitTimeOut(TIMEOUT_MESSAGE_WAIT); 
     adapter.setMaxNumberOfMessages(prefetch); 
     adapter.setOutputChannel(processMessageChannel()); 
     return adapter; 
    } 

당신이, 내가 여론 조사 당 가져올 메시지의 최대 수를 설정하고있어,하지만 어떻게 여론 조사 사이의 지연 시간을 설정할 볼 수 있듯이의 정의인가?

일반 jms 큐에서 사용자 지정 폴러를 사용하여 JMS.inboundAdapter를 사용할 수 있지만 SqsMessageDrivenChannelAdapter를 사용하면 폴링 타이머 값을 설정할 수 없습니다.

어쩌면 SqsMessageDrivenChannelAdapter 이외의 MessageProducer를 사용할 수 있습니까?

sqs를 사용하여 JMS.inboundAdapter를 설정할 수 있습니까?

+0

솔루션은 https://stackoverflow.com/questions/29667321/polling-interval-for-jms-messagelistener-with-sqs-provider?rq=1에서 확인할 수 있습니다. 이 경우이 질문은 중복 된 것으로 간주 될 수 있지만 여기서 핵심은 스프링 통합을 사용한다는 것입니다. 나는 그것을 해결할 수있는 해결책을 제시 할 것입니다. –

답변

1

스프링 통합 SqsMessageDrivenChannelAdapter은 메시지 드라이버 활성 구성 요소입니다. 이 프로젝트는 Springh Cloud AWS 프로젝트의 SimpleMessageListenerContainer을 기반으로합니다.이 프로젝트는 장기간 실행되는 while() 루프를 호출하여 AmazonSQS.receiveMessage()을 호출합니다. 그 루프의 논리는 너무 복잡되지 않습니다

try { 
    ReceiveMessageResult receiveMessageResult = getAmazonSqs().receiveMessage(this.queueAttributes.getReceiveMessageRequest()); 
    CountDownLatch messageBatchLatch = new CountDownLatch(receiveMessageResult.getMessages().size()); 
    for (Message message : receiveMessageResult.getMessages()) { 
     if (isQueueRunning()) { 
      MessageExecutor messageExecutor = new MessageExecutor(this.logicalQueueName, message, this.queueAttributes); 
       getTaskExecutor().execute(new SignalExecutingRunnable(messageBatchLatch, messageExecutor)); 
     } else { 
      messageBatchLatch.countDown(); 
     } 
    } 
    try { 
     messageBatchLatch.await(); 
    } catch (InterruptedException e) { 
     Thread.currentThread().interrupt(); 
    } 
} catch (Exception e) { 

우리가 messageBatchLatch이 만들고 루프 후 대기시피. 각 메시지는 자신의 SignalExecutingRunnable에 의해 처리됩니다. 은 MessageExecutor의 끝에 있습니다. 따라서, 목표 서비스 방법에서 인위적인 Thread.sleep()을 사용하여 SQS 폴링 간격을 좀 더 늘리려면 무엇을하고 싶습니까?

는하지만 귀하의 요청을 듣고 우리는 실제로 같은 것을 추가해야한다 : 나는 KinesisMessageDrivenChannelAdapter 위해 이런 짓을

/** 
* The sleep interval in milliseconds used in the main loop between shards polling cycles. 
* Defaults to {@code 1000} minimum {@code 250}. 
* @param idleBetweenPolls the interval to sleep between shards polling cycles. 
*/ 
public void setIdleBetweenPolls(int idleBetweenPolls) { 
    this.idleBetweenPolls = Math.max(250, idleBetweenPolls); 
} 

을, 그러나 여기에서 우리는 SimpleMessageListenerContainer을 위해 그렇게 봄 클라우드 AWS를 요청해야합니다.