AWS SQS FIFO를 활용한 서버리스 아키텍처 구축하기 - 1
- -
최근 업무에서 SQS 를 활용한 Serverless 아키텍처를 구축하는 프로젝트를 진행했습니다.
이번 포스팅에서는 해당 프로젝트에서 배운 인사이트를 활용하여 API Gateway - SQS FIFO - Lambda 로 이어지는
Serverless 기반 Message Queue 서비스 아키텍처를 구축해보겠습니다.
1. 개요
이번 포스팅에서는 SQS 를 단순히 생성하여 콘솔에서 메시지를 저장하는 것에 그치지 않고,
앞 뒤로 API Gateway , Lambda 를 연결하여 SQS 서비스를 어떻게 구현할 수 있는 지 알아보겠습니다.
이번 글에서 사용할 AWS 서비스는 다음과 같습니다.
- API Gateway
- SQS
- Lambda
- EC2
- DynamoDB
- Route53
1.1. 아키텍처

아키텍처 상에서는 /queue-1 , /queue-2 가 분기 처리 되어 있으나
구현 방법은 동일하기 때문에 /queue-1 에 대한 방법을 대표로 알아볼 예정입니다.
1.2. 서비스에 대한 간략한 소개
SQS
SQS 는 AWS 의 대표적인 Managed Queue 서비스입니다.
다만 SQS 는 1:1 통신만을 허용하기 때문에,
다대일 통신은 SNS - SQS 의 Pub-Sub 구조를 사용하며,
다대다 통신은 MSK 를 사용하게 됩니다.
API Gateway
API Gateway 는 말 그대로 API 에 대한 단일 진입점을 제공하는 AWS Managed Service 입니다.
API 접근 제어, 제한 등의 기능을 제공하고 있습니다.
현 구조에서 API Gateway 는 SQS 로 전송하는 Queue Data 의 진입점 역할을 수행합니다.
Lambda
AWS 의 대표적인 Serverless 서비스입니다.
현 구조에서 SQS 에 저장된 Queue Data 를 꺼내어 EC2 서버로 전송하는 역할을 수행합니다.
EC2
현 구조에서 SQS 로 전송된 데이터를 처리하는 서버 역할을 수행합니다.
DynamoDB
보통 SQS 전송이 실패할 경우를 대비하여 DLQ 설정을 하게 됩니다.
DLQ 에서 다시 SQS 에 데이터를 넣는 로직도 가능하나,
엄격한 FIFO 로직이 요구되는 경우 DynamoDB 를 활용하여 실패한 내용을 저장하여 관리자가 직접 처리하도록하는 방안도 있습니다.
Route53
API Gateway 에 Domain 을 부여하기 위함이며, 해당 서비스는 반드시 필요하지는 않습니다.
2. SQS 구축
AWS SQS 는 표준 대기열과 FIFO 대기열을 지원합니다.
저는 메시지 순서가 유지되는 FIFO 대기열을 사용할 예정입니다.
2.1. DLQ 생성
생성하는 SQS 간 Dependency 가 존재하기 때문에, 편의를 위해 DLQ 먼저 생성해줍니다.
DLQ 를 나중에 생성해도 되나, 한 번 더 작업해야 하는 번거로움이 있습니다.
이를 방지하기 위해 DLQ 부터 생성하겠습니다.
SQS FIFO 를 사용하기 때문에 DLQ 또한 FIFO 대기열을 사용합니다.
주의해야할 점은 FIFO 대기열은 이름에 .fifo 가 반드시 존재해야 합니다.

2.2. SQS 생성
실제 구현할 SQS 를 생성합니다.
DLQ 와 동일하게 FIFO 대기열로 생성하되, 배달못한 편지 대기열에 앞서 생성한 DLQ 를 입력합니다.

SQS 는 매우 간단하게 생성할 수 있었습니다.
그럼 SQS 앞단에서 데이터를 받아서 전송해줄 API Gateway 를 만들어 보겠습니다.
3. API Gateway 구축
API Gateway 는 REST API 를 사용하여 생성합니다.
External 과 Internal 이 존재하는데, 외부 서비스를 통해 API Call 을 받을 예정이므로 External 로 생성합니다.

