취미가 좋다

kafka 카프카 쉬운 간단 정리 본문

Data Engineer/데이터 인프라

kafka 카프카 쉬운 간단 정리

benlee73 2023. 9. 21. 12:49

카프카

  • 실시간으로 대용량 스트리밍 데이터를 처리하는 메시지 큐 플랫폼
  • 풀어서 설명
    • 데이터를 주고받는 굉장히 많은 어플리케이션과 DB 들이 있다.
    • 이 많은 데이터 파이프라인을 관리하기란 어렵다.
    • 파이프라인의 중간 통로로 카프카를 사용해서 데이터를 카프카에 넣고 데이터를 카프카로부터 꺼내가도록 한다.
    • 높은 처리량, 확장성, 영속성, 고가용성의 장점을 통해 데이터 파이프라인을 안전하고 확장성 높게 운영할 수 있다.

브로커

  • 데이터를 저장하고 관리하는 서버
  • 주로 여러 서버를 묶어서 클러스터로 운영한다.

컨트롤러

  • 클러스터 내 브로커 중 한 대가 수행
  • 클러스터 내 브로커가 죽으면 그 브로커가 가진 리더 파티션을 재분배한다.

코디네이터

  • 클러스터 내 브로커 중 한 대가 수행
  • 컨슈머 그룹의 상태를 체크하고 파티션과 컨슈머를 매칭하는 리밸런싱을 수행한다.

토픽

  • 데이터를 구분하는 단위
    • RDB 의 테이블이라고 어렴풋이 생각하자

레코드

  • 토픽에 들어가고 나오는 데이터 하나하나의 단위는 레코드라고 한다.
타임스탬프 메시지 키 메시지 값 오프셋 헤더

 

파티션

  • 토픽의 레코드를 여러 브로커에 분포되어 저장되도록 한다.
  • 카프카 병렬처리의 핵심이다.
  • 적절한 파티션 개수
    • 프로듀서 전송 데이터양 < 컨슈머 데이터 처리량 x 파티션 개수

토픽 삭제 정책

  • 시간 or 용량을 기준으로 삭제
    • 시간 : retention.ms 설정
    • 용량 : retention.bytes 설정
  • 세그먼트 단위로 토픽을 삭제
    • 세그먼트 : 토픽 데이터를 저장하는 파일 시스템 단위
    • 세그먼트의 이름은 가진 레코드의 오프셋 중 가장 작은 값 (ex. 04.log)

토픽 압축 정책

  • zip, snappy 의 압축과는 다르다.
  • 같은 메시지 키를 가진 여러 레코드 중 최신 레코드만 남기고 삭제하는 정책

카프카 클러스터

  • 클러스터에 4대의 broker 가 있다.
  • 클러스터에 4개의 topic 이 있다.
  • 각 토픽에는 2개의 partition 이 있다. 토픽의 레코드는 이 partition 에 나뉘어 저장된다.
  • partition 은 broker 에 분포되어 있다.
  • 토픽의 replication 값에 따라 같은 레코드가 여러 partition 에 저장될 수도 있다.

프로듀서

  • 프로듀서는 내부적으로 partitioner, accumulator 가 있다.
  • send() 를 호출하면 partiitioner가 어느 파티션으로 레코드를 전송할지 결정한다.
  • 파티셔너로 구분된 레코드는 accumulator 에서 버퍼로 쌓이고 배치 단위로 브로커로 전송된다.

acks

  • 얼마나 신뢰성 높게 저장할지 결정하는 옵션
acks 설명
0 프로듀서는 데이터가 잘 저장됐는지 확인하지 않는다.
1 프로듀서는 데이터가 '리더 파티션'에 저장됐는지 확인한다. 팔로워 파티션에 저장됐는지는 알 수 없다.
-1 (all) 프로듀서는 데이터가 '모든 파티션'에 저장됐는지 확인한다. (여기서 모든 파티션은 ISR 로 묶인 파티션들)

컨슈머

  • poll() 메서드 호출 시점에 카프카 클러스터에서 데이터를 가져오는 게 아니다.
  • 컨슈머 어플리케이션이 실행되면 내부에서 fetcher 를 생성하고 미리 레코드를 내부 큐로 가져온다.
  • poll() 을 호출하면 내부 큐에서 레코드를 가져온다.
  • (이건 java 의 경우고 다른 언어의 라이브러리에서는 확인이 필요하다.)

리밸런싱

  • 컨슈머 그룹에 컨슈머가 추가되거나 제거될 때 각 컨슈머에 파티션을 재분배하는 동작

커밋

  • 컨슈머는 어떤 토픽을 어느 오프셋까지 읽었는지 기록한다.
  • 로컬에도 기록하겠지만 브로커에도 기록해야 한다.
  • 브로커에 어디까지 읽었다고 오프셋을 보내 기록하는 것을 커밋이라고 한다.
  • default 로 5초마다 커밋을 수행한다. (enable.auto.commit=true)
  • 만약 커밋하기 전에 리밸런싱이 수행되거나 컨슈머 강제종료가 일어난다면 데이터가 재처리되거나 유실될 수 있다.
  • auto.commit 대신 동기/비동기로 직접 커밋을 수행할 수 있다.

