본문 바로가기

Server/Kafka

[Kafka] Producer acks, Consumer AckMode 그리고 Listener

반응형

Kafka Producer는 메시지가 기록될 Partition의 Leader Broker를 찾아서 해당 Leader에게 직접 메시지를 전송합니다. Follower Broker는 Producer로부터 메시지를 직접 받지 않습니다. Follower는 Leader Broker로부터 데이터를 복제합니다.

 

이 구조를 이해하면 acks, ISR(In-Sync Replica), min.insync.replicas, enable.idempotence, max.in.flight.requests.per.connection 같은 설정이 왜 필요한지 더 쉽게 이해할 수 있습니다. 이 설정들은 단순히 전송 속도만 정하는 옵션이 아니라, 장애 상황에서 메시지를 어디까지 안전하게 저장할 것인지, 중복 기록을 어떻게 막을 것인지, 파티션 내 순서를 어떻게 유지할 것인지와 연결됩니다.

Producer는 Leader Broker에게 메시지를 보낸다

Producer는 처음부터 모든 Partition의 Leader 정보를 알고 있지 않습니다. 먼저 bootstrap.servers에 지정된 Broker 중 하나에 연결해서 클러스터 메타데이터를 받아옵니다. 이 메타데이터에는 Topic, Partition, 각 Partition의 Leader Broker 정보 등이 포함됩니다.

 

Producer는 이 정보를 바탕으로 실제 메시지가 들어갈 Partition의 Leader Broker에게 연결을 맺고 메시지를 전송합니다. 예를 들어 특정 Record가 order-events Topic의 partition 3으로 라우팅된다면, Producer는 partition 3의 Leader Broker에게 메시지를 보냅니다.

Producer
   ↓
Partition Leader Broker
   ↓
Follower Broker들이 Leader로부터 복제

 

만약 장애나 리더 선출로 인해 Partition의 Leader Broker가 바뀌면 Producer는 일시적으로 NotLeaderForPartition 계열의 오류를 만날 수 있습니다. 이후 메타데이터를 갱신하고, 새 Leader Broker를 찾아 재시도하는 흐름으로 동작합니다.

acks(acknowledgments, 확인 응답)

acks는 Producer가 메시지 전송을 성공으로 판단하기 위해 Kafka Broker로부터 어떤 수준의 저장 확인 응답을 받을지 정하는 설정입니다. Producer가 메시지를 보내면 partition leader broker가 이를 받아 log에 기록하고 Producer에게 응답을 반환합니다. 이때 acks 값에 따라 Producer가 응답을 기다리는 기준이 달라집니다.

여기서 주의할 점은 Producer의 acks 와 Consumer의 ack는 전혀 다른 개념이라는 점입니다. Producer ack는 Kafka가 Producer에게 “메시지가 저장됐다”고 응답하는 것이고, Consumer ack는 Consumer가 “처리를 완료했으니 offset을 commit해도 된다”고 알리는 것입니다.

acks=0

acks=0은 Producer가 Broker의 응답을 기다리지 않는 방식입니다.

 

가장 빠르지만 Broker가 메시지를 실제로 받았는지 Producer가 확인하지 않습니다. 따라서 네트워크 장애, Broker 장애, Leader 변경 같은 상황에서 메시지가 유실되어도 Producer가 이를 감지하기 어렵습니다. 처리량은 높일 수 있지만, 안정성이 중요한 메시지에는 적합하지 않습니다.

acks=1

acks=1은 Leader Broker가 메시지를 자신의 로컬 로그에 기록하면 성공으로 보는 방식입니다.
(Follower Replica의 복제 완료까지 기다리지는 않습니다.)

 

이 설정은 성능과 안정성 사이의 절충안입니다. 다만 Leader가 메시지를 로컬 로그에 기록하고 Producer에게 성공 응답을 보낸 직후, Follower들이 아직 복제하지 못한 상태에서 Leader가 장애로 내려가면 문제가 생길 수 있습니다. 이때 해당 메시지를 가지고 있지 않은 Follower가 새 Leader로 선출되면, Producer 입장에서는 이미 성공 응답을 받은 메시지라도 Kafka 로그에서는 유실될 수 있습니다.

