AWS SQS FIFO를 활용한 서버리스 아키텍처 구축하기 - 1
- -
최근 업무에서 SQS 를 활용한 Serverless 아키텍처를 구축하는 프로젝트를 진행했습니다.
이번 포스팅에서는 해당 프로젝트에서 배운 인사이트를 활용하여 API Gateway - SQS FIFO - Lambda 로 이어지는
Serverless 기반 Message Queue 서비스 아키텍처를 구축해보겠습니다.
1. 개요
이번 포스팅에서는 SQS 를 단순히 생성하여 콘솔에서 메시지를 저장하는 것에 그치지 않고,
앞 뒤로 API Gateway , Lambda 를 연결하여 SQS 서비스를 어떻게 구현할 수 있는 지 알아보겠습니다.
이번 글에서 사용할 AWS 서비스는 다음과 같습니다.
- API Gateway
- SQS
- Lambda
- EC2
- DynamoDB
- Route53
1.1. 아키텍처
아키텍처 상에서는 /queue-1 , /queue-2 가 분기 처리 되어 있으나
구현 방법은 동일하기 때문에 /queue-1 에 대한 방법을 대표로 알아볼 예정입니다.
1.2. 서비스에 대한 간략한 소개
SQS
SQS 는 AWS 의 대표적인 Managed Queue 서비스입니다.
다만 SQS 는 1:1 통신만을 허용하기 때문에,
다대일 통신은 SNS - SQS 의 Pub-Sub 구조를 사용하며,
다대다 통신은 MSK 를 사용하게 됩니다.
API Gateway
API Gateway 는 말 그대로 API 에 대한 단일 진입점을 제공하는 AWS Managed Service 입니다.
API 접근 제어, 제한 등의 기능을 제공하고 있습니다.
현 구조에서 API Gateway 는 SQS 로 전송하는 Queue Data 의 진입점 역할을 수행합니다.
Lambda
AWS 의 대표적인 Serverless 서비스입니다.
현 구조에서 SQS 에 저장된 Queue Data 를 꺼내어 EC2 서버로 전송하는 역할을 수행합니다.
EC2
현 구조에서 SQS 로 전송된 데이터를 처리하는 서버 역할을 수행합니다.
DynamoDB
보통 SQS 전송이 실패할 경우를 대비하여 DLQ 설정을 하게 됩니다.
DLQ 에서 다시 SQS 에 데이터를 넣는 로직도 가능하나,
엄격한 FIFO 로직이 요구되는 경우 DynamoDB 를 활용하여 실패한 내용을 저장하여 관리자가 직접 처리하도록하는 방안도 있습니다.
Route53
API Gateway 에 Domain 을 부여하기 위함이며, 해당 서비스는 반드시 필요하지는 않습니다.
2. SQS 구축
AWS SQS 는 표준 대기열과 FIFO 대기열을 지원합니다.
저는 메시지 순서가 유지되는 FIFO 대기열을 사용할 예정입니다.
2.1. DLQ 생성
생성하는 SQS 간 Dependency 가 존재하기 때문에, 편의를 위해 DLQ 먼저 생성해줍니다.
DLQ 를 나중에 생성해도 되나, 한 번 더 작업해야 하는 번거로움이 있습니다.
이를 방지하기 위해 DLQ 부터 생성하겠습니다.
SQS FIFO 를 사용하기 때문에 DLQ 또한 FIFO 대기열을 사용합니다.
주의해야할 점은 FIFO 대기열은 이름에 .fifo 가 반드시 존재해야 합니다.
2.2. SQS 생성
실제 구현할 SQS 를 생성합니다.
DLQ 와 동일하게 FIFO 대기열로 생성하되, 배달못한 편지 대기열에 앞서 생성한 DLQ 를 입력합니다.
SQS 는 매우 간단하게 생성할 수 있었습니다.
그럼 SQS 앞단에서 데이터를 받아서 전송해줄 API Gateway 를 만들어 보겠습니다.
3. API Gateway 구축
API Gateway 는 REST API 를 사용하여 생성합니다.
External 과 Internal 이 존재하는데, 외부 서비스를 통해 API Call 을 받을 예정이므로 External 로 생성합니다.
3.1. API Gateway 를 위한 IAM Role 생성
API Gateway 에서 SQS 에 데이터를 전송할 수 있도록 IAM Role 과 Policy 를 생성해주어야 합니다.
다음 명령어를 통해 테스트를 위한 IAM Policy 와 Role 을 한 번에 생성해줍니다.
export AWS_PAGER=""
# API Gateway Policy 생성
cat << EOT > apigw-policy.json
{
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"sqs:SendMessage",
"sqs:GetQueueUrl",
"sqs:SendMessageBatch"
],
"Resource": ["*"],
"Effect": "Allow"
},
{
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:DescribeLogGroups",
"logs:DescribeLogStreams",
"logs:PutLogEvents",
"logs:GetLogEvents",
"logs:FilterLogEvents"
],
"Resource": "*",
"Effect": "Allow"
}
]
}
EOT
# IAM Policy 생성
aws iam create-policy \
--policy-name api-gateway-policy \
--policy-document file://apigw-policy.json
# IAM Role 생성
aws iam create-role \
--role-name api-gateway-role \
--assume-role-policy-document '{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "apigateway.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}'
# Attach Policy
ACCOUNT_ID=$(aws sts get-caller-identity | jq .Account -r)
aws iam attach-role-policy \
--role-name api-gateway-role \
--policy-arn arn:aws:iam::${ACCOUNT_ID}:policy/api-gateway-policy
생성 화면
3.2. Method 생성
이번에 API Gateway 의 Web UI 가 변경되었습니다. [2023.10.31 이후로는 구 콘솔 사용 불가]
해당 내용은 변경된 Web UI 로 진행할 예정입니다.
API Gateway 에서 작업하기 위해서는 API 경로 생성 후, HTTP Method 를 설정합니다.
이후 스테이지 배포를 통해 실제 서비스 배포를 진행하게 되는데, 우선 API Method 를 생성해보겠습니다.
리소스 생성을 통해 queue-1 에 대한 경로를 만들어 줍니다.
SQS 서비스를 지정 후, 앞서 생성한 IAM Role ARN 을 연결해줍니다.
생성한 SQS 와 연결하기 위해 경로 재정의를 사용합니다.
Path Override 에 AccountID/SQS명을 입력하여 엔드포인트를 만들어 줍니다.
3.3. 매핑 템플릿 생성
API Gateway 에서는 매핑 템플릿을 통해 SQS 에 전달할 데이터를 편집해줄 수 있습니다.
통합 요청을 편집하여 헤더 부분에 다음과 같이 입력합니다.
Content-Type : 'application/x-www-form-urlencoded'
그 후, 매핑 템플릿을 수정하여 다음과 같이 요청받은 Data 본문을 가공하여 SQS 로 전달할 수 있게 편집합니다.
매핑 템플릿은 Velocity 문법을 따릅니다.
SQS 로 메시지를 전송할 때, MessageBody 에 data 부분과 MessageGroupId 를 구분하여 보내는 형식입니다.
Action=SendMessage&MessageBody={"data":$input.body}&MessageGroupId=$input.path('$.MessageGroupID')
SQS FIFO 에서는 MessageGroupID 가 같으면 같은 FIFO 대기열로 인식하여 순서를 엄격하게 지켜줍니다.
반대로, MessageGroupID 가 다르면 다른 FIFO 대기열로 처리됩니다.
3.4. API Gateway 테스트
API Gateway 구성이 잘 되었다면 테스트 화면에서 다음과 같은 데이터를 전송해볼 수 있습니다.
{
"MessageGroupID": "1",
"http_status": 200,
"Hello":"World"
}
실제 SQS 콘솔에서 메시지 확인이 가능합니다.
3.5. API Gateway 배포
테스트가 성공했다면, 실제 API Gateway 를 사용할 수 있도록 배포해보겠습니다.
우선, 만들어 둔 리소스를 통해 API 를 배포합니다.
그 후, API Gateway Stage 화면에서 실제 배포된 API Gateway 의 URL 을 확인할 수 있습니다.
해당 URL 을 통해 SQS 에 메시지를 전송하려면 curl 을 이용하면 됩니다.
API_URL=3yjcxrq2x5.execute-api.ap-northeast-2.amazonaws.com
curl -X POST https://${API_URL}/kimalarm/queue-1 \
--header 'Content-Type: application/json' \
-d "{\"MessageGroupID\": \"10\", \"http_status\":200}"
4. 응답 서버 생성
SQS 앞단에서 Data 를 전달받는 API Gateway 가 생성되었습니다.
이제, SQS 뒤에서 SQS 로 전송된 Data 를 실제로 응답해줄 서버를 생성해보겠습니다.
테스트 서버는 fastAPI 를 사용하여 구성할 예정입니다.
4.1. EC2 생성
저는 반복 테스트를 할 때, 스크립트를 작성하여 사용하는 것을 선호합니다.
아래 코드를 user-data 에 넣어서 생성 시, 응답 서버에 필요한 모든 준비를 자동으로 마치게 됩니다.
인스턴스 타입은 Amazon Linux 로 진행했으며,
편의를 위해 Public Subnet 에 배포하고 fastAPI 실행을 위해 8000 포트가 필요합니다.
#!/bin/bash
sudo timedatectl set-timezone Asia/Seoul
sudo hostnamectl set-hostname fastapi
# PIP3 Install
sudo yum install -y python3-pip
# Package Install
pip3 install fastapi
pip3 install "uvicorn[standard]"
# Python Code
cat << EOT > /home/ec2-user/response.py
from fastapi import FastAPI, Request
from fastapi.logger import logger
from fastapi.responses import JSONResponse
import time
import json
from MakeTextFile import MakeTextFile
app = FastAPI()
text_file = MakeTextFile("logs/res_server.txt")
transSeq_log_file = MakeTextFile("logs/res_server_Seq.txt")
@app.post("/")
# async def root(info : Request):
async def root(info : Request):
#time.sleep(2)
print("Response Server : 8000")
req_info = await info.json()
# REQ_INFO Print
json_string = json.dumps(req_info, indent=2)
print(json_string)
# Logging
text_file.writeSaveln(json_string)
#time.sleep(req_info['time_out'])
transSeq_log_file.writeSaveln(req_info['data']['transSeq'])
return {
"status_code" : req_info['data']['http_status'],
"content" : req_info
}
EOT
# Make Log File Code
cat << EOT > /home/ec2-user/MakeTextFile.py
#!/usr/bin/python
# -*- coding:utf-8 -*-
import os
class MakeTextFile:
fileName=""
def __init__(self,fileName):
directory_path = os.path.dirname(fileName)
os.makedirs(directory_path, exist_ok=True)
self.fileName = fileName
def writeSave(self, str):
if os.path.isfile(self.fileName):
f = open(self.fileName, 'a')
else:
f = open(self.fileName, 'w')
f.write(str)
f.close()
def writeSaveln(self, str):
if os.path.isfile(self.fileName):
f = open(self.fileName, 'a')
else:
f = open(self.fileName, 'w')
f.write(str)
f.write("\n")
f.close()
def writeFile(self, file_name):
if os.path.isfile(self.fileName):
f = open(self.fileName, 'a')
else:
f = open(self.fileName, 'w')
f.write(str)
f.write("\n")
f.close()
def closeFile(self):
pass
def printMessage(self, message):
print(message)
if __name__ == "__main__":
textFile = makeTextOneOpenFile("cli_test.txt")
textFile.writeSave("hello")
EOT
# Response Server
cat << EOT > /home/ec2-user/response_server.sh
#!/bin/bash
uvicorn response:app --reload --host=0.0.0.0 --port 8000
EOT
chown ec2-user:ec2-user /home/ec2-user/*
5. Lambda 생성
Lambda 는 SQS 에서 데이터를 꺼내어 EC2 에 전송해주는 역할을 합니다.
해당 Lambda 를 Python 을 사용하여 간편하게 만들어 보겠습니다.
5.1. Lambda IAM Role 생성
Lambda 는 SQS 에 대한 접근 권한이 필요합니다.
다음 CLI 명령어를 통해 IAM Policy 와 Role 을 간편하게 만들 수 있습니다.
export AWS_PAGER=""
ACCOUNT_ID=$(aws sts get-caller-identity | jq .Account -r)
REGION=ap-northeast-2
# API Gateway Policy 생성
cat << EOT > lmd-sqs-policy.json
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"sqs:ReceiveMessage",
"sqs:DeleteMessage",
"sqs:GetQueueAttributes",
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents",
"sqs:SendMessage*"
],
"Resource": "*"
}
]
}
EOT
# IAM Policy 생성
aws iam create-policy \
--policy-name lmd-sqs-policy \
--policy-document file://lmd-sqs-policy.json
# IAM Role 생성
aws iam create-role \
--role-name lmd-sqs-role \
--assume-role-policy-document '{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}'
# Attach Policy
aws iam attach-role-policy \
--role-name lmd-sqs-role \
--policy-arn arn:aws:iam::${ACCOUNT_ID}:policy/lmd-sqs-policy
5.2. Lambda 생성
실제 Python 을 통해서 Lambda 를 생성해봅니다.
Lambda 에 대한 역할은 앞서 생성한 IAM Role 으로 지정해줍니다.
아래 코드를 붙여넣어줍니다.
SQS 로부터 전달받은 Data 를 Response 서버에 전송 후, 실패 처리 시 DLQ 로 보내는 로직입니다.
해당 코드는 requests 모듈이 필요합니다.
Lambda Layer 에 requests 모듈을 사전에 추가해야 합니다.
import json
import os
import requests
import boto3
DEAD_LETTER_SQS_QUEUE_URL = os.environ['DEAD_LETTER_SQS_QUEUE_URL']
RESPONSE_SERVER_IP = os.environ['RESPONSE_SERVER_IP']
def lambda_handler(event, context):
sqs = boto3.client('sqs')
sqs_batch_response = {}
batch_item_failures = []
for record in event['Records']:
try:
print("Record : ", record)
# Message Body
body = record["body"]
print("MessageBody : ", body)
# Request
post_response = requests.post(
f"http://{RESPONSE_SERVER_IP}:8000",
headers={"Content-Type": "application/json"},
data=body
)
# Byte Type to String
json_str = post_response.content.decode('utf-8')
# String Type to JSON
json_data = json.loads(json_str)
print("Response : ", json.dumps(json_data))
# print(json_data["status_code"])
# print(type(json_data["status_code"]))
if json_data["status_code"] == 200:
print("Success Response")
else:
print("DLQ URL", DEAD_LETTER_SQS_QUEUE_URL)
sqs_response = sqs.send_message(
QueueUrl=DEAD_LETTER_SQS_QUEUE_URL,
MessageBody=json.dumps(json_data["content"]),
MessageGroupId=json_data["content"]["data"]["MessageGroupID"]
)
print("DLQ Response", sqs_response)
except Exception as e:
print("Error : ", e)
batch_item_failures.append({"itemIdentifier": record['messageId']})
sqs_batch_response["batchItemFailures"] = batch_item_failures
print("batch_item_failures : ", batch_item_failures)
print("sqs_batch_response : ", sqs_batch_response)
return sqs_batch_response # Echo back the first key value
Lambda 환경변수에 DLQ URL 과 응답 서버 IP 를 입력합니다.
5.3. SQS Trigger 연동
다음으로는 SQS 에 데이터가 적재되면 Lambda 가 동작할 수 있게 만드는 것입니다.
람다 화면에서 트리거 추가를 선택합니다.
SQS 트리거 추가
SQS 에서도 마찬가지로 함수 트리거가 설정되어 있는 지 확인합니다.
6. API Call 실행
드디어 모든 인프라 구성이 완료되었습니다.
실제로 API Gateway Call 을 통해 EC2 응답서버까지 데이터가 전송되는 지 확인할 차례입니다.
6.1. EC2 fastAPI 실행
앞서 생성한 EC2 에 접속하여 response_server.sh 를 실행해줍니다.
sh response_server.sh
6.2. API Call 실행
앞에서 API Gateway 를 테스트할 때 사용했던 curl 명령어를 활용하여 연속된 API Call 을 실행해보겠습니다.
다음과 같은 스크립트 파일을 작성 후, 실행해봅니다.
API URL 과 API Stage 만 변경하여 사용하면 됩니다.
cat << EOT > api-call-test.sh
#!/bin/bash
MessageGroupId=10
transSeq=1
time_out=0.5
http_status=200
SITE_DOMAIN=3yjcxrq2x5.execute-api.ap-northeast-2.amazonaws.com
# transSeq 가 5보다 작을 때까지 반복
while [ \$transSeq -lt 5 ]; do
date_time=\$(date '+%Y-%m-%d %H:%M:%S-%3N')
echo transSeq : \${transSeq}
echo "curl -X POST http://\${SITE_DOMAIN}/kimalarm/queue-1"
echo " --header 'Content-Type: application/json'"
echo " -d \"{\"MessageGroupID\": \"\${MessageGroupId}\", \"transSeq\":\"\${transSeq}\", \"time_out\":\${time_out}, \"http_status\":\${http_status}, \"createdAt\":\"\${date_time}\"}"
echo "==================================================="
curl -X POST https://\${SITE_DOMAIN}/kimalarm/queue-1 \
--header 'Content-Type: application/json' \
-d "{\"MessageGroupID\": \"\${MessageGroupId}\", \"transSeq\":\"\${transSeq}\", \"time_out\":\${time_out}, \"http_status\":\${http_status}, \"createdAt\":\"\${date_time}\"}"
echo "==================================================="
let transSeq=transSeq+1
done
EOT
스크립트는 아래와 같은 화면으로 출력되어야 합니다.
오류 발생 시 차이점을 찾아내면 됩니다.
해당 스크립트를 실행하면 다음과 같이 SQS 가 작동하여 EC2 서버에서 응답이 오는 것을 알 수 있습니다.
다음 포스팅에서는 사용하지 않았던 API Gateway 의 X-API-Key 을 통해 권한 있는 사용자만 접근할 수 있도록 설정하고,
Route53 을 통해 도메인을 매핑시켜주는 작업을 해보겠습니다.
또한, 이번 포스팅에서는 해당 아키텍처를 콘솔에서 직접 생성했지만,
실제로는 많은 리소스를 직접 하나씩 만드는 것은 굉장히 힘든 일입니다.
AWS Serverless 아키텍처를 구현할 때 사용할 수 있는 AWS SAM 으로 현재 만든 리소스를 쉽게 배포하는 방법을 알아보겠습니다.
'AWS > Serverless' 카테고리의 다른 글
Jenkins 를 이용하여 AWS SAM 교차 계정 배포하기 (0) | 2023.01.04 |
---|