Comsumer
- 토픽에 저장된 메시지를 가져와서 소비하는 소비자
- 파티션 리더에게 메시지를 가져오기 요청을 한다.
- 컨슈머는 offset을 명시하고 그 위치로부터 메시지를 수신한다.
- offset을 재조정하여 과거 데이터를 가져올 수 있으며, 이 때문에 받았던 데이터를 재 수신할 수 있다.
Old Consumer vs New Consumer
- offset에 대한 zookeeper 사용 유무
- 기존에는 offset을 zookeeper의 znode에 저장했다면, kafka 0.9 버전 부터 토픽에 저장한다. 이 때문에 new consumer는 카프카의 토픽으로부터 offset을 가져온다.
의존성 추가
dependencies {
compile('org.apache.kafka:kafka-clients:2.3.0')
}
Producer properties 설정 및 consumer 코드
private final static String TOPIC = "my-example-topic";
private final static String BOOTSTRAP_SERVERS = "localhost:9092,localhost:9093,localhost:9094";
static Properties props = new Properties();
static {
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleConsumer");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
}
private static Consumer<Long, String> createConsumer() {
// Create the consumer using props.
final Consumer<Long, String> consumer = new KafkaConsumer<Long, String>(props);
// Subscribe to the topic.
consumer.subscribe(Collections.singletonList(TOPIC));
return consumer;
}
public static void runConsumer() {
final Consumer<Long, String> consumer = createConsumer();
final int giveUp = 100; int noRecordsCount = 0;
while (true) {
final ConsumerRecords<Long, String> consumerRecords =
consumer.poll(Duration.ofMillis(1000l));
if (consumerRecords.count()==0) {
noRecordsCount++;
if (noRecordsCount > giveUp) break;
else continue;
}
consumerRecords.forEach(record -> {
System.err.printf("Consumer Record:(%d, %s, %d, %d)\n",
record.key(), record.value(),
record.partition(), record.offset());
});
consumer.commitSync();
}
consumer.close();
System.err.println("DONE");
}
consumer group
- 카프카의 장점은 하나의 토픽에 여러 consumer가 동시에 접속해 메시지를 가져올 수 있다.
- 컨슈머 그룹의 각각의 컨슈머는 토픽 데이터를 적절히 나누어 처리가 가능하다.
- 컨슈머 그룹 내 컨슈머는 토픽의 파티션에 대해 소유권을 공유하며 하나의 토픽에 대해 책임을 갖는다.
- 만일 p0 partiotion의 소유권을 갖는 consumer가 down 되거나, 메시지가 너무 많아 partition을 소비할 수 없는 상황이 생긴다면 소유권을 추가된 consumer에게 넘겨줘야 하는데 이를 리밸런스라고 한다.
- 단점은 컨슈머가 리밸런스를 수행할 때, 메시지를 일시적으로 가져오지 않는다.
https://coding-start.tistory.com/137 [코딩스타트]
https://kafka.apache.org/documentation.html#consumerapi
http://cloudurable.com/blog/kafka-tutorial-kafka-consumer/index.html
'Cloud & NoSQL & Middleware > Kafka' 카테고리의 다른 글
Kafka - Producer API (0) | 2019.11.03 |
---|---|
Kafka Overview (0) | 2019.11.02 |
Zookeeper api 테스트 (0) | 2019.11.02 |
Zookeeper (0) | 2019.11.02 |