1. Producer → Leader에게 메시지 전송
2. Leader 로컬 로그 기록
3. Leader → Producer에게 성공 응답
4. Follower 복제 전 Leader 장애
5. 메시지를 가지지 않은 Follower가 새 Leader로 선출
6. 메시지 유실 가능

acks=all (또는 -1)

acks=all은 Leader Broker뿐 아니라 현재 ISR에 포함된 Replica들이 메시지를 기록해야 성공으로 보는 방식입니다.

 

이 설정은 가장 강한 내구성을 제공합니다. 대신 Leader가 ISR Replica들의 응답을 기다려야 하므로 acks=0이나 acks=1보다 지연이 늘어날 수 있습니다. 다만 acks=all만 단독으로 보면 부족합니다. 실제 운영에서는 min.insync.replicas와 함께 이해해야 합니다.

ISR은 Leader와 동기화된 Replica 집합이다

ISR은 In-Sync Replicas의 약자로, 특정 Partition의 Leader replica와 동기화 상태를 유지하고 있는 replica들의 집합입니다.

ISR에는 Leader replica 자신이 포함되고, Leader를 일정 기준 내에서 따라오고 있는 Follower replica들도 포함됩니다.

여기서 replication.factor의 개수를 헷갈리기 쉽습니다. replication.factor=3은 Leader를 제외한 Follower가 3개라는 뜻이 아닙니다. Leader replica를 포함해 총 3개의 replica를 가진다는 의미입니다.

replication.factor=3

Broker 1: Leader replica
Broker 2: Follower replica
Broker 3: Follower replica

 

즉, replication.factor=3이면 Leader 1개와 Follower 2개로 구성됩니다.

replication.factor=3
= Leader 1개 + Follower 2개
= 총 replica 3개

 

Follower replica가 일정 시간 동안 Leader에게 fetch 요청을 보내지 못하거나, Leader의 Log End Offset까지 충분히 따라오지 못하면 Leader는 해당 Follower를 ISR에서 제거할 수 있습니다.

 

즉, ISR에 포함되어 있다는 것은 해당 replica가 Leader 장애 시 데이터를 잃지 않고 새 Leader 후보가 될 수 있을 만큼 최신 상태를 유지하고 있다는 의미로 볼 수 있습니다. 다만 ISR은 고정된 집합이 아닙니다. Broker 장애, 네트워크 지연, 디스크 지연 등에 따라 계속 바뀔 수 있습니다.

정상 상태

전체 replica = [Leader, Follower 1, Follower 2]
현재 ISR     = [Leader, Follower 1, Follower 2]
Follower 2 지연 상태

전체 replica = [Leader, Follower 1, Follower 2]
현재 ISR     = [Leader, Follower 1]

 

여기서 중요한 점은 Follower 2가 ISR에서 빠졌다고 해서 replica 자체가 사라진 것은 아니라는 점입니다. 다만 현재 Leader와 충분히 동기화되어 있지 않기 때문에 ISR에서 제외된 상태입니다.

min.insync.replicas는 최소 ISR 개수 조건이다

min.insync.replicasacks=all로 메시지를 전송할 때, 쓰기 성공으로 인정하기 위해 필요한 최소 ISR replica 개수를 정하는 설정입니다. 정확히는 “현재 ISR이 최소 몇 개 이상 남아 있어야 쓰기 요청을 받을 것인가”를 제어하는 옵션입니다. 예를 들어 다음과 같이 설정했다고 가정해보겠습니다.

replication.factor=3
min.insync.replicas=2
acks=all

이때 전체 replica는 Leader를 포함해 총 3개입니다. 그리고 min.insync.replicas=2이므로 ISR에는 최소 2개의 replica가 남아 있어야 쓰기 요청을 성공으로 처리할 수 있습니다. 여기서 2개에는 Leader도 포함됩니다.

replication.factor=3
= Leader 1개 + Follower 2개

min.insync.replicas=2
= Leader 포함 최소 2개의 ISR replica 필요

 

현재 ISR이 다음과 같다면 쓰기가 가능합니다.

현재 ISR = [Leader, Follower 1, Follower 2]
ISR 개수 = 3
min.insync.replicas=2 조건 만족
→ 쓰기 가능

 

