카프카 조금 아는척 하기

카프카 조금 아는척 하기 강의 자료를 정리합니다.

최범균님의 카프카 조금 아는 척하기 유튜브 내용을 보고 정리합니다.


카프카란?

고성능 분산 이벤트 스트리밍 플랫폼

기본구조

image

카프카 클러스터

메세지를 저장하는 역할을 하는 저장소

  • N개의 브로커를 지니고 있음. 브로커는 각각의 하나씩의 서버라고 생각하면 됩니다.
  • 이곳에서 메세지를 분산처리해서 저장하고, 장애가 난경우 처리하고 한다고 생각하면 됩니다.
  • 데이터를 이동시키는데 핵심적인 역할을 하는 저장소 입니다.

주키퍼 클러스터

카프카 클러스터를 관리하는 역할

  • 카프카 클러스터에 대한 정보를 가지고 있고 관리합니다.

프로듀서

메시지(이벤트) 를 카프카로 전송하는 역할을 합니다.

컨슈머

메시지(이벤트) 를 카프카로부터 읽는 역할을 합니다.


토픽과 파티션

image

토픽

메시지를 구분하는 단위 입니다.

  • 브로커안에 토픽이 담기게 됩니다.
  • 토픽은 파일시스템에서의 폴더라고 생각하면 됩니다.
  • 토픽안에서는 N개의 파티션을 지니고 있습니다.
  • 파티션 은 메시지를 저장하는 물리적인 파일 을 뜻합니다.
  • 프로듀서와 컨슈머 입장에서는 토픽을 기준으로 메시지를 주고받게 되어집니다.

예시

  • 음식 토픽 (음식이라는 디렉토리, 얘 자체는 데이터를 가지고 있지 않음.)
    • 음식 파티션1 (음식에 대한 실제 메시지 정보)
      • 컨슈머1 는 여기서 메시지 정보를 얻어감.
    • 음식 파티션2 (음식에 대한 실제 메시지 정보)
      • 컨슈머2 는 여기서 메시지 정보를 얻어감.

파티션, 오프셋

파티션은 추가만 가능한 파일 (설정으로 인해서 삭제가 되게 할 수 도 있음, ex 일정 시간이 자닌뒤에 삭제를 하게됩니다.)

image

  • 오프셋: 메시지가 저장된 순서를 뜻함.
    • 3번 오프셋부터 데이터를 주세요 하면, 4번~6번의 데이터를 순서대로 읽어가게 됨.
    • 파일 상태이고 추가만 가능한 상태이기 때문에, 한개의 컨슈머에서 읽는 행위가, 다른 컨슈머에서 읽기 결과에 영향을 미치지 않음.

같은 토픽을 지닌 파티션이 N개인 경우

프로듀서와 파티션과의 관계

둘다 동일하게 topic이 1인 경우

image

  • 기본설정으로는 라운드 로빈으로 번갈아면서 저장하게되어 집니다. (각각 저장되므로 순서보장 안됨)
  • key 가 있다면, 해당 특정 파티션으로만 저장하게 되어집니다. (key 가 같다면 순서보장 됨)

파티션과 컨슈머와의 관계

image

  • 컨슈머는 그룹이라는곳에 항상 속해 있습니다.
  • 그룹내에서 컨슈머는 같은 파티션에 연결할 수 없습니다.
    • 컨슈머 A-1이 파티션 0번을 바라보고 있을때, 컨슈머 A-2는 파티션1을 바라볼 수 없습니다. (컨슈머 A-1, A-2 는 같은 그룹(A)이기 때문에 같은 파티션을 바라볼수 없음)
    • 컨슈머 B-1 은 파티션 0을 바라볼 수 있음. (다른 그룹이기 때문)
    • 이로 인해서, 한개의 컨슈머 그룹(예시 A) 에서는 파티션의 메시지가 순서대로 처리가 되는것이 보장되어 집니다. (그룹내에서 여러개의 컨슈머가 한개의 파티션을 바라보는일이 없기 때문)
    • 헷갈리지 말점: 한개의 파티션에 붙는것이 다른 그룹의 컨슈머인 경우, 한개의 파티션에 N개가 붙을 수 있음.

