Big Data/Kafka

[Kafka] Consumer: kafka broker로부터 메시지 가져오기

ooeunz 2020. 7. 27. 16:45
반응형

이번 포스팅에서는 kafka consumer(소비)에 대해서 알아보겠습니다. kafka producer가 메시지를 생산하고 토픽으로 전송하는 역할을 한다면 consumer는 메시지를 가져와서 소비하는 역할을 하는 애플리케이션 또는 서버를 지칭합니다. 컨슈머의 주요 기능은 특정 파티션을 관리하고 있는 파티션 리더에게 메시지를 가져오기 요청을 하는 것입니다. 각 요청은 로그의 오프셋을 명시하고 그 위치로부터 로그 메시지를 수신합니다. 때문에 컨슈머는 가져올 메시지의 위치를 조정할 수 있고, 또 이미 가져왔던 메시지를 다시 가져오는 것 역시 가능합니다.

 

Consumer option

카프카 컨슈머에는 Old Consumer와 New Consumer 두 가지가 있습니다. 두 컨슈머의 가장 큰 차이는 주키퍼의 사용 유무입니다. Old Consumer는 컨슈머의 offset을 주키퍼의 지노드에 저장하는 방식을 지원하다가 0.9 version 이후로는 offset을 주키퍼가 아닌 카프카 토픽에 저장하는 방식으로 변경되었습니다.

 

※ 여기서 offset이란, 카프카 파티션 내에서 메시지를 식별하는 단위입니다.

 

아직까진 Old Consumer(주키퍼에 offset을 저장하는 컨슈머)의 지원이 계속되고 있지만, 이후의 릴리즈 버전에서는 해당 기능이 사라질 예정이므로 새로운 카프카 프로젝트를 시작하거나 인프라를 구축할 때에는 New Consumer를 기준으로 하는 것이 좋습니다.

bootstrap.servers