Follower 하나가 지연되어 ISR에서 빠진 상태라도, Leader와 Follower 하나가 ISR에 남아 있다면 여전히 쓰기가 가능합니다.

현재 ISR = [Leader, Follower 1]
ISR 개수 = 2
min.insync.replicas=2 조건 만족
→ 쓰기 가능

 

하지만 Leader만 ISR에 남은 상태라면 조건을 만족하지 못합니다.

현재 ISR = [Leader]
ISR 개수 = 1
min.insync.replicas=2 조건 미달
→ 쓰기 실패 가능

 

이 경우 Kafka는 쓰기 요청을 성공으로 처리하지 않고, Producer는 NOT_ENOUGH_REPLICAS 또는 NOT_ENOUGH_REPLICAS_AFTER_APPEND 계열의 오류를 받을 수 있습니다.

정리하면 acks=all은 현재 ISR에 속한 replica들의 기록 응답을 기다리는 설정이고, min.insync.replicas는 쓰기를 허용하기 위한 최소 ISR 개수 조건입니다.

acks=all
= 현재 ISR에 속한 replica들이 기록해야 성공

min.insync.replicas
= 쓰기를 허용하기 위해 필요한 최소 ISR 개수

 

운영에서는 보통 다음과 같은 조합을 많이 사용합니다.

replication.factor=3
min.insync.replicas=2
acks=all

 

이 조합은 Broker 하나가 장애 나거나 ISR에서 빠져도 Leader와 다른 Follower 하나가 살아 있으면 쓰기를 계속할 수 있습니다. 반대로 Leader만 남은 위험한 상태에서는 쓰기를 막아 데이터 유실 가능성을 줄일 수 있습니다.

Producer ack와 Consumer ack는 다르다

Kafka Producer의 acks와 Spring Kafka Consumer의 AckMode가 헷갈릴 수 있습니다. 둘은 완전히 다른 개념입니다.

Producer ack는 Kafka가 Producer에게 “메시지가 저장됐다”고 응답하는 것입니다. 즉, 메시지 발행 성공 여부와 관련됩니다.

 

반면 Consumer ack는 Consumer 애플리케이션이 Spring Kafka listener container에게 “처리를 완료했으니 offset을 commit해도 된다”고 알리는 것입니다. 즉, 메시지 소비 위치 저장과 관련됩니다. Offset commit은 Consumer Group이 다음에 읽을 위치를 Kafka의 __consumer_offsets에 저장하는 행위입니다.

 

Producer ack 흐름

Kafka가 Producer에게 메시지 저장 성공 여부를 응답하는 것

Producer
   |
   | send(record)
   v
Kafka Broker
   |
   | 메시지 저장
   | - acks=0   : 응답 기다리지 않음
   | - acks=1   : leader 기록 후 응답
   | - acks=all : ISR 복제까지 확인 후 응답
   v
Producer

Consumer ack 흐름

Consumer가 처리를 완료했으니 offset commit을 진행해도 된다고 알리는 것
(Offset commit: Consumer Group의 다음 읽을 위치를 Kafka에 저장하는 것)

Kafka Broker
   |
   | poll(record)
   v
Consumer
   |
   | 비즈니스 로직 처리 완료
   v
Consumer ack
   |
   | offset commit
   v
Kafka Broker
   |
   | 처리 완료 offset 저장
   v
__consumer_offsets

 

Spring Kafka의 AckMode와 Listener

Spring Kafka를 사용하면 Consumer가 처리한 offset을 언제 commit할지 AckMode로 제어할 수 있습니다.
다만
AckMode는 Kafka Consumer의 자동 커밋이 꺼져 있을 때 의미가 있습니다.

 

enable.auto.commit=true이면 Kafka Consumer 클라이언트가 offset을 자동으로 commit합니다. 이때 commit 주기는 auto.commit.interval.ms 설정을 따르며, 기본값은 5초입니다. 즉, 애플리케이션의 비즈니스 로직이 실제로 성공했는지와 관계없이 Kafka Consumer 클라이언트가 설정된 주기에 따라 offset을 commit합니다.

 