성능

  • 파티션 파일에 대해서 OS 페이지캐시를 사용합니다.
    • 파티션에 대한 FILE IO 를 메모리단에서 처리하게 됩니다.
  • Zero Copy
    • 디스크 버퍼에서 네트워크 버퍼로 직접 데이터 복사하여 처리하게 됩니다.
  • 브로커가 하는일이 단순함
    • 컨슈머 추적을 위해서 크게 하는게 없습니다.
    • 메시지 필터, 메시지 재전송같은것은 브로커에서 하지않고, 프로듀서, 컨슈머가 직접 관리하기 떄문입니다.
    • 브로커는 단순히 컨슈머와 파티션간의 매핑만 관리해줍니다.
  • 배치처리
    • 프로듀서 -> 카프카 <- 컨슈머 관계에서 메시지를 여러개로 모아서 전송, 조회가 가능합니다.

확장성

image

  • 카프카 클러스터가 용량의 한계에 다한경우
    • 브로커를 추가하고, 파티션을 추가해서 확장하면 됩니다.
    • 다른 파티션인경우에는 한개의 컨슈머가 전부다 조회가 가능합니다.
  • 컨슈머가 느려진 경우
    • 컨슈머를 추가해주면 됩니다. 이때 같은 컨슈머 그룹에 추가된경우에는 같은 파티션을 볼 수 없으므로, 파티션도 추가해주는식으로 진행해야 합니다.

레플리카

image

  • 레플리카를 만들게 된다면, 똑같은 파티션 정보를 가진채 다른 브로커에 저장이 되어집니다.
    • 토픽을 생성할때 레플리카 설정값이 2라면, 2개의 브로커가 추가적으로 생성되어 집니다.
    • 추가된것은 팔로워이고, 본체는 리더라고 명칭합니다.
    • 기본적으로는 리더를 통해서 메시지를 계속 처리합니다.
    • 팔로워는 리더로부터 계속 데이터를 복제해갑니다.
  • 리더가 장애가난경우에는, 1개의 팔로워가 리더가 되어집니다.

토픽에 메시지를 전송하는 구조

image

  • property 를 통해서 설정하고, 카프카 프로듀서 객체를 만듭니다.
  • send 메서드 안에는 ProducerRecord 가 들어가는데 이것이 메시지가 됩니다.
    • key 를 넣어주면, 파티션을 정해서 전송되게 됩니다.

프로듀서의 기본 흐름

image

  • 버퍼를 바로 전송하는것이 아니라, 배치에 넣어서 한꺼번에 전송하게 됩니다.
  • sender 는 배치를 전송합니다.

Sender 의 기본 동작

image

  • sender 는 별도 스레드로 동작하게 되어 있습니다.
  • 즉 보내는 도중에, 메시지가 버퍼에 더 쌓일 수 있습니다. (서로 의존성없음)
  • sender 는 배치가 다 안차도 그냥 보내 버립니다.
    • 배치에 메시지가 1개던 N개던 메시지를 보낼수 있으면 그냥 보내 버리게됩니다.

처리량 관련 주요 속성

image

  • 배치사이즈가 적으면, 한번에 보내는게 적어져서 처리량이 떨어지게 됩니다.
  • linger.ms 를 통해서 배치를 전송한뒤 다음 배치를 보내기전까지의 대기시간을 줄 수 있습니다. (기본값 = 0)
  • 대기시간이 길수록 더 많은 데이터를 한번에 보낼 수 있습니다.

전송 실패 확인

기본 send 는 전송 결과를 확인하지 않습니다.

