Big Data/Kafka

[Kafka] Producer: kafka broker로 메시지 보내기(sync, async)

ooeunz 2020. 7. 23. 15:13
반응형

Producer

카프카에서 메시지를 생산해서 카프카 토픽으로 보내는 역할을 하는 애플리케이션, 서버 등을 모두 Producer라고 부릅니다. 프로듀서의 주요 기능은 각각의 메시지를 토픽 파티션에 매핑하고 파티션의 리더에 요청을 보내는 것입니다. 만약 전송하는 메시지에 키 값을 지정하게 된다면 원하는 파티션으로 전송되게 됩니다. 하지만 키 값을 지정하지 않는다면, 파티션은 Round-Robin 방식으로 각 파티션에 메시지를 균등하게 분배합니다.

 

Kafka / Zookeeper Install

이번 포스팅에서는 특별히 docker-compose를 이용해 간단하게 카프카 브로커를 띄운 다음 프로듀서에서 브로커로 메시지를 전송해보도록 하겠습니다. 꼭 docker를 이용해서 kafka와 zookeeper를 설치할 필요는 없지만 카프카 설치의 편의를 위해 아래 docker-compose 템플릿을 이용해 카프카를 컨테이너로 띄운 다음 실습을 진행해보도록 하겠습니다.

version: '2'

networks:
  test:

services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    container_name: zookeeper
    ports:
      - "2181:2181"
    networks:
      - test

  kafka:
    image: wurstmeister/kafka:2.12-2.0.1
    container_name: kafka
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "kafka-my-topic:1:1"   # Topic명:Partition개수:Replica개수
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    networks:
      - test

 

 

Producer 사용해보기

이번엔 카프카로 메시지를 전송하기 위한 프로듀서 프로젝트를 생성해보도록 하겠습니다. 어떤 언어로 프로듀서를 구현하더라도 무관하지만 이번 포스팅해서는 Java 랭귀지를 사용하도록 하겠습니다. 카프카는 스칼라를 기반으로 개발되었습니다. 그리고 메인 클라이언트 라이브러리는 자바로 만들어진 애플리케이션으로 가장 많은 기능을 제공하고 있습니다. Java Project를 생성했다면 maven 또는 gradle을 이용하여 클라이언트 라이브러리를 추가하도록 하겠습니다.

 

카프카는 모든 버전의 클라이언트 라이브러리와 호환되지 않기 때문에 브로커와 클라이언트 라이브러리가 호환되는 버전인지 확인 후 진행하시는 것을 추천드립니다.

dependencies {
    implementation 'org.springframework.kafka:spring-kafka'
}

 

dependecy를 추가했다면 이제 직접 프로듀서 코드를 구현해보도록 하겠습니다. 의외로 개념에 비해서 카프카 프로듀서의 코드 자체는 간단한 편입니다.

 

 

전송 결과를 확인하지 않고 전송

지금 작성하는 코드는 프로듀서가 서버로 메시지를 보내고 난 후 성공적으로 도착했는지까지는 확인하지 않는 프로듀서의 코드입니다. 카프카가 살아있다면 프로듀서는 메시지 전송에 실패하더라도 자동으로 재전송하기 때문에 대부분의 경우 성공적으로 전송되지만 앞선 포스팅에서 다뤘듯 일부 메시지를 손실될 수도 있습니다.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class ProducerApplication {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        try {
            producer.send(new ProducerRecord<String, String>("kafka-my-topic", "Apache Kafka Producer Test"));
        } catch (Exception exception) {
            exception.printStackTrace();
        } finally {
            producer.close();
        }
    }
}

 

  1. 먼저 Properties 오브젝트를 생성합니다. (properties 객체는 자바의 Hashtable을 상속받은 객체로 key-value로 저장되는 객체입니다.) 이제 Properties 객체 props에 프로듀서와 관련된 설정들을 지정할 것입니다.
  2. props안에 브로커의 리스트를 정의합니다. 현재는 브로커의 리스트가 하나뿐이라 "127.0.0.1:9092" 형태로 값을 넣었지만, 여러 개일 경우에는 "127.0.0.1:9092,127.0.0.2:9092,127.0.0.3:9092"과 같은 형태로 입력해주면 됩니다.
    (해당 옵션에서 가급적 모든 카프카 클러스터를 추가하는 것이 좋습니다.)
  3. 카프카로 보내는 메시지의 키와 값을 문자열을 사용할 것이므로 StringSerializer를 지정합니다.
  4. 이제 지정한 옵션들을 매개변수로 받는 실제 Producer 객체를 생성합니다.
  5. ProducuerRecord 객체를 생성하고 producer의 send() 메서드를 이용해서 kafka-my-topic이라는 이름의 토픽으로 메시지를 전송합니다.
  6. producer를 close합니다.