enable.auto.commit=false이면 Kafka Consumer 클라이언트의 자동 commit은 비활성화되고, Spring Kafka listener container가 offset commit을 제어합니다. 이때 commit 시점은 listener container의 AckMode에 따라 결정됩니다. Spring Kafka 2.3부터는 enable.auto.commit을 명시적으로 설정하지 않으면 기본적으로 false로 설정되며, listener container의 기본 AckModeBATCH입니다.

 

AckMode 종류는 다음과 같습니다.

AckMode commit 시점
RECORD record 하나를 listener가 처리하고 반환할 때마다 commit
BATCH (기본 값) poll() 한 번으로 가져온 records가 모두 처리된 뒤 commit
TIME poll 결과가 모두 처리되고, 마지막 commit 이후 ackTime이 지났을 때 commit
COUNT poll 결과가 모두 처리되고, 마지막 commit 이후 처리한 record 수가 ackCount에 도달했을 때 commit
COUNT_TIME ackCount에 도달하거나, 마지막 commit 이후 ackTime이 지났을 때 commit
MANUAL listener에서 acknowledge()를 호출하면 commit 대상이 되고,
실제 commit은 BATCH와 유사한 방식으로 처리
MANUAL_IMMEDIATE listener에서 acknowledge()를 호출하는 즉시 commit 시도

 

여기서 함께 구분해야 할 개념이 listener 형태입니다. Spring Kafka에서 listener 형태와 AckMode는 서로 다른 개념입니다. listener 형태는 @KafkaListener 메서드가 메시지를 어떤 형태로 전달받을지를 결정하고, AckMode는 처리된 offset을 언제 commit할지를 결정합니다.

 

listener는 크게 record listener와 batch listener로 나눌 수 있습니다. record listener는 메시지를 1건씩 전달받는 방식이고, batch listener는 여러 메시지를 List로 한 번에 전달받는 방식입니다.

// record listener
@KafkaListener(topics = ["order-created"])
fun listen(message: String) {
    // 메시지 1건 처리
}
// batch listener
@KafkaListener(topics = ["order-created"])
fun listen(messages: List<String>) {
    // 여러 메시지 처리
}

 

record listener를 사용하더라도 Kafka Consumer는 poll() 한 번에 여러 record를 가져올 수 있습니다. 이때 한 번에 반환할 수 있는 최대 record 수는 max.poll.records로 제어되며, 기본값은 500입니다.

 

Spring Kafka에서 record listener와 batch listener의 차이는 poll() 단위가 아니라 listener 메서드에 전달되는 방식의 차이입니다. record listener를 사용한다고 해서 Kafka Consumer가 broker에서 record를 1건씩 가져오는 것은 아닙니다. Consumer는 poll() 한 번으로 여러 record를 반환받을 수 있고, 그 최대 개수는 max.poll.records로 제한됩니다. 

 

Spring Kafka는 record listener인 경우 이 record들을 listener 메서드에 한 건씩 전달하고, batch listener인 경우 여러 record를 List 또는 ConsumerRecords 형태로 한 번에 전달합니다. 따라서 batch listener가 한 번에 전달받을 수 있는 record 수의 상한은 max.poll.records이며, 실제 전달 개수는 해당 poll()에서 반환된 record 수에 따라 달라집니다.

KafkaConsumer.poll()
→ record 100개 가져옴

record listener
→ listen(record 1)
→ listen(record 2)
→ ...
→ listen(record 100)

batch listener
→ listen(List<record> 100개)

 

Spring Kafka의 기본 AckModeBATCH입니다. 여기서 BATCH는 batch listener만을 의미하지 않습니다. AckMode.BATCH는 Kafka Consumer가 한 번의 poll()로 가져온 record 묶음이 모두 처리된 뒤 offset을 commit한다는 뜻입니다.

 

따라서 record listener를 사용하더라도 AckMode.BATCH에서는 record 1건마다 commit하지 않습니다. poll()로 가져온 record들이 모두 처리된 뒤 offset commit이 수행됩니다.

 

반면 AckMode.RECORD는 record 1건이 처리될 때마다 offset을 commit합니다. 다만 RECORD를 사용해도 Consumer가 record를 1건씩 가져오는 것은 아닙니다. 가져오는 동작은 여전히 poll() 기준이고, commit 시점만 record 단위로 달라집니다.