1
2
3
4
5
6
Future<RecordMetadata> f = producer.send(ProducerRecord<>("myTopic", "value"));
try {
  RecordMetadata meta = f.get() // 블로킹 처리가 됨
  } catch (Exception ex) {
    ...
  }
  • Future 를 사용하는경우, 응답이 올때까지 블로킹 처리하면서 메시지 성공이 되었는지를 매번 확인하게 됩니다.
  • 매번 확인하기 때문에, 배치에 메시지가 제대로 쌓이지 않게됩니다.
  • 처리량이 아주 떨어지만, 확실히 전송했음을 확인 할 수 있습니다.
1
2
3
4
5
6
7
producer.send(ProducerRecord<>("myTopic", "value"), Callback() {
    @Override
    fun on Completion(metadata: RecordMetadata, exception: Exception){
       // 콜백 처리가 된다.
    }
  }
)
  • 블로킹 처리가 아니기때문에, 대기 하다가 배치에 데이터가 쌓이지 않는다던가 하는 이슈는 없습니다.
  • exception 이 담겨있으면 전송에 실패한것을 의미합니다.

Ack 를 통한 전송 보장처리

image

  • ack 를 이용해서 전송보장을 하게됩니다.
  • ack = all 인경우
    • min.insync.replicas
    • 요청에 대해 동기화된 레플리카의 최소 개수를 확인합니다.
    • 값이 3일때
      • 리더1, 팔로워2 에 전송이 성공하면 pass
      • 리더1, 팔로워1에 전송이 성공하면 2 < 3 이기 떄문에 fail 처리가 됩니다.

에러가 발생하는 경우

전송 전,후 2가지로 분류할 수 있겠습니다.

  • 전송 과정에서 실패하는 경우
    • 타임아웃
    • 리더 다운되어서, 다른 리더를 뽑고 있음
    • 브로커 설정의 메시지 한도 초과
  • 전송 전에 실패하는 경우
    • 직렬화 실패, 요청 크기 제한 초과
    • 버퍼가 차서 대기한 시간이 초과

재시도

  • 재시도가 가능한 에러는(응답 타임아웃, 일적 리더 손실) 그냥 재시도 하게 합니다. (하지만 무한 재시도는 금지)
  • 프로듀서는 기본적으로 재시도를 합니다.(브로커 전송 과정에서 재시도 가능한 에러인경우)
  • send() 에서 발생한경우 직접 프로그래머가 재시도 처리를 하게 할 수 있습니다.
  • 재시도를 바로 당시에 처리 하지않고 기록하는 방법도 있습니다..
    • 해당 방법은 DB 에 실패 이력을 적어놓았다가 재전송하거나 하는 방법으로 구현될 수 있겠습니다.

image

  • 재시도시 중복처리될 수도 있음.
  • 중복처리 확률을 줄일 수 있는 옵션이 있음.

image

  • max.in.flight.requests.per.connection
  • 한 커넥션이 동시에 요청할수 있는 요청 개수를 뜻합니다.
  • 3개의 메시지를 보내는 상황일때 1,2,3 순서로 가다가, 1이 실패해서 재전송 하게 되는경우 2 3 1 로 순서가 뒤바뀌게 됩니다.

컨슈머

image

  • 설정시 retry 횟수를 설정할 수 있습니다.
  • groupId 를 지정하는것이 중요하고, subscribe 에서 토픽을 구독합니다.
  • poll 은 브로커로부터 레코드를 읽어갑니다.

image

  • 컨슈머 개수 > 파티션 개수 가 있도록 하지말아야합니다.
  • 컨슈머 측에서 접근할 수 있는 파티션이 없어서, 남는 파티션은 놀아버리게 됩니다.

image

  • 마지막 읽어온 레코드에 커밋을 하는것이다.
  • poll(), close() 사용시에는 자동 커밋이 실행된다.

자동 커밋 / 수동커밋

  • enable.auto.commit 설정
    • true: 일정주기로 (auto.commit.internal.ms 기본값 5000ms) 으로 컨슈머가 읽은 오프셋을 커밋합니다. (기본값)
    • false: 수동으로 커밋을 실행합니다.

