Producer

 

- 카프카 클러스터에 토픽으로 메시지를 저장하게끔 전달하는 생산자(애플리케이션, 서버)

- 데이터 레코드를 파티션에 매핑하고 파티션 리더(broker)에 요청을 보냄

- 요청 받은 메시지는 라운드 로빈 방식으로 각 파티션에 분배

 

 

특징

 

bootstrap url : producer는 cluster에 대한 metadata를 가져오기 위해 최소 하나 이상의 broker에 연결한다. failover를 위해 1개 이상의 url 리스트를 설정한다.

 

serializer: 카프카는 tcp 기반의 byte array 포맷을 가진 데이터를 송수신한다. 데이터를 write할 때 producer는 미리 설정한 직렬화 클래스를 사용하여 byte array로 직렬화 한 후 전송한다. 

 

matching partition : 파티션을 매칭하여 데이터를 전송하는 것은 producer의 책임이다. 만일 파티션을 명시하지 않으면 key의 해쉬 값을 이용하여 파티션을 결정하고 데이터를 보낸다. 만일 키값도 존재하지 않는다면 라운드 로빈 방식으로 저장한다.

 

retry : 처리 실패 응답이나 재시도를 결정할 수 있다.

 

batch: 효율적인 메시지 전송을 위해 batch를 사용할 수 있다.

 

 

 

 

의존성 추가

dependencies {
	compile('org.apache.kafka:kafka-clients:2.3.0')
   }

 

 

 

Producer properties 설정

private final static String TOPIC = "my-example-topic";
private final static String BOOTSTRAP_SERVERS = "localhost:9092";


private static Producer<Long, String> createProducer() {
        Properties props = new Properties();
        //카프카 서버의 부트스트랩 주소를 설정한다.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer");

       
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG,  1);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 20000);
//        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
//        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 24568545);

                return new KafkaProducer<>(props);
    }

 

Kafka의 Producer의 Config는 java.utils.Properties를 통해 바인딩되며 옵션은 아래 링크에서 확인 가능하다.

https://kafka.apache.org/documentation/#producerconfigs

 

 

 

Producer Sync & Async Send Message

 

ublic static void runProducerSync(final int sendMessageCount) throws Exception {

        final Producer<Long, String> producer = createProducer();
        long time = System.currentTimeMillis();

        try {
            for (long index = time; index < time + sendMessageCount; index++) {
                final ProducerRecord<Long, String> record =
                        new ProducerRecord<>(TOPIC, index,
                                "Hello Mom " + index);

                // record 기록 시 key value, timestamp 기록
                RecordMetadata metadata = producer.send(record).get();

                long elapsedTime = System.currentTimeMillis() - time;
                System.out.printf("sent record(key=%s value=%s) " +
                                "meta(partition=%d, offset=%d) time=%d\n",
                        record.key(), record.value(), metadata.partition(),
                        metadata.offset(), elapsedTime);

            }
        } finally {
            producer.flush();
            producer.close();
        }
    }

    public static void runProducerAsync(final int sendMessageCount) throws InterruptedException {
        final Producer<Long, String> producer = createProducer();
        long time = System.currentTimeMillis();
        final CountDownLatch countDownLatch = new CountDownLatch(sendMessageCount);

        try {
            for (long index = time; index < time + sendMessageCount; index++) {
                final ProducerRecord<Long, String> record =
                        new ProducerRecord<>(TOPIC, index, "Hello Mom " + index);

                producer.send(record, (metadata, exception) -> {
                    long elapsedTime = System.currentTimeMillis() - time;
                    if (metadata != null) {
                        System.out.printf("sent record(key=%s value=%s) " +
                                        "meta(partition=%d, offset=%d) time=%d\n",
                                record.key(), record.value(), metadata.partition(),
                                metadata.offset(), elapsedTime);
                    } else {
                        exception.printStackTrace();
                    }
                });
            }
            countDownLatch.await(25, TimeUnit.SECONDS);
        } finally {
            producer.flush();
            producer.close();
        }
    }

 

 

 

카프카 프로듀서를 사용할 때 유의해야 하는 사항

  • 잘못된 파티셔닝 방식의 사용방지 : 파티션은 카프카에서 병렬 처리의 단위다. 메시지가 모든 토픽 파티션으로 균일하게 분배되도록 올바른 파티셔닝 전략을 선택해야한다. 만약 잘못된 파티셔닝 방식을 도입하게 되면 특정 파티션에만 요청이 집중되는 최악의 상황이 발생할 수 있으니 메시지가 모든 가용한 파티션에 분배 되도록 올바른 파티셔닝 방식을 정의하자.
  • 기존 토픽에 새로운 파티션 추가 방지 : 메시지를 분산시키기 위해서 키 값을 사용하여 파티션을 선택하는 토픽에는 파티션을 추가하는 것을 피해야한다. 새로운 파티션 추가는 각 키에 대해 산출된 해시 코드를 변경시키며, 이는 파티션의 수도 산출에 필요한 입력 중 하나로 사용하기 때문이다.

 

 

 

 

https://kafka.apache.org/

http://cloudurable.com/blog/kafka-tutorial-kafka-producer/index.html

https://coding-start.tistory.com/193?category=790331

'Cloud & NoSQL & Middleware > Kafka' 카테고리의 다른 글

Kafka - Consumer API  (0) 2019.11.03
Kafka Overview  (0) 2019.11.02
Zookeeper api 테스트  (0) 2019.11.02
Zookeeper  (0) 2019.11.02

+ Recent posts