1. 컨슈머 주요 옵션
bootstrap.servers
카프카 클러스터에 처음 연결하기 위한 호스트 정보, 포트 정보로 구성된 리스트 정보를 나타낸다. {호스트:포트},{호스트:포트} 처럼 콤마(,)로 이어진 문자열을 적어주면 된다.
프로듀서 옵션과 마찬가지로 모든 브로커 주소를 적는 것을 권장한다.
fetch.min.bytes
한 번에 가져오는 최소 데이터 사이즈를 의미한다. 이보다 데이터 사이즈가 작을 경우에는 데이터가 누적될 때까지 기다린다.
group.id
컨슈머 그룹을 식별하는 식별자다.
enable.auto.commit
이 옵션이 켜져있을 때 주기적으로 백그라운드로 오프셋을 커밋한다.
auto.offset.reset
카프카에서 초기 오프셋, 현재 오프셋 정보가 없을 때 다음 옵션으로 리셋한다.
- earliest: 가장 초기의 오프셋값으로 설정
- latest: 가장 마지막의 오프셋값으로 설정
- none: 이전 오프셋을 찾지 못하면 에러 발생
fetch.max.bytes
한 번에 가져올 수 있는 최대 데이터 사이즈를 의미한다.
request.timeout.ms
요청에 대해 응답을 기다리는 최대 시간.
session.timeout.ms
컨슈머와 브로커 사이의 세션 타임 아웃 시간.
컨슈머가 그룹 코디네이터에게 하트비트를 보내지 않고 이 시간이 지나면 해당 컨슈머는 문제가 있다고 생각하고 컨슈머 그룹은 리밸런스를 시도한다.
너무 짧으면 의도치 않은 리밸런싱이 일어날 수 있고, 너무 길면 컨슈머가 문제있다고 발견하는 시간이 늦어진다.
보통 heartbeat.interval.ms 값과 같이 수정된다.
heartbeat.interval.ms
그룹 코디네이터에게 컨슈머가 KafkaConsumer.poll() 메소드로 얼마나 자주 하트비트를 보낼 것인 지 설정한다.
heartbeat.interval.ms 는 session.timeout.ms 보다 짧아야 한다. 그렇지 않으면 컨슈머가 살아있어도 의도치 않은 리밸런싱이 일어날 수 있다.
보통 1/3 수준으로 설정한다.
max.poll.records
한 번 컨슘에 대한 최대 레코드 갯수를 설정한다.
max.poll.interval.ms
컨슈머가 하트비트만 보내고 메시지를 가져가지 않는 경우가 있을 수 있다.
그래서 설정된 주기 내에 poll() 메소드를 호출하지 않으면 컨슈머에 문제가 있다고 판단하고, 컨슈머 그룹에서 제외 및 다른 컨슈머를 파티션에 할당한다.
auto.commit.interval.ms
주기적으로 최신 오프셋을 커밋하는 시간.
fetch.max.wait.ms
fetch.ms.bytes 에 의해 설정된 데이터보다 사이즈가 작은 경우 요청에 응답을 기다리는 최대 시간.
2. 파티션과 메시지 순서
결론부터 말하자면, 파티션 내에서는 메시지 순서가 보장이 된다. 반면, 서로 다른 파티션끼리는 메시지 순서가 보장이 되지 않는다. 다음 2가지 상황을 살펴보자.
2-1. 파티션이 여러 개일 때
프로듀서가 여러 개의 파티션에 여러 메시지를 프로듀스할 때, 파티션에는 아래와 같이 메시지가 담긴다.
컨슈머는 오프셋 기반으로 파티션에서 메시지를 컨슘한다. 그래서 C는 3보다 먼저 컨슘되고, A는 D보다 먼저 컨슘되지만, A 와 C 는 어떤 것이 먼저 컨슘될 지 보장하지 못 한다.
2-2. 파티션이 한 개일 때
프로듀서가 한 개의 파티션에 여러 메시지를 프로듀스할 때, 파티션에는 아래와 같이 메시지가 담긴다.
파티션이 한 개일 때에는 프로듀서가 메시지 보낸 순서대로 저장이 된다. 그렇기 때문에 컨슈머도 먼저 저장된 순서대로 컨슘한다.
따라서, 메시지 순서가 보장이 되어야 하는 경우에는 파티션 한 개로 처리해야 한다. 하지만, 파티션이 한 개면 처리량이 여러 개보다 낮아지기 때문에 메시지 순서가 중요하지 않다면 파티션을 여러 개 사용하여 처리량을 높일 수 있다.
3. 컨슈머 그룹
컨슈머 그룹은 메시지 갯수가 증가하여 지금보다 더 높은 처리량이 필요할 때 쉽게 확장하기 위해 만든 개념이다. 컨슈머 그룹에 컨슈머를 추가함으로써 안정적으로 처리량을 늘릴 수 있다.
뿐만 아니라, 컨슈머 그룹을 잘 이용하면 컨슈머 장애가 발생해도 빠르게 대응할 수 있다.
기존에 다음과 같이 하나의 토픽(여러 개의 파티션)에 하나의 컨슈머가 메시지를 컨슘하고 있다고 하자.
위와 같은 상황에서 파티션에 갑자기 메시지가 많이 전송됐다고 하자. 컨슈머가 처리하는 속도보다 메시지가 쌓이는 속도가 더 빠르면 처리되지 않거나 늦게 처리되어 문제가 발생할 수 있다. 컨슈머를 추가하면 이 문제를 해결할 수 있다.
컨슈머를 파티션 갯수만큼 충분히 늘려줬다.
동일한 컨슈머 그룹(같은 컨슈머 그룹 아이디를 가짐)에 컨슈머를 추가하면 파티션1, 파티션2 의 소유권이 컨슈머1, 컨슈머2 로 이동하게 되어 메시지를 컨슘하게 된다.
이렇게 파티션에 대한 컨슈머의 소유권이 바뀌는 상황을 **리밸런스**라고 한다.
컨슈머 리밸런스가 일어나면 파티션 하나마다 컨슈머 하나가 연결된다. 그리고 리밸런스가 끝나면 컨슈머들은 각자 담당하고 있는 파티션에서 메시지를 컨슘하기 시작한다.
다만, 컨슈머 리밸런스가 일어나는 과정에서는 컨슈머 그룹 전체가 일시적으로 사용할 수 없다.
이렇게 컨슈머 그룹의 리밸런스를 통해 컨슈머를 안전하게 추가 및 삭제할 수 있어 높은 가용성과 확장성을 확보할 수 있다. 그래서 결과적으로 더 높은 처리량을 제공할 수 있다.
하지만, 컨슈머만 계속 늘린다고 무조건 처리량이 증가하지 않는다. 위 상태에서 컨슈머를 하나 더 늘려보자.
이처럼 컨슈머를 하나 늘려도 컨슈머3은 연결될 파티션이 없기 때문에 계속해서 메시지를 컨슘하지 않고 노는 상태가 된다. 왜냐하면 하나의 파티션에는 하나의 컨슈머만 연결할 수 있기 때문이다.
만약 하나의 파티션에 여러 개의 컨슈머가 연결된다면 offset 기준으로 컨슈머는 메시지를 컨슘하기 때문에 순서를 안정적으로 보장할 수 없을 뿐더러 메시지가 소실될 가능성도 있다.
그래서 파티션 수만큼 컨슈머를 늘렸는 데도 메시지 쌓이는 속도를 따라잡지 못 한다면 컨슈머만큼 파티션도 늘려야 한다.
이번에는 잘 동작하던 컨슈머 그룹 내에서 컨슈머 하나가 다운되는 경우에 대해 살펴보자.
컨슈머 그룹 내에서 컨슈머가 살아있다라고 판단하는 방법은 주기적으로 하트비트를 보내는 것이다. 하트비트는 컨슈머가 poll 할 때와 메시지 컨슘 후 오프셋을 커밋할 때 보내게 된다.
만약, 일정 시간동안 하트비트를 보내지 않으면 컨슈머 그룹은 컨슈머가 살아있지 않다고 판단하여 리밸런스를 시작하게 된다.
컨슈머 3이 다운되면서 컨슈머3이 담당하던 파티션3은 컨슈머2가 처리하게 된다. 그래서 컨슈머2는 두 개의 파티션에서 메시지를 처리한다.
컨슈머 한 개가 두 개의 파티션을 처리하다보니 메시지가 다른 파티션에 비해 느리게 처리될 수는 있지만 전체적으로 안정적으로 동작한다.
4. 커밋과 오프셋
컨슈머 그룹에서 각각의 컨슈머들은 자신에게 할당된 파티션에서 어디까지 가져갔는 지, 위치 정보(오프셋)를 기록하고 있다.
카프카는 각 컨슈머 그룹의 파티션별로 오프셋 정보를 토픽(__consumer_offsets) 에 저장하고 있다.
(예전 버전 카프카는 주키퍼에 저장했었다.)
만약에 커밋된 오프셋이 실제 마지막으로 처리한 오프셋보다 작으면 그 오프셋 사이에 위치한 메시지는 중복으로 처리될 것이고, 커밋된 오프셋이 실제 마지막으로 처리한 오프셋보다 크면 그 오프셋 사이에 위치한 메시지는 컨슈머가 처리하지 못해서 소실될 것이다.
이렇듯 카프카에서는 커밋이 중요하다.
4-1. 자동 커밋
사용자가 직접 오프셋을 관리하지 않도록 자동 커밋하는 방식이 있다.
컨슈머 옵션 enable.auto.commit=true 로 설정하면 5초마다 컨슈머는 poll() 을 호출할 때 가장 마지막 오프셋을 커밋한다.
5초는 기본값이며 auto.commit.interval.ms 설정으로 조정이 가능하다.
하지만, 메시지를 auto.commit.interval.ms 설정값(기본값: 5초) 보다 더 많은 시간이 소요되다가 오류가 나면 메시지가 처리되지 않았는 데, 자동 커밋되어 메시지가 손실이 날 수 있다.
4-2. 수동 커밋
그래서 메시지 손실 확률을 줄이기 위해 메시지가 완전히 처리되었을 때에만 수동으로 사용자가 직접 수동으로 커밋해주는 방식이 있다.
컨슈머 옵션 enable.auto.commit=false 로 설정하고, 비지니스 로직이 끝난 후에 수동 커밋 함수를 호출한다.
(ack.acknowledge() 혹은 commitSync())
ref
- 카프카, 데이터 플랫폼의 최강자 / 고승범 저 공용준 공저
- https://camel-context.tistory.com/54
'카프카' 카테고리의 다른 글
카프카 프로듀서 (0) | 2025.03.20 |
---|---|
카프카 디자인 (0) | 2025.02.11 |
카프카란 무엇인가? (0) | 2024.12.27 |