Comsumer

 

- 토픽에 저장된 메시지를 가져와서 소비하는 소비자

- 파티션 리더에게 메시지를 가져오기 요청을 한다.

- 컨슈머는 offset을 명시하고 그 위치로부터 메시지를 수신한다.

- offset을 재조정하여 과거 데이터를 가져올 수 있으며, 이 때문에 받았던 데이터를 재 수신할 수 있다.

 

 

Old Consumer vs New Consumer

 

- offset에 대한 zookeeper 사용 유무

- 기존에는 offset을 zookeeper의 znode에 저장했다면, kafka 0.9 버전 부터 토픽에 저장한다. 이 때문에 new consumer는 카프카의 토픽으로부터 offset을 가져온다.

 

 

 

의존성 추가

dependencies {
	compile('org.apache.kafka:kafka-clients:2.3.0')
   }

 

 

 

Producer properties 설정 및 consumer 코드

    private final static String TOPIC = "my-example-topic";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092,localhost:9093,localhost:9094";
    static Properties props = new Properties();
    static  {
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleConsumer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    }


    private static Consumer<Long, String> createConsumer() {
        // Create the consumer using props.
        final Consumer<Long, String> consumer = new KafkaConsumer<Long, String>(props);

        // Subscribe to the topic.
        consumer.subscribe(Collections.singletonList(TOPIC));
        return consumer;
    }

    public static void runConsumer()  {
        final Consumer<Long, String> consumer = createConsumer();

        final int giveUp = 100;   int noRecordsCount = 0;

        while (true) {
            final ConsumerRecords<Long, String> consumerRecords =
                    consumer.poll(Duration.ofMillis(1000l));

            if (consumerRecords.count()==0) {
                noRecordsCount++;
                if (noRecordsCount > giveUp) break;
                else continue;
            }

            consumerRecords.forEach(record -> {
                System.err.printf("Consumer Record:(%d, %s, %d, %d)\n",
                        record.key(), record.value(),
                        record.partition(), record.offset());
            });

            consumer.commitSync();
        }
        consumer.close();
        System.err.println("DONE");
    }

 

 

 

 

consumer group

 

- 카프카의 장점은 하나의 토픽에 여러 consumer가 동시에 접속해 메시지를 가져올 수 있다.

- 컨슈머 그룹의 각각의 컨슈머는 토픽 데이터를 적절히 나누어 처리가 가능하다.

- 컨슈머 그룹 내 컨슈머는 토픽의 파티션에 대해 소유권을 공유하며 하나의 토픽에 대해 책임을 갖는다.

- 만일 p0 partiotion의 소유권을 갖는 consumer가 down 되거나, 메시지가 너무 많아 partition을 소비할 수 없는 상황이 생긴다면 소유권을 추가된 consumer에게 넘겨줘야 하는데 이를 리밸런스라고 한다.

- 단점은 컨슈머가 리밸런스를 수행할 때, 메시지를 일시적으로 가져오지 않는다. 

 

 


https://coding-start.tistory.com/137 [코딩스타트]

https://kafka.apache.org/documentation.html#consumerapi

http://cloudurable.com/blog/kafka-tutorial-kafka-consumer/index.html

 

Kafka Tutorial: Creating a Kafka Consumer in Java

Tweet                                                                             Kafka Tutorial: Writing a Kafka Consumer in Java In this tutorial, you are going to create simple Kafka Consumer. This consumer consumes messages from the Kafka Producer you

cloudurable.com

 

'Cloud & NoSQL & Middleware > Kafka' 카테고리의 다른 글

Kafka - Producer API  (0) 2019.11.03
Kafka Overview  (0) 2019.11.02
Zookeeper api 테스트  (0) 2019.11.02
Zookeeper  (0) 2019.11.02

+ Recent posts