버로우

  • 컨슈머의 컨슈머 랙을 파악하는 방법은 여러 가지가 있지만 버로우를 사용해서 모니터링하는 것이 가장 좋다.
  • 하나 이상의 카프카 클러스터에 버로우를 붙이면 REST API 로 컨슈머그룹, 토픽 별 컨슈머 랙을 쉽게 확인할 수 있다.
  • 다만 데이터를 저장하는 기능은 없기 때문에 telegraf 로 저장소(ES, prometheus)에 저장하고 대시보드(그라파나, 키바나)로 보는 게 좋다.

기타

  • ISR (In-Sync-Replicas)
    • ISR 로 리더 파티션과 팔로워 파티션을 묶으면 데이터를 동기화해서 일관성이 유지된다.
    • 기본적으로 레코드마다 다른 파티션에 저장되지만 replication 을 사용할 경우 한 레코드가 여러 파티션에 저장된다.

 


카프카 스트림즈

  • 토픽의 데이터를 실시간으로 변환하여 다른 토픽에 적재하는 라이브러리
    • 스파크, 플링크로도 비슷한 동작 수행 가능
  • 스트림즈 어플리케이션에는 1개 이상의 스레드가 있고 스레드는 1개 이상의 태스크(task)가 있다.
    • 태스크는 사용하는 토픽의 파티션 개수만큼 생성된다.
    • 토픽의 파티션과 task 를 늘려서 처리량을 높일 수 있다.
  • 스트림즈에서는 토픽으로부터 가져온 데이터를 KStream, KTable, GlobalKTable 의 형태로 다르게 바라볼 수 있다.
  • 카프카 스트림즈는 스트림즈DSL 과 프로세서 API 2가지 방법으로 구현할 수 있다.
    • 스트림즈DSL 은 자주 사용될 기능들을 자체 API 로 만들어서 쉽게 개발할 수 있다.
    • 프로세서 API 는 스트림즈DSL 이 제공하지 않는 기능들이 필요할 때 사용한다.
KStream 토픽에 있는 모든 레코드가 조회된다.
KTable 토픽에 있는 모든 레코드가 조회되고 메시지 키를 기준으로 최신 메시지 값으로 덮어 씌워진다.
GlobalKTable KTable 과 동일하다. 다만 KTable 은 1개의 파티션이 1개의 task 에 할당되지만 이건 모든 파티션이 각 task에 할당된다. 

코파티셔닝

  • 조인을 하는 2개 데이터의 파티션 개수가 동일
  • 파티셔닝 전략이 동일
  • GlobalKTable 이 아닌 이상 코파티셔닝이 되어야 두 토픽을 조인할 수 있다.

조인 예제

// 토폴로지 정의

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream1 = builder.stream(Topic1);
KStream<String, String> stream2 = builder.stream(Topic2);
KStream<String, String> stream3 = stream1.join(
    stream2,
    (val1, val2) -> val1 + " and " + val1
)
streams3.to(Topic3);

// config 정의

Properties configs = new Properties();
configs.put(APPLICATION_ID_CONFIG, APPLICATION_NAME);
...

// kafka streams 실행

KafkaStreams streams = new KafkaStreams(builder.build(), configs);
streams.start();
  • Topic1 과 Topic2 를 메시지 키 기준으로 조인하는 예제
  • 조인된 레코드의 값을 and 로 이어서 Topic3 에 적재한다.
  • KStream-KStream 뿐 아니라 KStream-KTable, KStream-GloabalKTable 조인도 가능하다.

카프카 커넥트

  • 커넥트는 카프카 클러스터와 연동되어 미리 정해놓은 동작을 수행하는 파이프라인들을 반복적으로 쉽게 생성할 수 있는 도구다.
  • 커넥트는 하나의 프로세스고 여러 프로세스로 운영되는 분산 모드 커넥트가 주로 사용된다.
  • 커넥트는 커넥터 생성, 조회, 수정, 삭제 등을 할 수 있는 REST API 를 제공한다.
  • 파이프라인에는 소스 커넥터와 싱크 커넥터가 있다.
    • 소스 커넥터 : 어플리케이션(DB) 으로부터 데이터를 가져와서 처리한 후 카프카 토픽에 저장
    • 싱크 커넥터 : 카프카 토픽으로부터 데이터를 가져와서 처리한 후 어플리케이션(DB)에 저장
  • 커넥터는 직접 구현할 수도 있고 공개된 오픈소스 커넥터를 가져와서 사용할 수도 있다.
  • jar 파일 형태로 제공되고 jar 파일이 있는 경로를 커넥트에게 알려주면 커넥트는 해당 커넥터 파이프라인을 생성할 수 있게 된다.

 

시나리오

  1. 카프카 커넥트(프로세스)를 실행한다.
  2. REST API 를 통해 커넥트에 파이프라인 생성을 요청한다.
  3. 커넥트가 커넥터를 생성해서 파이프라인을 만든다.
  4. 지정한 task 개수만큼 커넥트에 새로운 task(스레드) 를 띄운다.
  5. 그 쓰레드들은 여러 커넥트(프로세스)에 띄워지게 된다.
  6. 파이프라인 용도에 맞게 데이터를 ETL 한다.
Comments