Java에서 Amazon SQS 대기열 관리

1. 개요

이 자습서에서는 Java SDK사용하여 Amazon의 SQS (Simple Queue Service)를 사용하는 방법을 살펴 봅니다 .

2. 전제 조건

Amazon AWS SDK for SQS를 사용하는 데 필요한 Maven 종속성, AWS 계정 설정 및 클라이언트 연결은 여기이 문서의 내용과 동일합니다.

이전 기사에서 설명한대로 AWSCredentials 의 인스턴스를 생성했다고 가정하면 계속해서 SQS 클라이언트를 생성 할 수 있습니다.

AmazonSQS sqs = AmazonSQSClientBuilder.standard() .withCredentials(new AWSStaticCredentialsProvider(credentials)) .withRegion(Regions.US_EAST_1) .build(); 

3. 대기열 생성

SQS 클라이언트를 설정하고 나면 대기열을 만드는 것은 매우 간단합니다.

3.1. 표준 대기열 생성

표준 대기열을 만드는 방법을 살펴 보겠습니다. 이렇게하려면 CreateQueueRequest 인스턴스를 만들어야합니다 .

CreateQueueRequest createStandardQueueRequest = new CreateQueueRequest("baeldung-queue"); String standardQueueUrl = sqs.createQueue(createStandardQueueRequest).getQueueUrl(); 

3.2. FIFO 대기열 생성

FIFO 생성은 표준 대기열 생성과 유사합니다. 이전과 마찬가지로 CreateQueueRequest 인스턴스를 계속 사용할 것 입니다. 이번에 만 큐 속성을 전달하고 FifoQueue 속성을 true로 설정 해야 합니다 .

Map queueAttributes = new HashMap(); queueAttributes.put("FifoQueue", "true"); queueAttributes.put("ContentBasedDeduplication", "true"); CreateQueueRequest createFifoQueueRequest = new CreateQueueRequest( "baeldung-queue.fifo").withAttributes(queueAttributes); String fifoQueueUrl = sqs.createQueue(createFifoQueueRequest) .getQueueUrl(); 

4. 큐에 메시지 게시

대기열이 설정되면 메시지 전송을 시작할 수 있습니다.

4.1. 표준 대기열에 메시지 게시

표준 대기열에 메시지를 보내 려면 SendMessageRequest 의 인스턴스를 만들어야합니다 .

그런 다음이 요청에 메시지 속성 맵을 첨부합니다.

Map messageAttributes = new HashMap(); messageAttributes.put("AttributeOne", new MessageAttributeValue() .withStringValue("This is an attribute") .withDataType("String")); SendMessageRequest sendMessageStandardQueue = new SendMessageRequest() .withQueueUrl(standardQueueUrl) .withMessageBody("A simple message.") .withDelaySeconds(30) .withMessageAttributes(messageAttributes); sqs.sendMessage(sendMessageStandardQueue); 

withDelaySeconds () 메시지가 큐에 도착해야하는 시간 후 지정합니다.

4.2. FIFO 대기열에 메시지 게시

이 경우 유일한 차이점 은 메시지가 속한 그룹지정해야 한다는 것입니다 .

SendMessageRequest sendMessageFifoQueue = new SendMessageRequest() .withQueueUrl(fifoQueueUrl) .withMessageBody("Another simple message.") .withMessageGroupId("baeldung-group-1") .withMessageAttributes(messageAttributes);

위의 코드 예제에서 볼 수 있듯이 withMessageGroupId () 를 사용하여 그룹을 지정합니다 .

4.3. 대기열에 여러 메시지 게시

단일 요청을 사용하여 여러 메시지를 대기열에 게시 할 수도 있습니다 . 우리의 목록을 만듭니다 SendMessageBatchRequestEntry 우리의 인스턴스를 사용하여 보내드립니다 SendMessageBatchRequest를 :

List  messageEntries = new ArrayList(); messageEntries.add(new SendMessageBatchRequestEntry() .withId("id-1") .withMessageBody("batch-1") .withMessageGroupId("baeldung-group-1")); messageEntries.add(new SendMessageBatchRequestEntry() .withId("id-2") .withMessageBody("batch-2") .withMessageGroupId("baeldung-group-1")); SendMessageBatchRequest sendMessageBatchRequest = new SendMessageBatchRequest(fifoQueueUrl, messageEntries); sqs.sendMessageBatch(sendMessageBatchRequest);

5. 대기열에서 메시지 읽기