카프카 클러스터에 연결 하기 위한 카프카 정보를 나타냅니다. 프로듀서 포스팅에서도 언급했듯이 하나의 호스트만을 적용하더라도 동작은 하지만, 클러스터는 살아있지만 해당 호스트가 다운될 시에는 접속이 불가능하기 때문에 호스트 리스트 전체[를 입력하는 방식을 권장합니다.


fetch.min.bytes

한번에 가져올 수 있는 최소 데이터 사이즈입니다. 만약 옵션에서 지정한 사이즈보다 모인 데이터가 적을 시에는 바로 요청에 대해 응답하지 않고 데이터가 누적될 때까지 기다리거나 응답을 기다리는 최대 시간까지 기다렸다가 응답하게 됩니다.


group.id

컨슈머가 속한 컨슈머 그룹을 식별하는 식별자입니다.


enable.auto.commit

백그라운드에서 주기적으로 offset을 커밋합니다.


auto.offset.reset

카프카에서 초기 offset이 없거나 현재 offset이 더 이상 존재하지 않는 경우(데이터가 삭제된 경우) 아래의 옵션으로 리셋합니다.

  • earliest : 가장 초기의 오프셋 값으로 설정합니다.
  • latest : 가장 마지막의 오프셋값으로 설정합니다.
  • none : 이전 오프셋값을 찾지 못하면 에러를 나타냅니다.

 

fetch.max.bytes

한 번에 가져올 수 있는 최대 데이터 사이즈


fetch.max.wait.ms

fetch.min.bytes에 설정된 데이터보다 적은 경우 요청에 응답을 기다리는 최대 시간입니다.


request.timeout.ms

요청에 대한 응답을 기다리는 최대 시간입니다. 만약 최소 데이터 사이즈가 차지 않았을 경우 해당 시간만큼 기다렸다가 데이터를 응답하게 됩니다.

 

session.timeout.ms

컨슈머와 브로커 사이의 세션 타임 아웃 시간을 지정하는 옵션입니다. 즉 다시 말해 브로커가 컨슈머가 살아있는 것으로 판단하는 시간입니다. (default 10초) 만약 컨슈머가 그룹 코디네이터(주피터)에게 하트 비트를 보내지 않고 session.timeout.ms가 지나면 브로커는 컨슈머가 종료되었거나 장애가 발생한 것으로 판단하고 컨슈머 그룹은 리벨런스를 시도하게 됩니다.

해당 옵션은 일반적으로 heartbeat.interval.ms와 함께 사용됩니다. session.timeout.ms를 기본값보다 낮게 설정하면 싶패를 빨리 감지할 수 있지만, 가비지 컬렉션이나 poll 루프를 완료하는 시간이 길어지면 원하지 않는 리밸런스가 일어나게 됩니다. 반면 session.timeout.ms를 높게 설정하면 원하지 않는 리밸런스가 일어날 가능성은 줄어들지만, 실제 오류를 감지하는 데 시간이 오래 걸릴 수 있습니다.


heartbeat.interval.ms

그룹 코디네이터에게 얼마나 자주 하트비트를 보낼지 지정합니다. 당연히 session.timeout.ms보다 낮아야하고, 일반적으로 session.timeout.ms의 3분의 1 정도로 설정합니다. (default 값은 3초입니다)


max.poll.records

단일 호출에 대한 최대 레코드 수를 조정합니다.


max.poll.interval.ms

앞에서 컨슈머가 살아있는지 체크하기 위해 하트비트를 주기적으로 보낸다고 했습니다. 그런데 컨슈머가 하트비트만 보내고 실제로 메시지를 가져가지 않는 경우가 있을 수 있습니다. 이러한 경우 해당 컨슈머가 파티션을 무한정 점유할 수 없도록 주기적으로 poll을 요청하지 않으면 장애라고 판단하고, 컨슈머를  제외한 후 다른 컨슈머가 해당 파티션에서 메시지를 가져갈 수 있도록 합니다.


auto.commit.interval.ms

주기적으로 offset을 커밋하는 시간입니다.

 

※ 리밸런스란 컨슈머의 소유권이 넘어가는 것을 뜻합니다.

 

 

Consumer 구현해보기

그럼 직접 Java를 이용해서 Consumer를 구현해보도록 하겠습니다. 구현 코드는 아래와 같습니다.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class ConsumerBasic {
    public void receiveFromKafka() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("group.id", "basic-consumer");
        props.put("enable.auto.commit", "true");
        props.put("auto.offset.reset", "latest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("basic-consumer"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Topic: %s, Partition: %s, Offset: %d, Key: %s, Value: %n\n",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}

코드를 살펴보면 Producer와 마찬가지로 카프카 리스트를 입력하고, 그 후로 gropd id와 auto commit과 offset reset과 관련된 옵션을 지정합니다. 앞선 포스팅 producer에서 메시지와 키 값에 문자열을 사용했기 때문에 내장된 StringDeserializer를 지정합니다.

 

subscribe() 메서드를 이용해서 메시지를 가져올 토픽을 구독합니다. (리스트의 형태로 여러개의 토픽을 입력할 수도 있습니다.) 그리고 무한 루프로 토픽에서 메시지를 지속적으로 poll()하게 됩니다. 이때 중요한 것이 카프카에 폴링 하는 것을 계속 유지해야 한다는 것입니다. 그렇지 않으면 컨슈머가 다운된 것으로 카프카가 판단하게 되어서 해당 컨슈머에 할당된 파티션은 다른 컨슈머에게 전달되어 소비됩니다.

 

poll()은 전체 레코드를 리턴합니다. 따라서 레코드 안에는 토픽, 파티션, 파티션의 offset, key, value와 같은 다양한 데이터를 포함하고 있습니다. 또한 한 번에 하나의 메시지만을 불러오는 것이 아니기 때문에 N개의 메시지를 처리하기 위해서 반복문을 사용하도록 합니다. 해당 예시에서는 단순히 System.out.printf()로 데이터를 출력하는 코드를 추가했지만, 실제 운영환경에서는 Hadoop이나 데이터베이스에 저장하거나 수신한 메시지를 분석하는 등 추가적인 로직을 작성하도록 합니다.

 

마지막으로 consumer가 종료하기 전에 close() 메서드를 이용해서 네트워크 연결과 소켓을 종료하도록 합니다. close() 메서드는 컨슈머가 하트 비트를 보내지 않아서 코디네이터가 해당 컨슈머의 종료를 감지하는 것보다 빠르게 진행되기 때문에 즉시 리벨런스가 발생합니다.

 

 

Partition Message 순서

컨슈머에 들어오는 메시지들의 순서는 파티션 단위로 지켜집니다. 예를 들어 아래의 이미지와 같이 파티션이 4개 있는 토픽이 있을 때 0, 1, 2, 3, 4, 5라는 메시지를 전송했다고 가정해보겠습니다. 각각의 파티션은 병렬로 메시지를 받기 때문에 0, 1, 2, 3이 offset1로 적재되었습니다. 그리고 4, 5가 offset2로 적재되었습니다. 카프카는 offset 기준으로만 메시지를 가져오기 때문에 offset이 1인 0, 1, 2, 3은 offset이 2인 4, 5에 비해서는 확실히 먼저 도착한 것이보장되지만 같은 offset을 가진 메시지끼리는 순서를 보장하지 않습니다. 따라서 아래의 이미지와 같이 0, 1, 2, 3, 4, 5로 전송하였지만 0, 2, 1, 3, 5, 4와 같은 순서로 consumer가 메시지를 받게 됩니다.

 

파티션끼리도 메시지의 순서를 보장하기 위해서는 파티션을 1개만 지정해서 사용해야합니다. 하지만 파티션 수를 1로 지정하면 모든 메시지의 순서를 보장할 수 있지만 파티션 수가 하나이기 때문에 분산해서 처리할 수 없고 하나의 컨슈머에서만 처리할 수 있기 때문에 처리량이 높지 않습니다.

 

Consumer Group

이번에는 컨슈머 그룹에 대해서 알아보도록 하겠습니다. 컨슈머 그룹은 카프카의 커플링을 낮추는 데에 큰 기여를 한 기능입니다. 컨슈머 그룹은 하나의 토픽에 여러 컨슈머 그룹이 동시에 접근해서 메시지를 순서를 유지하며 가져올 수 있습니다. 즉 하나의 데이터를 일관성 있게 다수의 그룹에서 불러옴으로써 다양한 형태로 데이터를 변형 및 사용할 수 있게 됩니다.

 

또한 컨슈머 그룹은 컨슈머를 확장시킬 수 있습니다. 만약 프로듀서가 토픽에 보내는 메시지 속도가 급증해서 컨슈머가 메시지를 가져가는 속도보다 빨라지면 어떻게 될까요? 컨슈머가 처리하지 못한 메시지들이 점점 쌓이게 될 것입니다. 처음에는 큰 문제가 발생하지 않을지 모르지만, 점점 들어오는 메시지와 받아가는 메시지의 차이의 간격이 벌어져 서비스의 문제가 생길 수 있습니다.

 

이런 상황일 때 컨슈머 그룹은 유용한 기능을 제공합니다. 기본적으로 컨슈머 그룹 안에서 컨슈머들은 메시지를 가져오고 있는 토픽의 파티션에 대해 서유권을 공유합니다. 따라서 컨슈머 그룹 내 컨슈머 수가 부족해서 프로듀서가 전송하는 메시지를 처리하지 못하는 경우에는 컨슈머의 수를 추가해서 위의 이미지와 같이 하나의 컨슈머가 처리하던 메시지들을 병렬적으로 처리할 수 있습니다.

 

아래의 이미지처럼 컨슈머 그룹 내에 컨슈머가 증가하게 되면 리밸런스 과정을 거치게 되는데, 즉 파티션에 대한 각각의 컨슈머의 소유권이 이동하는 것을 뜻합니다. 그런데 리밸런스를 하는 중에는 일시적으로 메시지를 가져올 수 없기 때문에 유의해야 합니다. 리밸런스가 일어나게 되면 토픽의 각 파티션마다 하나의 컨슈머가 연결됩니다. 그리고 리밸런스가 끝나게 되면 컨슈머들은 각자 담당하고 있는 파티션으로부터 메시지를 가져오게 됩니다.

 

컨슈머 그룹이라는 기능으로 인해 간단하게 컨슈머를 추가할 수 있습니다. 하지만 이미지처럼 4개의 컨슈머를 추가했음에도 프로듀서가 보내는 메시지보다 컨슈머가 가져가는 메시지의 양이 적다면 어떻게 해야 할까요? 단순히 컨슈머의 수를 늘리면 된다고 생각할 수 있지만, 토픽의 파티션에는 하나의 컨슈머만 연결할 수 있기 때문에 추가한 새로운 컨슈머는 대기상태에 놓이게 됩니다. 따라서 토픽의 파티션 수와 동일하게 컨슈머 수를 늘렸음에도 프로듀서가 보내는 메시지의 속도를 따라가지 못한다면 컨슈머만 추가하는 것이 아니라, 토픽의 파티션과 컨슈머를 같이 늘려줘야 합니다.

 

또한 반대로 잘 동작하고 있던 consumer4가 갑자기 다운될 경우를 생각해 보겠습니다. 이러한 경우에는 consumer4로 메시지를 보내던 partition3이 consumer3으로 리밸런스 되어서 partition2와 3이 모두 consumer3에게 할당되게 됩니다. 이런 경우 consumer 1, 2에 비해 처리량이 불균등해지기 때문에 지속적인 모니터링을 통해서 추가적으로 컨슈머를 할당해줄 필요가 있습니다.

 


처음 카프카를 소개할 때 카프카는 데이터의 중앙 집중화를 위한 시스템이라고 말씀드렸습니다. (리마인드를 위해 다시 이미지를 보도록 하겠습니다.) 카프카 이전의 시스템 흐름도는 우측의 카프카가 적용된 시스템 흐름도에 비해서 굉장히 커플링이 심하고 복잡합니다. 카프카는 이러한 복잡한 시스템 흐름도를 어떻게 개선한 걸까요?

 

(좌) 카프카 이전 / (우) 카프카 이후

예를 들어 A라는 서비스를 하는 팀이 로그 메시지를 topic-01 토픽으로 보내고 컨슈머 그룹01을 이용해서 이 메시지들을 처리하고 있었습니다. 그런데 얼마 후 B서비스팀에서 A 서비스팀의 로그 메시지들을 필요하게 되었습니다. 이전 같은 상황이었으면 A서비스 팀이 직접 로그 메시지를 전달해주었지만 (이러한 상황이 반복되어 복잡한 시스템 흐름도가 만들어짐) 카프카를 사용하고 있는 지금은 A팀이 사용하고 있는 카프카와 토픽 정보를 B팀에게 알려주어서 B팀이 새로운 컨슈머 그룹으로 접근하여 A팀이 카프카에서 가져가고 있는 메시지를 동일하게 가져갈 수 있습니다. 이렇게 여러 컨슈머 그룹들이 하나의 토픽에서 메시지를 가져갈 수 있는 이유는 컨슈머 그룹마다 각자의 오프셋을 별도로 관리하기 때문입니다. 그렇기 때문에 하나의 토픽에 두 개 이상의 컨슈머 그룹뿐만 아니라 그 이상의 컨슈머 그룹이 연결되어도 다른 컨슈머 그룹에 영향 없이 메시지를 가져갈 수 있게 됩니다.

 

반응형