3.1. API Gateway 를 위한 IAM Role 생성
API Gateway 에서 SQS 에 데이터를 전송할 수 있도록 IAM Role 과 Policy 를 생성해주어야 합니다.
다음 명령어를 통해 테스트를 위한 IAM Policy 와 Role 을 한 번에 생성해줍니다.
export AWS_PAGER="" # API Gateway Policy 생성 cat << EOT > apigw-policy.json { "Version": "2012-10-17", "Statement": [ { "Action": [ "sqs:SendMessage", "sqs:GetQueueUrl", "sqs:SendMessageBatch" ], "Resource": ["*"], "Effect": "Allow" }, { "Action": [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:DescribeLogGroups", "logs:DescribeLogStreams", "logs:PutLogEvents", "logs:GetLogEvents", "logs:FilterLogEvents" ], "Resource": "*", "Effect": "Allow" } ] } EOT # IAM Policy 생성 aws iam create-policy \ --policy-name api-gateway-policy \ --policy-document file://apigw-policy.json # IAM Role 생성 aws iam create-role \ --role-name api-gateway-role \ --assume-role-policy-document '{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "apigateway.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }' # Attach Policy ACCOUNT_ID=$(aws sts get-caller-identity | jq .Account -r) aws iam attach-role-policy \ --role-name api-gateway-role \ --policy-arn arn:aws:iam::${ACCOUNT_ID}:policy/api-gateway-policy
생성 화면

3.2. Method 생성
이번에 API Gateway 의 Web UI 가 변경되었습니다. [2023.10.31 이후로는 구 콘솔 사용 불가]
해당 내용은 변경된 Web UI 로 진행할 예정입니다.
API Gateway 에서 작업하기 위해서는 API 경로 생성 후, HTTP Method 를 설정합니다.
이후 스테이지 배포를 통해 실제 서비스 배포를 진행하게 되는데, 우선 API Method 를 생성해보겠습니다.

리소스 생성을 통해 queue-1 에 대한 경로를 만들어 줍니다.

SQS 서비스를 지정 후, 앞서 생성한 IAM Role ARN 을 연결해줍니다.

생성한 SQS 와 연결하기 위해 경로 재정의를 사용합니다.
Path Override 에 AccountID/SQS명을 입력하여 엔드포인트를 만들어 줍니다.

3.3. 매핑 템플릿 생성
API Gateway 에서는 매핑 템플릿을 통해 SQS 에 전달할 데이터를 편집해줄 수 있습니다.
통합 요청을 편집하여 헤더 부분에 다음과 같이 입력합니다.
Content-Type : 'application/x-www-form-urlencoded'

그 후, 매핑 템플릿을 수정하여 다음과 같이 요청받은 Data 본문을 가공하여 SQS 로 전달할 수 있게 편집합니다.
매핑 템플릿은 Velocity 문법을 따릅니다.
매핑 템플릿 이해 - Amazon API Gateway
이 페이지에 작업이 필요하다는 점을 알려 주셔서 감사합니다. 실망시켜 드려 죄송합니다. 잠깐 시간을 내어 설명서를 향상시킬 수 있는 방법에 대해 말씀해 주십시오.
docs.aws.amazon.com
SQS 로 메시지를 전송할 때, MessageBody 에 data 부분과 MessageGroupId 를 구분하여 보내는 형식입니다.
Action=SendMessage&MessageBody={"data":$input.body}&MessageGroupId=$input.path('$.MessageGroupID')

SQS FIFO 에서는 MessageGroupID 가 같으면 같은 FIFO 대기열로 인식하여 순서를 엄격하게 지켜줍니다.
반대로, MessageGroupID 가 다르면 다른 FIFO 대기열로 처리됩니다.
3.4. API Gateway 테스트
API Gateway 구성이 잘 되었다면 테스트 화면에서 다음과 같은 데이터를 전송해볼 수 있습니다.
{ "MessageGroupID": "1", "http_status": 200, "Hello":"World" }

실제 SQS 콘솔에서 메시지 확인이 가능합니다.


3.5. API Gateway 배포
테스트가 성공했다면, 실제 API Gateway 를 사용할 수 있도록 배포해보겠습니다.
우선, 만들어 둔 리소스를 통해 API 를 배포합니다.

그 후, API Gateway Stage 화면에서 실제 배포된 API Gateway 의 URL 을 확인할 수 있습니다.

해당 URL 을 통해 SQS 에 메시지를 전송하려면 curl 을 이용하면 됩니다.
API_URL=3yjcxrq2x5.execute-api.ap-northeast-2.amazonaws.com curl -X POST https://${API_URL}/kimalarm/queue-1 \ --header 'Content-Type: application/json' \ -d "{\"MessageGroupID\": \"10\", \"http_status\":200}"