※ 파티션 지정해서 메시지 보내기

만약 라운드 로빈 방식으로 파티션에 메시지를 보내는 것이 아니라, 원하는 파티션을 지정해서 메시지를 보낼 때는, producer.send() 메서드에서 두 번째 인자로 key 값을 넣어주면 됩니다.

 

Sync 전송

사실 프로듀서는 메시지를 보내고 send() 메서드에서 Java Future 객체로 RecordMetadata를 리턴 받게 됩니다. 이번에는 .get() 메서드를 이용해서 Future를 기다린 후 send() 메서드가 성공적으로 수행되었는지 확인해보도록 하겠습니다. 이와 같은 방법으로 메시지가 브로커에게 메시지를 성공적으로 전달했는지 확인할 수 있기 때문에 더욱 신뢰성 있는 메시지 전송을 할 수 있다는 장점이 있습니다.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

public class ProducerApplication {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        try {
            RecordMetadata metadata = producer.send(new ProducerRecord<String, String>("kafka-my-topic", "Apache Kafka Producer Test")).get();
            System.out.printf("Partition: %d, Offset %d", metadata.partition(), metadata.offset());
        } catch (Exception exception) {
            exception.printStackTrace();
        } finally {
            producer.close();
        }
    }
}

 

 

Async 전송

앞의 동기 방식으로 메시지를 전송하게 될 경우, 만약 프로듀서가 보낸 모든 메시지에 대해 응답을 기다리며 스레드들이 정지한다면 효율성이 떨어지고 시간이 오래 걸리게 됩니다. 하지만 비동기적으로 전송하게 된다면 응답을 기다리지 않고 바로 다음 일을 수행할 수 있기 때문에 더욱 빠른 전송을 할 수 있습니다. 아래의 코드에서 프로듀서는 send() 메서드를 콜백과 같이 호출하고 카프카 브로커에서 응답을 받으면 콜백 합니다.

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerCallback implements Callback {

    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (metadata != null) {
            System.out.printf("Partition: %d, Offset: %d", metadata.partition(), metadata.offset());
        } else {
            exception.printStackTrace();
        }
    }
}
import com.kafka.producer.async.ProducerCallback;
import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class ProducerApplication {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        try {
            producer.send(new ProducerRecord<String, String>("kafka-my-topic", "Apache Kafka Producer Test"), new ProducerCallback());
        } catch (Exception exception) {
            exception.printStackTrace();
        } finally {
            producer.close();
        }
    }
}

이전에 동기 방식에서 .get 메서드로 Future객체를 받을 때까지 기다리지 않고 콜백 형태로 ProducerCallback 객체를 넣어줌으로써 비동기적으로 실행할 수 있습니다. 위의 예시에서는 카프카 오류 시에 exception을 출력하는 형태의 예시 코드를 작성했지만, 실제 운영 상황에서는 추가적인 예외처리를 필요로 하게 됩니다.

 

 

Producer 옵션

지금까지의 예제들에서 카프카 토픽으로 메시지를 보낼 때 필요한 몇 가지 옵션을 간단하게 살펴보았습니다. 그밖에도 프로듀서 동작과 관련된 다양한 옵션들이 있는데, 그러한 옵션들에 대해서 살펴보도록 하겠습니다.

 

bootstrap.servers

