안녕세계
[Redis] 지연 큐(Delay Queue)란? 본문
[Redis] 지연 큐(Delay Queue)란?
Junhong Kim 2025. 7. 19. 15:08커머스 서비스를 이용하다보면 "주문 후 10분 이내 미결제시 자동 취소됩니다." 와 같은 안내를 볼 수 있습니다. 이처럼 이미 발생한 작업을 후 일정 시간 뒤에 후 처리하는 방법으로 지연 큐(DelayQueue)를 활용할 수 있으며, 대표적인 구현 방식으로 Java 표준 DelayQueue, Amazon SQS DelayQueue, Redis 기반 DelayQueue가 있습니다. 이 글에서는 각 구현 방식에 대해 간략히 살펴보고, 이 중 Redis 기반 DelayQueue의 구체적인 사용 예시를 공유하겠습니다.
Java 표준 Delay Queue
Java 표준 java.util.concurrent.DelayQueue는 JDK에 내장된 블로킹 지연 큐입니다. DelayQueue는 Delayed 인터페이스를 구현한 객체만 저장할 수 있습니다. 내부적으로 PriorityQueue를 사용해 남은 지연 시간이 짧은 순서대로 요소를 관리하며, 지연 시간이 만료된 항목만 take()로 꺼낼 수 있습니다. 외부 라이브러리 없이 바로 사용할 수 있다는 장점이 있지만, 단일 JVM 안에서만 동작하기 때문에 분산 환경에서는 적용하기 어렵다는 한계가 있습니다.
블로킹 큐 (BlockingQueue)
- 항목이 큐에 들어오는 즉시 소비자(take())가 꺼낼 수 있습니다. 큐가 비어 있으면 항목이 들어올 때까지 소비자는 대기합니다.
블로킹 지연 큐 (DelayQueue)
- 항목이 큐에 들어왔더라도, 각 항목이 지연 시간(getDelay())만큼 지나야만 소비자에게 노출됩니다. 지연 시간이 남아 있으면 소비자는 대기하다가, 그 항목의 지연이 만료된 시점에만 꺼낼 수 있습니다.
// 1) Delayed 인터페이스 구현
class OrderDelay(
private val orderId: String,
delay: Long,
unit: TimeUnit,
) : Delayed {
private val expireTime = System.currentTimeMillis() + unit.toMillis(delay)
// 남은 지연 시간
override fun getDelay(unit: TimeUnit): Long =
unit.convert(expireTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS)
// 지연 순서
override fun compareTo(other: Delayed): Int =
getDelay(TimeUnit.MILLISECONDS)
.compareTo(other.getDelay(TimeUnit.MILLISECONDS))
fun getOrderId() = orderId
}
fun main() {
// 2) DelayQueue 생성, 10분 뒤 취소되는 작업 등록
val queue = DelayQueue<OrderDelay>()
queue.offer(OrderDelay("order-123", 10, TimeUnit.MINUTES))
// 3) 만료된 작업을 꺼내서 처리
thread(start = true) {
while (true) {
val task = queue.take()
cancelOrder(task.getOrderId())
}
}
}
AWS SQS DelayQueue
Amazon SQS의 DelayQueue는 메시지를 전송한 시점부터 최대 900초(15분) 동안 소비자에게 숨겨두고, 지정된 시간이 지난 후에만 수신할 수 있도록 하는 기능입니다. 큐 자체에 기본 지연 시간을 설정할 수도 있고, 표준 큐에서는 메시지별로 개별 지연 시간을 지정할 수도 있습니다. 이렇게 하면 별도의 스케줄러 없이도 간단히 지연 메시지 처리가 가능하며, AWS 관리형 서비스의 안정성과 확장성을 그대로 누릴 수 있습니다.
다만 장기(15분 초과) 지연이 필요할 경우 EventBridge Scheduler 같은 별도 서비스 연동이 필요하고, FIFO 큐는 큐 생성 시 설정한 DelaySeconds만 적용되어 메시지별 지연 지정이 불가능합니다. 또한 표준 큐는 메시지 순서를 보장하지 않으므로, 지연된 메시지가 처리 순서와 다르게 소비될 수 있다는 점을 유의해야 합니다.
DelaySeconds는 Visibility Timeout 차이
- DelaySeconds는 메시지가 최초 수신 가능 시점을 미루는 것
- Visibility Timeout은 이미 수신된 메시지를 다시 노출하지 않도록 숨기는 시간
fun main() {
// 1) SQS 클라이언트 생성
val sqsClient = SqsClient.builder()
.region(Region.AP_NORTHEAST_2)
.build()
// 2) DelaySeconds 기본 지연(10분) 설정으로 큐 생성
val createResponse = sqsClient.createQueue(
CreateQueueRequest.builder()
.queueName("delayed-orders")
.attributes(mapOf(QueueAttributeName.DELAY_SECONDS to "600"))
.build()
)
val queueUrl = createResponse.queueUrl()
// 3) 개별 메시지에 delaySeconds=300(5분) 지정하여 전송
val sendResponse = sqsClient.sendMessage(
SendMessageRequest.builder()
.queueUrl(queueUrl)
.messageBody("주문ID: 12345")
.delaySeconds(300)
.build()
)
println("지연 메시지 전송 완료, MessageId=${sendResponse.messageId()}")
// 4) 지연 시간이 지난 후 메시지 수신
val receiveRequest = ReceiveMessageRequest.builder()
.queueUrl(queueUrl)
.maxNumberOfMessages(5)
.waitTimeSeconds(20)
.build()
val messages = sqsClient.receiveMessage(receiveRequest).messages()
for (msg in messages) {
println("수신된 메시지: ${msg.body()}")
// 메시지 처리 후 삭제
sqsClient.deleteMessage(
DeleteMessageRequest.builder()
.queueUrl(queueUrl)
.receiptHandle(msg.receiptHandle())
.build()
)
}
sqsClient.close()
}
Redis 기반 DelayQueue
Redisson의 RDelayedQueue는 Redis의 Sorted Set(ZSET) 과 백그라운드 스케줄러를 결합한 분산 지연 큐입니다. 작업을 등록하면 만료 시각을 score로 하는 ZSET에 저장되고, 별도의 스레드로 동작하는 스케줄러가 주기적으로 이 ZSET을 확인해 만료된 항목만 골라내어 실제 처리 큐인 RBlockingQueue로 이동시킵니다. 덕분에 여러 서버 인스턴스가 같은 Redis를 공유해도, 지정된 시간이 지난 작업은 중복 없이 자동으로 꺼내져 안정적으로 처리됩니다.
RBlockingQueue?
- RBlockingQueue는 Redisson이 제공하는 Redis 기반의 분산 블로킹 큐로, 내부적으로 Redis의 리스트(List) 자료구조와 BLPOP/BRPOP 블로킹 명령어를 활용해 구현되어 있습니다. 생산자가 RPUSH로 리스트에 요소를 추가하면, 소비자는 BLPOP 또는 BRPOP 명령으로 해당 리스트를 감시하다가 요소가 들어올 때까지 대기했다가 꺼내서 처리합니다. 이 과정은 Redis 서버 차원에서 요청을 순차적으로 처리하기 때문에, 여러 애플리케이션 인스턴스가 동일한 Redis를 공유하더라도 메시지 중복 소비 없이 안전하게 분산 처리가 가능합니다.
// 1) RedissonClient 설정
@Configuration
class RedisConfig {
@Bean
fun redissonClient(): RedissonClient {
val config = Config().apply {
useSingleServer()
.setAddress("redis://127.0.0.1:6379")
.setPassword("password")
}
return Redisson.create(config)
}
}
---
// 2) 지연 큐 서비스
@Service
class OrderDelayService(
private val redissonClient: RedissonClient
) {
// 실제 처리할 작업 큐
private val processingQueue: RBlockingQueue<OrderCancelTask> =
redissonClient.getBlockingQueue("order:cancel:processing")
// 지연 작업을 임시 보관할 큐 (인자로 processingQueue 받음!!)
private val delayedQueue: RDelayedQueue<OrderCancelTask> =
redissonClient.getDelayedQueue(processingQueue)
/**
* 주문 취소 작업을 delayMinutes 분 뒤에 실행하도록 예약
*/
fun scheduleCancel(orderId: String, delayMinutes: Long) {
delayedQueue.offer(OrderCancelTask(orderId), delayMinutes, TimeUnit.MINUTES)
}
/**
* 예약된 주문 취소 작업을 취소
*/
fun cancelScheduled(orderId: String): Boolean =
delayedQueue.remove(OrderCancelTask(orderId))
@PostConstruct
fun startConsumer() {
Executors.newSingleThreadExecutor().submit {
while (!Thread.currentThread().isInterrupted) {
// 지연 큐에서 만료된 작업이 processingQueue로 이동될 때마다 cancelOrder() 호출
val task = processingQueue.take()
cancelOrder(task.orderId)
}
}
}
private fun cancelOrder(orderId: String) {
// TODO: 실제 주문 취소 로직
println("주문 자동 취소 실행: $orderId")
}
@PreDestroy
fun cleanup() {
redissonClient.shutdown()
}
}
data class OrderCancelTask(val orderId: String)
---
// 3) 주문 서비스
@Service
class OrderService(
private val orderRepo: OrderRepository,
private val orderDelayService: OrderDelayService
) {
// 주문 완료시 호출
fun createOrder(request: CreateOrderRequest) {
val order = orderRepo.save(request.toEntity())
// 결제 미완료 시 30분 뒤 자동 취소 예약
orderDelayService.scheduleCancel(order.id.toString(), 30)
}
// 결제 완료시 호출
fun paymentCompleted(orderId: String) {
// 결제 완료 시 예약된 취소 작업 취소
orderDelayService.cancelScheduled(orderId)
}
}
RedisConfig에서 RedissonClient를 스프링 빈으로 등록한 뒤, OrderDelayService에서 RDelayedQueue와 RBlockingQueue를 생성하여 지연 작업을 등록/처리하도록 구성했습니다. 그 위에 OrderService에서는 주문이 생성될 때 scheduleCancel()을 호출해 30분 뒤 자동 취소를 예약하고, 결제 완료 시에는 cancelScheduled()를 이용해 해당 예약을 취소하도록 구현합니다.
마무리
세 가지 지연 큐 방식은 장단점을 가지고 있으므로, 서비스 요구사항과 운영 환경에 맞춰 선택하는 것이 중요합니다. 단일 JVM 환경에서 가볍고 빠른 구현이 필요하다면 Java 표준 DelayQueue가 적합하며, AWS 관리형 서비스의 안정성과 확장성을 원한다면 최대 15분 지연까지 지원하는 Amazon SQS DelayQueue를 고려할 수 있습니다. 다수 인스턴스 간 분산 처리와 정밀한 지연 제어가 필요하다면 Redis 기반 RDelayedQueue가 좋은 선택이 될 것으로 생각듭니다.
'Database > Redis' 카테고리의 다른 글
| [Redis] 분산 락은 어떻게 걸어야할까? (0) | 2024.02.18 |
|---|---|
| [Redis] 분산 락과 상태 키를 사용한 동시성 이슈 제어 (0) | 2024.02.04 |
| [Redis] 분산 락 (feat. Redisson) (0) | 2024.01.21 |