4. 응답 서버 생성
SQS 앞단에서 Data 를 전달받는 API Gateway 가 생성되었습니다.
이제, SQS 뒤에서 SQS 로 전송된 Data 를 실제로 응답해줄 서버를 생성해보겠습니다.
테스트 서버는 fastAPI 를 사용하여 구성할 예정입니다.
FastAPI
FastAPI framework, high performance, easy to learn, fast to code, ready for production
fastapi.tiangolo.com
4.1. EC2 생성
저는 반복 테스트를 할 때, 스크립트를 작성하여 사용하는 것을 선호합니다.
아래 코드를 user-data 에 넣어서 생성 시, 응답 서버에 필요한 모든 준비를 자동으로 마치게 됩니다.
인스턴스 타입은 Amazon Linux 로 진행했으며,
편의를 위해 Public Subnet 에 배포하고 fastAPI 실행을 위해 8000 포트가 필요합니다.
#!/bin/bash sudo timedatectl set-timezone Asia/Seoul sudo hostnamectl set-hostname fastapi # PIP3 Install sudo yum install -y python3-pip # Package Install pip3 install fastapi pip3 install "uvicorn[standard]" # Python Code cat << EOT > /home/ec2-user/response.py from fastapi import FastAPI, Request from fastapi.logger import logger from fastapi.responses import JSONResponse import time import json from MakeTextFile import MakeTextFile app = FastAPI() text_file = MakeTextFile("logs/res_server.txt") transSeq_log_file = MakeTextFile("logs/res_server_Seq.txt") @app.post("/") # async def root(info : Request): async def root(info : Request): #time.sleep(2) print("Response Server : 8000") req_info = await info.json() # REQ_INFO Print json_string = json.dumps(req_info, indent=2) print(json_string) # Logging text_file.writeSaveln(json_string) #time.sleep(req_info['time_out']) transSeq_log_file.writeSaveln(req_info['data']['transSeq']) return { "status_code" : req_info['data']['http_status'], "content" : req_info } EOT # Make Log File Code cat << EOT > /home/ec2-user/MakeTextFile.py #!/usr/bin/python # -*- coding:utf-8 -*- import os class MakeTextFile: fileName="" def __init__(self,fileName): directory_path = os.path.dirname(fileName) os.makedirs(directory_path, exist_ok=True) self.fileName = fileName def writeSave(self, str): if os.path.isfile(self.fileName): f = open(self.fileName, 'a') else: f = open(self.fileName, 'w') f.write(str) f.close() def writeSaveln(self, str): if os.path.isfile(self.fileName): f = open(self.fileName, 'a') else: f = open(self.fileName, 'w') f.write(str) f.write("\n") f.close() def writeFile(self, file_name): if os.path.isfile(self.fileName): f = open(self.fileName, 'a') else: f = open(self.fileName, 'w') f.write(str) f.write("\n") f.close() def closeFile(self): pass def printMessage(self, message): print(message) if __name__ == "__main__": textFile = makeTextOneOpenFile("cli_test.txt") textFile.writeSave("hello") EOT # Response Server cat << EOT > /home/ec2-user/response_server.sh #!/bin/bash uvicorn response:app --reload --host=0.0.0.0 --port 8000 EOT chown ec2-user:ec2-user /home/ec2-user/*
5. Lambda 생성
Lambda 는 SQS 에서 데이터를 꺼내어 EC2 에 전송해주는 역할을 합니다.
해당 Lambda 를 Python 을 사용하여 간편하게 만들어 보겠습니다.
5.1. Lambda IAM Role 생성
Lambda 는 SQS 에 대한 접근 권한이 필요합니다.
다음 CLI 명령어를 통해 IAM Policy 와 Role 을 간편하게 만들 수 있습니다.
export AWS_PAGER="" ACCOUNT_ID=$(aws sts get-caller-identity | jq .Account -r) REGION=ap-northeast-2 # API Gateway Policy 생성 cat << EOT > lmd-sqs-policy.json { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "sqs:ReceiveMessage", "sqs:DeleteMessage", "sqs:GetQueueAttributes", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents", "sqs:SendMessage*" ], "Resource": "*" } ] } EOT # IAM Policy 생성 aws iam create-policy \ --policy-name lmd-sqs-policy \ --policy-document file://lmd-sqs-policy.json # IAM Role 생성 aws iam create-role \ --role-name lmd-sqs-role \ --assume-role-policy-document '{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }' # Attach Policy aws iam attach-role-policy \ --role-name lmd-sqs-role \ --policy-arn arn:aws:iam::${ACCOUNT_ID}:policy/lmd-sqs-policy
5.2. Lambda 생성
실제 Python 을 통해서 Lambda 를 생성해봅니다.
Lambda 에 대한 역할은 앞서 생성한 IAM Role 으로 지정해줍니다.
아래 코드를 붙여넣어줍니다.
SQS 로부터 전달받은 Data 를 Response 서버에 전송 후, 실패 처리 시 DLQ 로 보내는 로직입니다.
해당 코드는 requests 모듈이 필요합니다.
Lambda Layer 에 requests 모듈을 사전에 추가해야 합니다.
import json import os import requests import boto3 DEAD_LETTER_SQS_QUEUE_URL = os.environ['DEAD_LETTER_SQS_QUEUE_URL'] RESPONSE_SERVER_IP = os.environ['RESPONSE_SERVER_IP'] def lambda_handler(event, context): sqs = boto3.client('sqs') sqs_batch_response = {} batch_item_failures = [] for record in event['Records']: try: print("Record : ", record) # Message Body body = record["body"] print("MessageBody : ", body) # Request post_response = requests.post( f"http://{RESPONSE_SERVER_IP}:8000", headers={"Content-Type": "application/json"}, data=body ) # Byte Type to String json_str = post_response.content.decode('utf-8') # String Type to JSON json_data = json.loads(json_str) print("Response : ", json.dumps(json_data)) # print(json_data["status_code"]) # print(type(json_data["status_code"])) if json_data["status_code"] == 200: print("Success Response") else: print("DLQ URL", DEAD_LETTER_SQS_QUEUE_URL) sqs_response = sqs.send_message( QueueUrl=DEAD_LETTER_SQS_QUEUE_URL, MessageBody=json.dumps(json_data["content"]), MessageGroupId=json_data["content"]["data"]["MessageGroupID"] ) print("DLQ Response", sqs_response) except Exception as e: print("Error : ", e) batch_item_failures.append({"itemIdentifier": record['messageId']}) sqs_batch_response["batchItemFailures"] = batch_item_failures print("batch_item_failures : ", batch_item_failures) print("sqs_batch_response : ", sqs_batch_response) return sqs_batch_response # Echo back the first key value
Lambda 환경변수에 DLQ URL 과 응답 서버 IP 를 입력합니다.

