새소식

AWS/Serverless

AWS SQS FIFO를 활용한 서버리스 아키텍처 구축하기 - 1

  • -

최근 업무에서 SQS 를 활용한 Serverless 아키텍처를 구축하는 프로젝트를 진행했습니다.

이번 포스팅에서는 해당 프로젝트에서 배운 인사이트를 활용하여 API Gateway - SQS FIFO - Lambda 로 이어지는

Serverless 기반 Message Queue 서비스 아키텍처를 구축해보겠습니다. 

 

 

1. 개요

 

이번 포스팅에서는 SQS 를 단순히 생성하여 콘솔에서 메시지를 저장하는 것에 그치지 않고,
앞 뒤로 API Gateway , Lambda 를 연결하여 SQS 서비스를 어떻게 구현할 수 있는 지 알아보겠습니다.

 

이번 글에서 사용할 AWS 서비스는 다음과 같습니다.

  1. API Gateway
  2. SQS
  3. Lambda
  4. EC2
  5. DynamoDB
  6. Route53

 

1.1. 아키텍처

 

 

아키텍처 상에서는 /queue-1 , /queue-2 가 분기 처리 되어 있으나
구현 방법은 동일하기 때문에 /queue-1 에 대한 방법을 대표로 알아볼 예정입니다.

 

 

1.2. 서비스에 대한 간략한 소개

 

SQS

SQS 는 AWS 의 대표적인 Managed Queue 서비스입니다.
다만 SQS 는 1:1 통신만을 허용하기 때문에,
다대일 통신은 SNS - SQS 의 Pub-Sub 구조를 사용하며,
다대다 통신은 MSK 를 사용하게 됩니다.

 

API Gateway

API Gateway 는 말 그대로 API 에 대한 단일 진입점을 제공하는 AWS Managed Service 입니다.
API 접근 제어, 제한 등의 기능을 제공하고 있습니다.

현 구조에서 API Gateway 는 SQS 로 전송하는 Queue Data 의 진입점 역할을 수행합니다.

 

Lambda

AWS 의 대표적인 Serverless 서비스입니다.
현 구조에서 SQS 에 저장된 Queue Data 를 꺼내어 EC2 서버로 전송하는 역할을 수행합니다.

 

EC2

현 구조에서 SQS 로 전송된 데이터를 처리하는 서버 역할을 수행합니다.

 

DynamoDB

보통 SQS 전송이 실패할 경우를 대비하여 DLQ 설정을 하게 됩니다.
DLQ 에서 다시 SQS 에 데이터를 넣는 로직도 가능하나,
엄격한 FIFO 로직이 요구되는 경우 DynamoDB 를 활용하여 실패한 내용을 저장하여 관리자가 직접 처리하도록하는 방안도 있습니다.

 

Route53

API Gateway 에 Domain 을 부여하기 위함이며, 해당 서비스는 반드시 필요하지는 않습니다.

 

 

2. SQS 구축

 

AWS SQS 는 표준 대기열과 FIFO 대기열을 지원합니다.
저는 메시지 순서가 유지되는 FIFO 대기열을 사용할 예정입니다.

 

 

2.1. DLQ 생성

 

생성하는 SQS 간 Dependency 가 존재하기 때문에, 편의를 위해 DLQ 먼저 생성해줍니다.
DLQ 를 나중에 생성해도 되나, 한 번 더 작업해야 하는 번거로움이 있습니다.
이를 방지하기 위해 DLQ 부터 생성하겠습니다.

 

SQS FIFO 를 사용하기 때문에 DLQ 또한 FIFO 대기열을 사용합니다.
주의해야할 점은 FIFO 대기열은 이름에 .fifo 가 반드시 존재해야 합니다.

 

 

2.2. SQS 생성

 

실제 구현할 SQS 를 생성합니다.
DLQ 와 동일하게 FIFO 대기열로 생성하되, 배달못한 편지 대기열에 앞서 생성한 DLQ 를 입력합니다.

 

SQS 는 매우 간단하게 생성할 수 있었습니다.
그럼 SQS 앞단에서 데이터를 받아서 전송해줄 API Gateway 를 만들어 보겠습니다.

 

 

3. API Gateway 구축

 