카프카 클러스터는 마스터라는 개념이 없기 때문에 모든 클러스터 서버가 클라이언트의 요청을 받을 수 있습니다. 모든 카프카 호스트를 지정하지 않는다고 동작하지 않는 것은 아니지만, 카프카 클러스터는 살아있지만, 입력한 호스트가 다운될 경우에는 프로듀서가 살아 있음에도 다른 카프카 호스트를 찾지 못해서 메시지를 전송하지 못하는 경우가 생깁니다. 하지만 모든 호스트를 입력해 두었을 경우에는 주어진 리스트의 서버 중 하나가 장애가 발생하더라도 프로듀서가 자동으로 다른 서버에 재접속을 시도하기 때문에 장애에 내성이 생기게 됩니다.

 

acks

프로듀서가 카프카 토픽의 리더에게 메시지를 보낸 후 요청을 완료하기 전 ack(승인)의 수에 관한 옵션입니다. 해당 옵션의 정수가 낮으면 성능이 좋지만, 메시지 손실 가능성이 있고 수가 높을수록 성능이 높지만 메시지 손실률이 줄어듭니다.

  • ack=0
    프로듀서는 카프카로부터 어떠한 ack도 기다리지 않습니다. 즉 프로듀서에서 전송한 메시지가 실패하더라도 결과를 알지 못하기 때문에 재요청 설정도 적용되지 않습니다. 하지만 카프카로부터 ack에 대한 응답을 기다리지 않기 때문에 매우 빠르게 메시지를 보낼 수 있어 높은 처리량으로 기능을 수행할 수 있습니다.
  • ack=1
    카프카 리더에 한해서 데이터를 정상적으로 받았다는 응답을 받습니다. 하지만 모든 팔로워에까지 메시지가 정상적으로 도착했는지에 관해서는 응답받지 않습니다.
  • ack=all / ack=-1
    all 또는 -1로 설정하는 경우 모든 팔로워로부터 데이터에 대한 ack를 기다리기 때문에 하나 이상의 팔로워가 존재하는 한 데이터는 손실되지 않습니다. 때문에 데이터 무손실에 대해 가장 강력하게 보장하지만, 동시에 모든 팔로워로부터 ack 응답을 기다려야 하므로 성능이 떨어집니다.

 

buffer.memory

프로듀서가 카프카 서버로 데이터를 보내기 위해 잠시 대기할 수 있는 전체 메모리 바이트입니다. 배치 전송과 같은 딜레이가 발생할 때 사용할 수 있습니다.

 

compression.type

프로듀서가 데이터를 압축해서 보낼 수 있는데, 어떤 타입으로 압출할지를 정할 수 있습니다. 옵션으로 none, gzip, snappy, lz4와 같은 다양한 포맷을 선택할 수 있습니다.

 

batch.size

프로듀서는 같은 같은 파티션으로 보내는 여러 데이터를 일정 용량만큼 모아서 배치로 보내게 됩니다. 이때 해당 설정으로 배치 크기를 바이트 단위로 조정할 수 있습니다. 정의된 배치 크기보다 큰 데이터는 배치를 시도하지 않게 됩니다. 또한 배치를 보내기 전에 클라이언트에서 장애가 발생하게 되면 배치 내에 있던 메시지는 전달되지 않게 됩니다. 따라서 만약 고가용성이 필요한 메시지라면 배치 기능을 사용하지 않는 것도 하나의 방법입니다.

 

linger.ms

아직 배치 사이즈가 덜 채워졌을 때 추가적인 메시지들을 기다리는 시간을 조정하게 됩니다. 프로듀서는 지정된 배치 사이즈에 도달하면 linger.ms 옵션과 관계없이 즉시 메시지를 전송하지만, 만약 배치 사이즈에 아직 도달하지 못한 상황이라면 해당 설정의 제한 시간에 도달했을 때 메시지들을 전송하게 됩니다. default값은 0이며(지연 없음), 0보다 큰 값을 설정하면 지연 시간은 조금 발생하지만 처리량이 올라갑니다.

 

max.request.size

프로듀서가 보낼 수 있는 최대 메시지 바이트 사이즈입니다. default 값은 1MB입니다.

반응형