5.3. SQS Trigger 연동
다음으로는 SQS 에 데이터가 적재되면 Lambda 가 동작할 수 있게 만드는 것입니다.
람다 화면에서 트리거 추가를 선택합니다.

SQS 트리거 추가

SQS 에서도 마찬가지로 함수 트리거가 설정되어 있는 지 확인합니다.

6. API Call 실행
드디어 모든 인프라 구성이 완료되었습니다.
실제로 API Gateway Call 을 통해 EC2 응답서버까지 데이터가 전송되는 지 확인할 차례입니다.
6.1. EC2 fastAPI 실행
앞서 생성한 EC2 에 접속하여 response_server.sh 를 실행해줍니다.
sh response_server.sh

6.2. API Call 실행
앞에서 API Gateway 를 테스트할 때 사용했던 curl 명령어를 활용하여 연속된 API Call 을 실행해보겠습니다.
다음과 같은 스크립트 파일을 작성 후, 실행해봅니다.
API URL 과 API Stage 만 변경하여 사용하면 됩니다.
cat << EOT > api-call-test.sh #!/bin/bash MessageGroupId=10 transSeq=1 time_out=0.5 http_status=200 SITE_DOMAIN=3yjcxrq2x5.execute-api.ap-northeast-2.amazonaws.com # transSeq 가 5보다 작을 때까지 반복 while [ \$transSeq -lt 5 ]; do date_time=\$(date '+%Y-%m-%d %H:%M:%S-%3N') echo transSeq : \${transSeq} echo "curl -X POST http://\${SITE_DOMAIN}/kimalarm/queue-1" echo " --header 'Content-Type: application/json'" echo " -d \"{\"MessageGroupID\": \"\${MessageGroupId}\", \"transSeq\":\"\${transSeq}\", \"time_out\":\${time_out}, \"http_status\":\${http_status}, \"createdAt\":\"\${date_time}\"}" echo "===================================================" curl -X POST https://\${SITE_DOMAIN}/kimalarm/queue-1 \ --header 'Content-Type: application/json' \ -d "{\"MessageGroupID\": \"\${MessageGroupId}\", \"transSeq\":\"\${transSeq}\", \"time_out\":\${time_out}, \"http_status\":\${http_status}, \"createdAt\":\"\${date_time}\"}" echo "===================================================" let transSeq=transSeq+1 done EOT
스크립트는 아래와 같은 화면으로 출력되어야 합니다.
오류 발생 시 차이점을 찾아내면 됩니다.

해당 스크립트를 실행하면 다음과 같이 SQS 가 작동하여 EC2 서버에서 응답이 오는 것을 알 수 있습니다.

다음 포스팅에서는 사용하지 않았던 API Gateway 의 X-API-Key 을 통해 권한 있는 사용자만 접근할 수 있도록 설정하고,
Route53 을 통해 도메인을 매핑시켜주는 작업을 해보겠습니다.
또한, 이번 포스팅에서는 해당 아키텍처를 콘솔에서 직접 생성했지만,
실제로는 많은 리소스를 직접 하나씩 만드는 것은 굉장히 힘든 일입니다.
AWS Serverless 아키텍처를 구현할 때 사용할 수 있는 AWS SAM 으로 현재 만든 리소스를 쉽게 배포하는 방법을 알아보겠습니다.