ReceiveMessageRequest 인스턴스 에서 receiveMessage () 메서드를 호출하여 큐에서 메시지를 수신 할 수 있습니다 .

ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(fifoQueueUrl) .withWaitTimeSeconds(10) .withMaxNumberOfMessages(10); List sqsMessages = sqs.receiveMessage(receiveMessageRequest).getMessages(); 

withMaxNumberOfMessages ()를 사용 하여 대기열에서 가져올 메시지 수를 지정합니다 . 최대 값은 10 입니다.

withWaitTimeSeconds () 메소드 는 긴 폴링을 가능하게 합니다. 긴 폴링은 SQS로 보내는 수신 메시지 요청 수를 제한하는 방법입니다.

간단히 말해, 메시지를 검색하기 위해 지정된 시간 (초)까지 기다릴 것임을 의미합니다. 해당 기간 동안 대기열에 메시지가 없으면 요청은 비어있는 상태로 반환됩니다. 해당 시간 동안 메시지가 대기열에 도착하면 반환됩니다.

주어진 메시지의 속성과 본문을 얻을 수 있습니다 .

sqsMessages.get(0).getAttributes(); sqsMessages.get(0).getBody();

6. 대기열에서 메시지 삭제

메시지를 삭제하려면 DeleteMessageRequest를 사용합니다 .

sqs.deleteMessage(new DeleteMessageRequest() .withQueueUrl(fifoQueueUrl) .withReceiptHandle(sqsMessages.get(0).getReceiptHandle())); 

7. 배달 못한 편지 대기열

배달 못한 편지 대기열은 기본 대기열과 동일한 유형 이어야합니다. 기본 대기열이 FIFO이면 FIFO 여야하고 기본 대기열이 표준이면 표준이어야합니다. 이 예에서는 표준 대기열을 사용합니다.

가장 먼저해야 할 일은 배달 못한 편지 대기열이 될 항목만드는 것입니다.

String deadLetterQueueUrl = sqs.createQueue("baeldung-dead-letter-queue").getQueueUrl(); 

다음으로 새로 생성 된 대기열의 ARN (Amazon 리소스 이름)을 가져옵니다.

GetQueueAttributesResult deadLetterQueueAttributes = sqs.getQueueAttributes( new GetQueueAttributesRequest(deadLetterQueueUrl) .withAttributeNames("QueueArn")); String deadLetterQueueARN = deadLetterQueueAttributes.getAttributes() .get("QueueArn"); 

마지막으로 새로 생성 된 큐를 원래 표준 큐의 배달 못한 편지 큐로 설정합니다.

SetQueueAttributesRequest queueAttributesRequest = new SetQueueAttributesRequest() .withQueueUrl(standardQueueUrl) .addAttributesEntry("RedrivePolicy", "{\"maxReceiveCount\":\"2\", " + "\"deadLetterTargetArn\":\"" + deadLetterQueueARN + "\"}"); sqs.setQueueAttributes(queueAttributesRequest); 

The JSON packet we set in the addAttributesEntry() method when building our SetQueueAttributesRequest instance contains the information we need: the maxReceiveCount is 2, which means that if a message is received this many times, it's assumed to haven't been processed correctly, and is sent to our dead letter queue.

The deadLetterTargetArn attribute points our standard queue to our newly created dead letter queue.

8. Monitoring

We can check how many messages are currently in a given queue, and how many are in flight with the SDK. First, we'll need to create a GetQueueAttributesRequest.

From there we'll check the state of the queue:

GetQueueAttributesRequest getQueueAttributesRequest = new GetQueueAttributesRequest(standardQueueUrl) .withAttributeNames("All"); GetQueueAttributesResult getQueueAttributesResult = sqs.getQueueAttributes(getQueueAttributesRequest); System.out.println(String.format("The number of messages on the queue: %s", getQueueAttributesResult.getAttributes() .get("ApproximateNumberOfMessages"))); System.out.println(String.format("The number of messages in flight: %s", getQueueAttributesResult.getAttributes() .get("ApproximateNumberOfMessagesNotVisible")));

Amazon Cloud Watch를 사용하여보다 심층적 인 모니터링을 수행 할 수 있습니다.

9. 결론

이 기사에서는 AWS Java SDK를 사용하여 SQS 대기열관리 하는 방법을 살펴 보았습니다 .

평소처럼이 기사에 사용 된 모든 코드 샘플은 GitHub에서 찾을 수 있습니다.