컨슈머가 메시지를 읽고가면, 5초뒤에 커밋을하게된다. 만약 처리에 6초가 걸렸고, 뒤늦게 메시지를 다시 읽어야한다면 이미 커밋이 되어버려서 앞메시지를 읽지못한다. 또한 처리에 1초가 걸려서, 다시 메시지를 보게되면 커밋전이라 중복된 메시지를 읽게된다. (추가 조사 필요)

image

  • 보통 None 은 사용하지 않습니다.

조회에 영향을 주는 주요 설정

  • fetch.min.bytes: 조회시 브로커가 전송할 최소 데이터의 크기 입니다.
    • 기본값은 1
    • 값이 클수록 대기시간이 늘어나지만, 많은양을 한번에 보내게 되어 성능에 도움이 될 수 있습니다.
  • fetch.max.wait.ms
    • 데이터가 최소 크기가 될떄까지 기다리는 시간입니ㅏㄷ.
    • 기본값은 500ms
    • 최소크기가 안된다고 무한정 기다릴수가 없으니, 일정시간이 초과되면 최소크기가 안되도 전송합니다.
    • 브로커가 리턴을 할때까지 기다리는것이기때문에, consumer.poll(Duratoin.ofMillis(N)) 의 대기시간과 다릅니다.
      • fetch.max.wait.ms: 500ms 이고, consumer.poll(Duratoin.ofMillis(700)) 의 경우
      • 0.7 초 동안 대기하다가 컨슈머로부터 읽으러감. -> 최소 크기가 안잡혀있다면 0.5초간 대기했다가 읽어오게 됩니다.
  • max.partition-fetch.bytes
    • 파티션당 서버가 리턴할 수 있는 최대 크기입니다.
    • 기본값은 1MB
    • 최대크기가 차면 바로 전송해버립니다.

재처리와 순서를 고려해야한다.

동일 메시지의 조회 가능성은 항상 존재합니다.

  • 일시적인 커밋실패, 파티션 추가, 제거등으로 인한 리밸런싱 과정에서 발생할 가능성이 존재합니다.
  • 동일한 메시지를 받아서 여러번 처리하게 된다면, 멱등성이 지켜지지 않아 결과값이 영향을 줄 수 있습니다.
  • 데이터 특성에 따라 타임스탬프나 유니크한 일련번호를 통해서 컨슈머측에서 중복 처리하지 않도록 코드를 작성해야 합니다.

세션, 하트비트, poll 관리

  • 컨슈머는 하트비트를 브로커에 전송하면서 연결을 유지 합니다.
    • heartbeat.interval.ms: 하트비트 전송주기 (기본값 3초)
  • 브로커입장에서 하트비트가 일정시간 이상 컨슈머로부터 안온다면, 컨슈머를 그룹에서 빼고 리밸런싱을 진행합니다.
    • session.timeout.ms: 세션 타임아웃 시간 (기본값 10초)

공식문서에 따르면 둘의 비율은 1/3 이하로 잡는것이 좋다고 합니다.

  • 컨슈머쪽에서 poll() 을 일정 시간이상 호출하지 않으면 그룹에서 빼버리고, 리밸런싱을 진행합니다.
  • max.poll.interval.ms (메서드의 최대 호출간격)

종료처리

image

  • 컨슈머 작업이 다끝나면 close 처리가 필요하합니다.
  • 보통 일반적으로 while(true) 해서 poll() 을 계속 가져오는 무한루프를 돌게 설정해둡니다.
  • wakeup() 메서드가 트리거가 되어서 종료를 처리하게 됩니다.
  • 다른 스레드에서 wakeup() 을 호출시키게 되고, 이때 poll() 에서 Exception 이 발생되어 close() 가 진행되게 로직을 짜는것이 일반적입니다.
  • 주의: 카프카 컨슈머는 스레드 세이프하지 않습니다.
    • wakeUp 을 처리하는 쪽만 스레드 세이프하게 되어있습니디
    • 여러개의 스레드에서 카프카 컨슈머 객체를 절대 동시에 사용해서는 안됩니다.

Reference

  • https://youtu.be/0Ssx7jJJADI