Producer
- 카프카 클러스터에 토픽으로 메시지를 저장하게끔 전달하는 생산자(애플리케이션, 서버)
- 데이터 레코드를 파티션에 매핑하고 파티션 리더(broker)에 요청을 보냄
- 요청 받은 메시지는 라운드 로빈 방식으로 각 파티션에 분배
특징
bootstrap url : producer는 cluster에 대한 metadata를 가져오기 위해 최소 하나 이상의 broker에 연결한다. failover를 위해 1개 이상의 url 리스트를 설정한다.
serializer: 카프카는 tcp 기반의 byte array 포맷을 가진 데이터를 송수신한다. 데이터를 write할 때 producer는 미리 설정한 직렬화 클래스를 사용하여 byte array로 직렬화 한 후 전송한다.
matching partition : 파티션을 매칭하여 데이터를 전송하는 것은 producer의 책임이다. 만일 파티션을 명시하지 않으면 key의 해쉬 값을 이용하여 파티션을 결정하고 데이터를 보낸다. 만일 키값도 존재하지 않는다면 라운드 로빈 방식으로 저장한다.
retry : 처리 실패 응답이나 재시도를 결정할 수 있다.
batch: 효율적인 메시지 전송을 위해 batch를 사용할 수 있다.
의존성 추가
dependencies {
compile('org.apache.kafka:kafka-clients:2.3.0')
}
Producer properties 설정
private final static String TOPIC = "my-example-topic";
private final static String BOOTSTRAP_SERVERS = "localhost:9092";
private static Producer<Long, String> createProducer() {
Properties props = new Properties();
//카프카 서버의 부트스트랩 주소를 설정한다.
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 1);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 20000);
// props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 24568545);
return new KafkaProducer<>(props);
}
Kafka의 Producer의 Config는 java.utils.Properties를 통해 바인딩되며 옵션은 아래 링크에서 확인 가능하다.
https://kafka.apache.org/documentation/#producerconfigs
Producer Sync & Async Send Message
ublic static void runProducerSync(final int sendMessageCount) throws Exception {
final Producer<Long, String> producer = createProducer();
long time = System.currentTimeMillis();
try {
for (long index = time; index < time + sendMessageCount; index++) {
final ProducerRecord<Long, String> record =
new ProducerRecord<>(TOPIC, index,
"Hello Mom " + index);
// record 기록 시 key value, timestamp 기록
RecordMetadata metadata = producer.send(record).get();
long elapsedTime = System.currentTimeMillis() - time;
System.out.printf("sent record(key=%s value=%s) " +
"meta(partition=%d, offset=%d) time=%d\n",
record.key(), record.value(), metadata.partition(),
metadata.offset(), elapsedTime);
}
} finally {
producer.flush();
producer.close();
}
}
public static void runProducerAsync(final int sendMessageCount) throws InterruptedException {
final Producer<Long, String> producer = createProducer();
long time = System.currentTimeMillis();
final CountDownLatch countDownLatch = new CountDownLatch(sendMessageCount);
try {
for (long index = time; index < time + sendMessageCount; index++) {
final ProducerRecord<Long, String> record =
new ProducerRecord<>(TOPIC, index, "Hello Mom " + index);
producer.send(record, (metadata, exception) -> {
long elapsedTime = System.currentTimeMillis() - time;
if (metadata != null) {
System.out.printf("sent record(key=%s value=%s) " +
"meta(partition=%d, offset=%d) time=%d\n",
record.key(), record.value(), metadata.partition(),
metadata.offset(), elapsedTime);
} else {
exception.printStackTrace();
}
});
}
countDownLatch.await(25, TimeUnit.SECONDS);
} finally {
producer.flush();
producer.close();
}
}
카프카 프로듀서를 사용할 때 유의해야 하는 사항
- 잘못된 파티셔닝 방식의 사용방지 : 파티션은 카프카에서 병렬 처리의 단위다. 메시지가 모든 토픽 파티션으로 균일하게 분배되도록 올바른 파티셔닝 전략을 선택해야한다. 만약 잘못된 파티셔닝 방식을 도입하게 되면 특정 파티션에만 요청이 집중되는 최악의 상황이 발생할 수 있으니 메시지가 모든 가용한 파티션에 분배 되도록 올바른 파티셔닝 방식을 정의하자.
- 기존 토픽에 새로운 파티션 추가 방지 : 메시지를 분산시키기 위해서 키 값을 사용하여 파티션을 선택하는 토픽에는 파티션을 추가하는 것을 피해야한다. 새로운 파티션 추가는 각 키에 대해 산출된 해시 코드를 변경시키며, 이는 파티션의 수도 산출에 필요한 입력 중 하나로 사용하기 때문이다.
http://cloudurable.com/blog/kafka-tutorial-kafka-producer/index.html
'Cloud & NoSQL & Middleware > Kafka' 카테고리의 다른 글
Kafka - Consumer API (0) | 2019.11.03 |
---|---|
Kafka Overview (0) | 2019.11.02 |
Zookeeper api 테스트 (0) | 2019.11.02 |
Zookeeper (0) | 2019.11.02 |