API Gateway 는 REST API 를 사용하여 생성합니다.
External 과 Internal 이 존재하는데, 외부 서비스를 통해 API Call 을 받을 예정이므로 External 로 생성합니다.

 

 

3.1. API Gateway 를 위한 IAM Role 생성

 

API Gateway 에서 SQS 에 데이터를 전송할 수 있도록 IAM Role 과 Policy 를 생성해주어야 합니다.
다음 명령어를 통해 테스트를 위한 IAM Policy 와 Role 을 한 번에 생성해줍니다.

 

export AWS_PAGER=""

# API Gateway Policy 생성
cat << EOT > apigw-policy.json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "sqs:SendMessage",
                "sqs:GetQueueUrl",
                "sqs:SendMessageBatch"
            ],
            "Resource": ["*"],
            "Effect": "Allow"
        },
        {
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:DescribeLogGroups",
                "logs:DescribeLogStreams",
                "logs:PutLogEvents",
                "logs:GetLogEvents",
                "logs:FilterLogEvents"
            ],
            "Resource": "*",
            "Effect": "Allow"
        }
    ]
}
EOT

# IAM Policy 생성
aws iam create-policy \
    --policy-name api-gateway-policy \
    --policy-document file://apigw-policy.json

# IAM Role 생성
aws iam create-role \
  --role-name api-gateway-role \
  --assume-role-policy-document '{
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Principal": {
          "Service": "apigateway.amazonaws.com"
        },
        "Action": "sts:AssumeRole"
      }
    ]
  }'

# Attach Policy
ACCOUNT_ID=$(aws sts get-caller-identity | jq .Account -r)

aws iam attach-role-policy \
  --role-name api-gateway-role \
  --policy-arn arn:aws:iam::${ACCOUNT_ID}:policy/api-gateway-policy

 

생성 화면

 

 

3.2. Method 생성

 

이번에 API Gateway 의 Web UI 가 변경되었습니다. [2023.10.31 이후로는 구 콘솔 사용 불가]
해당 내용은 변경된 Web UI 로 진행할 예정입니다.

 

API Gateway 에서 작업하기 위해서는 API 경로 생성 후, HTTP Method 를 설정합니다.
이후 스테이지 배포를 통해 실제 서비스 배포를 진행하게 되는데, 우선 API Method 를 생성해보겠습니다.

 

 

리소스 생성을 통해 queue-1 에 대한 경로를 만들어 줍니다.

 

 

SQS 서비스를 지정 후, 앞서 생성한 IAM Role ARN 을 연결해줍니다.

 

 

생성한 SQS 와 연결하기 위해 경로 재정의를 사용합니다.
Path Override 에 AccountID/SQS명을 입력하여 엔드포인트를 만들어 줍니다.

 

 

3.3. 매핑 템플릿 생성

 

API Gateway 에서는 매핑 템플릿을 통해 SQS 에 전달할 데이터를 편집해줄 수 있습니다.

통합 요청을 편집하여 헤더 부분에 다음과 같이 입력합니다.

Content-Type  : 'application/x-www-form-urlencoded'

 

 

그 후, 매핑 템플릿을 수정하여 다음과 같이 요청받은 Data 본문을 가공하여 SQS 로 전달할 수 있게 편집합니다.
매핑 템플릿은 Velocity 문법을 따릅니다.

 

매핑 템플릿 이해 - Amazon API Gateway

이 페이지에 작업이 필요하다는 점을 알려 주셔서 감사합니다. 실망시켜 드려 죄송합니다. 잠깐 시간을 내어 설명서를 향상시킬 수 있는 방법에 대해 말씀해 주십시오.

docs.aws.amazon.com

 

 

SQS 로 메시지를 전송할 때, MessageBody 에 data 부분과 MessageGroupId 를 구분하여 보내는 형식입니다.

 

Action=SendMessage&MessageBody={"data":$input.body}&MessageGroupId=$input.path('$.MessageGroupID')

 

 

SQS FIFO 에서는 MessageGroupID 가 같으면 같은 FIFO 대기열로 인식하여 순서를 엄격하게 지켜줍니다.
반대로, MessageGroupID 가 다르면 다른 FIFO 대기열로 처리됩니다.

 

 