poll()로 record 100개 가져옴

AckMode.BATCH
→ 100개 모두 처리 후 commit

AckMode.RECORD
→ record 1 처리 후 commit
→ record 2 처리 후 commit
→ ...
→ record 100 처리 후 commit

 

Listener와 AckMode 조합별 동작 방식은 다음과 같습니다.

Listener 형태 AckMode 동작
record listener BATCH record를 1건씩 전달받지만, poll 결과 전체 처리 후 commit
record listener RECORD record를 1건씩 전달받고, 각 record 처리 후 commit
batch listener BATCH record 목록을 한 번에 전달받고, 목록 처리 후 commit
batch listener MANUAL record 목록을 한 번에 전달받고, 직접 acknowledge() 호출
batch listener RECORD ⚠️ 적합하지 않음.
batch listener는 poll 결과 전체를 한 번에 받기 때문에
AckMode.RECORD를 지원하지 않음

 

BATCH는 처리량 측면에서 유리하지만, poll 결과 중 일부만 처리한 상태에서 장애가 발생하면 이미 처리한 메시지도 다시 전달될 수 있습니다. 예를 들어 poll()로 100개의 record를 가져왔고, 앞의 10개를 DB에 반영한 뒤 애플리케이션이 종료되면 아직 poll 결과 전체가 처리되지 않았기 때문에 offset commit은 수행되지 않습니다. Consumer가 재시작되면 마지막으로 commit된 offset부터 다시 읽게 되고, 이미 DB에 반영된 10개 record도 다시 전달될 수 있습니다.

 

RECORD는 record마다 commit하므로 장애 발생 시 재처리 범위를 줄이는 데 도움이 됩니다. 하지만 DB 반영은 성공했고 offset commit 직전에 장애가 발생하면 해당 record는 다시 전달될 수 있습니다. 즉, RECORD를 사용해도 중복 처리가 완전히 사라지는 것은 아닙니다.

 

결국 어떤 AckMode를 사용하더라도 Consumer는 동일한 메시지를 다시 받을 수 있습니다. 따라서 Spring Kafka Consumer는 offset commit 전략과 별개로 멱등하게 설계해야 합니다. DB unique key, 처리 이력 테이블, 상태 기반 업데이트 등을 활용해 동일한 메시지가 여러 번 전달되어도 결과가 깨지지 않도록 구성하는 것이 중요합니다.

결론

정리하면 Kafka Producer는 메시지를 Partition의 Leader Broker에게 전송하고, Follower는 Leader로부터 데이터를 복제합니다. 따라서 메시지 저장 안정성은 Producer가 어떤 수준의 응답을 기다릴 것인지, 그리고 현재 ISR에 충분한 replica가 남아 있는지에 따라 달라집니다.

 

acks=all은 현재 ISR에 포함된 replica들의 기록까지 확인한 뒤 성공으로 응답받는 설정이고, min.insync.replicas는 쓰기를 허용하기 위한 최소 ISR 개수 조건입니다. 운영 환경에서는 보통 replication.factor=3, min.insync.replicas=2, acks=all 조합을 사용해 Leader만 남은 위험한 상태에서는 쓰기를 막고, 메시지 유실 가능성을 줄입니다.

 

또한 Producer의 acks와 Consumer의 ack는 서로 다른 개념입니다. Producer acks는 메시지 발행 성공 여부와 관련되고, Consumer ack는 처리 완료 후 offset commit 시점과 관련됩니다. Spring Kafka에서는 AckMode로 offset commit 시점을 제어할 수 있지만, 어떤 AckMode를 사용하더라도 장애 상황에서는 동일한 메시지가 다시 전달될 수 있습니다.

 

따라서 Kafka 설정만으로 중복 처리를 완전히 막을 수는 없습니다. Producer 쪽에서는 acks=all, min.insync.replicas, enable.idempotence 등을 통해 유실과 중복 기록 가능성을 줄이고, Consumer 쪽에서는 DB unique key, 처리 이력 테이블, 상태 기반 업데이트 같은 애플리케이션 레벨의 멱등성 설계가 함께 필요합니다.

반응형