3.4. API Gateway 테스트

 

API Gateway 구성이 잘 되었다면 테스트 화면에서 다음과 같은 데이터를 전송해볼 수 있습니다.

{
    "MessageGroupID": "1",
    "http_status": 200, 
    "Hello":"World"
}

 

 

실제 SQS 콘솔에서 메시지 확인이 가능합니다.

 

 

 

3.5. API Gateway 배포

 

테스트가 성공했다면, 실제 API Gateway 를 사용할 수 있도록 배포해보겠습니다.
우선, 만들어 둔 리소스를 통해 API 를 배포합니다.

 

그 후, API Gateway Stage 화면에서 실제 배포된 API Gateway 의 URL 을 확인할 수 있습니다.

 

해당 URL 을 통해 SQS 에 메시지를 전송하려면 curl 을 이용하면 됩니다.

API_URL=3yjcxrq2x5.execute-api.ap-northeast-2.amazonaws.com

curl -X POST https://${API_URL}/kimalarm/queue-1 \
    --header 'Content-Type: application/json' \
    -d "{\"MessageGroupID\": \"10\", \"http_status\":200}"

 

 

4. 응답 서버 생성

 

SQS 앞단에서 Data 를 전달받는 API Gateway 가 생성되었습니다.
이제, SQS 뒤에서 SQS 로 전송된 Data 를 실제로 응답해줄 서버를 생성해보겠습니다.


테스트 서버는 fastAPI 를 사용하여 구성할 예정입니다.

 

FastAPI

FastAPI framework, high performance, easy to learn, fast to code, ready for production

fastapi.tiangolo.com

 

 

4.1. EC2 생성

 

저는 반복 테스트를 할 때, 스크립트를 작성하여 사용하는 것을 선호합니다.
아래 코드를 user-data 에 넣어서 생성 시, 응답 서버에 필요한 모든 준비를 자동으로 마치게 됩니다.

 

인스턴스 타입은 Amazon Linux 로 진행했으며,
편의를 위해 Public Subnet 에 배포하고 fastAPI 실행을 위해 8000 포트가 필요합니다.

 

#!/bin/bash

sudo timedatectl set-timezone Asia/Seoul
sudo hostnamectl set-hostname fastapi

# PIP3 Install
sudo yum install -y python3-pip

# Package Install
pip3 install fastapi
pip3 install "uvicorn[standard]"

# Python Code
cat << EOT > /home/ec2-user/response.py
from fastapi import FastAPI, Request
from fastapi.logger import logger
from fastapi.responses import JSONResponse
import time
import json
from MakeTextFile import MakeTextFile

app = FastAPI()
text_file = MakeTextFile("logs/res_server.txt")
transSeq_log_file = MakeTextFile("logs/res_server_Seq.txt")

@app.post("/")
# async def root(info : Request):
async def root(info : Request):
    #time.sleep(2)
    print("Response Server : 8000")

    req_info = await info.json()

    # REQ_INFO Print
    json_string = json.dumps(req_info, indent=2)

    print(json_string)

    # Logging
    text_file.writeSaveln(json_string)
    #time.sleep(req_info['time_out'])
    transSeq_log_file.writeSaveln(req_info['data']['transSeq'])

    return {
        "status_code" : req_info['data']['http_status'],
        "content" : req_info
    }
EOT


# Make Log File Code
cat << EOT > /home/ec2-user/MakeTextFile.py
#!/usr/bin/python
# -*- coding:utf-8 -*-
import os

class MakeTextFile:
        fileName=""

        def __init__(self,fileName):
                directory_path = os.path.dirname(fileName)
                os.makedirs(directory_path, exist_ok=True)
                self.fileName = fileName

        def writeSave(self, str):
                if os.path.isfile(self.fileName):
                        f = open(self.fileName, 'a')
                else:
                        f = open(self.fileName, 'w')
                f.write(str)
                f.close()

        def writeSaveln(self, str):
                if os.path.isfile(self.fileName):
                        f = open(self.fileName, 'a')
                else:
                        f = open(self.fileName, 'w')
                f.write(str)
                f.write("\n")
                f.close()

        def writeFile(self, file_name):
                if os.path.isfile(self.fileName):
                        f = open(self.fileName, 'a')
                else:
                        f = open(self.fileName, 'w')
                f.write(str)
                f.write("\n")
                f.close()

        def closeFile(self):
                pass

        def printMessage(self, message):
                print(message)

if __name__ == "__main__":
        textFile = makeTextOneOpenFile("cli_test.txt")
        textFile.writeSave("hello")
EOT

# Response Server
cat << EOT > /home/ec2-user/response_server.sh
#!/bin/bash

uvicorn response:app --reload --host=0.0.0.0 --port 8000
EOT

chown ec2-user:ec2-user /home/ec2-user/*

 

5. Lambda 생성

 

Lambda 는 SQS 에서 데이터를 꺼내어 EC2 에 전송해주는 역할을 합니다.
해당 Lambda 를 Python 을 사용하여 간편하게 만들어 보겠습니다.

 

 

5.1. Lambda IAM Role 생성

 

Lambda 는 SQS 에 대한 접근 권한이 필요합니다.

다음 CLI 명령어를 통해 IAM Policy 와 Role 을 간편하게 만들 수 있습니다.

 

export AWS_PAGER=""
ACCOUNT_ID=$(aws sts get-caller-identity | jq .Account -r)
REGION=ap-northeast-2


# API Gateway Policy 생성
cat << EOT > lmd-sqs-policy.json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sqs:ReceiveMessage",
                "sqs:DeleteMessage",
                "sqs:GetQueueAttributes",
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents",
                "sqs:SendMessage*"
            ],
            "Resource": "*"
        }
    ]
}
EOT

# IAM Policy 생성
aws iam create-policy \
    --policy-name lmd-sqs-policy \
    --policy-document file://lmd-sqs-policy.json

# IAM Role 생성
aws iam create-role \
  --role-name lmd-sqs-role \
  --assume-role-policy-document '{
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Principal": {
          "Service": "lambda.amazonaws.com"
        },
        "Action": "sts:AssumeRole"
      }
    ]
  }'

# Attach Policy
aws iam attach-role-policy \
  --role-name lmd-sqs-role \
  --policy-arn arn:aws:iam::${ACCOUNT_ID}:policy/lmd-sqs-policy

 

5.2. Lambda 생성

 

실제 Python 을 통해서 Lambda 를 생성해봅니다.
Lambda 에 대한 역할은 앞서 생성한 IAM Role 으로 지정해줍니다.

 

아래 코드를 붙여넣어줍니다.
SQS 로부터 전달받은 Data 를 Response 서버에 전송 후, 실패 처리 시 DLQ 로 보내는 로직입니다.

해당 코드는 requests 모듈이 필요합니다.
Lambda Layer 에 requests 모듈을 사전에 추가해야 합니다.

 

import json
import os
import requests
import boto3

DEAD_LETTER_SQS_QUEUE_URL = os.environ['DEAD_LETTER_SQS_QUEUE_URL']
RESPONSE_SERVER_IP = os.environ['RESPONSE_SERVER_IP']

def lambda_handler(event, context):
    sqs = boto3.client('sqs')
    sqs_batch_response = {}
    batch_item_failures = []

    for record in event['Records']:
        try:
            print("Record : ", record)

            # Message Body
            body = record["body"]
            print("MessageBody : ", body)

            # Request
            post_response = requests.post(
                f"http://{RESPONSE_SERVER_IP}:8000",
                headers={"Content-Type": "application/json"},
                data=body
            )

            # Byte Type to String
            json_str = post_response.content.decode('utf-8')

            # String Type to JSON
            json_data = json.loads(json_str)
            print("Response : ", json.dumps(json_data))
            # print(json_data["status_code"])
            # print(type(json_data["status_code"]))

            if json_data["status_code"] == 200:
                print("Success Response")

            else:
                print("DLQ URL", DEAD_LETTER_SQS_QUEUE_URL)
                sqs_response = sqs.send_message(
                    QueueUrl=DEAD_LETTER_SQS_QUEUE_URL,
                    MessageBody=json.dumps(json_data["content"]),
                    MessageGroupId=json_data["content"]["data"]["MessageGroupID"]
                )
                print("DLQ Response", sqs_response)

        except Exception as e:
            print("Error : ", e)

            batch_item_failures.append({"itemIdentifier": record['messageId']})

    sqs_batch_response["batchItemFailures"] = batch_item_failures

    print("batch_item_failures : ", batch_item_failures)
    print("sqs_batch_response : ", sqs_batch_response)

    return sqs_batch_response  # Echo back the first key value

 

Lambda 환경변수에 DLQ URL 과 응답 서버 IP 를 입력합니다.

 

5.3. SQS Trigger 연동

 

다음으로는 SQS 에 데이터가 적재되면 Lambda 가 동작할 수 있게 만드는 것입니다.
람다 화면에서 트리거 추가를 선택합니다.

 

 

SQS 트리거 추가

 

 

SQS 에서도 마찬가지로 함수 트리거가 설정되어 있는 지 확인합니다.

 

 

6. API Call 실행

 

드디어 모든 인프라 구성이 완료되었습니다.

실제로 API Gateway Call 을 통해 EC2 응답서버까지 데이터가 전송되는 지 확인할 차례입니다.

 

 

6.1. EC2 fastAPI 실행

 

 

앞서 생성한 EC2 에 접속하여 response_server.sh 를 실행해줍니다.

sh response_server.sh

 

 

6.2. API Call 실행

 

앞에서 API Gateway 를 테스트할 때 사용했던 curl 명령어를 활용하여 연속된 API Call 을 실행해보겠습니다.
다음과 같은 스크립트 파일을 작성 후, 실행해봅니다.

API URLAPI Stage 만 변경하여 사용하면 됩니다.

 

cat << EOT > api-call-test.sh
#!/bin/bash

MessageGroupId=10
transSeq=1
time_out=0.5
http_status=200

SITE_DOMAIN=3yjcxrq2x5.execute-api.ap-northeast-2.amazonaws.com

# transSeq 가 5보다 작을 때까지 반복
while [ \$transSeq -lt 5 ]; do
  date_time=\$(date '+%Y-%m-%d %H:%M:%S-%3N')

  echo transSeq : \${transSeq}

  echo "curl -X POST http://\${SITE_DOMAIN}/kimalarm/queue-1"
  echo "  --header 'Content-Type: application/json'"
  echo "  -d \"{\"MessageGroupID\": \"\${MessageGroupId}\", \"transSeq\":\"\${transSeq}\", \"time_out\":\${time_out}, \"http_status\":\${http_status}, \"createdAt\":\"\${date_time}\"}"

  echo "==================================================="

    curl -X POST https://\${SITE_DOMAIN}/kimalarm/queue-1 \
    --header 'Content-Type: application/json' \
    -d "{\"MessageGroupID\": \"\${MessageGroupId}\", \"transSeq\":\"\${transSeq}\", \"time_out\":\${time_out}, \"http_status\":\${http_status}, \"createdAt\":\"\${date_time}\"}"
  echo "==================================================="

  let transSeq=transSeq+1
done
EOT

 

스크립트는 아래와 같은 화면으로 출력되어야 합니다.
오류 발생 시 차이점을 찾아내면 됩니다.

 

해당 스크립트를 실행하면 다음과 같이 SQS 가 작동하여 EC2 서버에서 응답이 오는 것을 알 수 있습니다.

 

 

 

다음 포스팅에서는 사용하지 않았던 API GatewayX-API-Key 을 통해 권한 있는 사용자만 접근할 수 있도록 설정하고,

Route53 을 통해 도메인을 매핑시켜주는 작업을 해보겠습니다.

 

또한, 이번 포스팅에서는 해당 아키텍처를 콘솔에서 직접 생성했지만,

실제로는 많은 리소스를 직접 하나씩 만드는 것은 굉장히 힘든 일입니다.

 

AWS Serverless 아키텍처를 구현할 때 사용할 수 있는 AWS SAM 으로 현재 만든 리소스를 쉽게 배포하는 방법을 알아보겠습니다.

 

'AWS > Serverless' 카테고리의 다른 글

Jenkins 를 이용하여 AWS SAM 교차 계정 배포하기  (0) 2023.01.04
Contents

포스팅 주소를 복사했습니다