Comsumer

 

- 토픽에 저장된 메시지를 가져와서 소비하는 소비자

- 파티션 리더에게 메시지를 가져오기 요청을 한다.

- 컨슈머는 offset을 명시하고 그 위치로부터 메시지를 수신한다.

- offset을 재조정하여 과거 데이터를 가져올 수 있으며, 이 때문에 받았던 데이터를 재 수신할 수 있다.

 

 

Old Consumer vs New Consumer

 

- offset에 대한 zookeeper 사용 유무

- 기존에는 offset을 zookeeper의 znode에 저장했다면, kafka 0.9 버전 부터 토픽에 저장한다. 이 때문에 new consumer는 카프카의 토픽으로부터 offset을 가져온다.

 

 

 

의존성 추가

dependencies {
	compile('org.apache.kafka:kafka-clients:2.3.0')
   }

 

 

 

Producer properties 설정 및 consumer 코드

    private final static String TOPIC = "my-example-topic";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092,localhost:9093,localhost:9094";
    static Properties props = new Properties();
    static  {
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleConsumer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    }


    private static Consumer<Long, String> createConsumer() {
        // Create the consumer using props.
        final Consumer<Long, String> consumer = new KafkaConsumer<Long, String>(props);

        // Subscribe to the topic.
        consumer.subscribe(Collections.singletonList(TOPIC));
        return consumer;
    }

    public static void runConsumer()  {
        final Consumer<Long, String> consumer = createConsumer();

        final int giveUp = 100;   int noRecordsCount = 0;

        while (true) {
            final ConsumerRecords<Long, String> consumerRecords =
                    consumer.poll(Duration.ofMillis(1000l));

            if (consumerRecords.count()==0) {
                noRecordsCount++;
                if (noRecordsCount > giveUp) break;
                else continue;
            }

            consumerRecords.forEach(record -> {
                System.err.printf("Consumer Record:(%d, %s, %d, %d)\n",
                        record.key(), record.value(),
                        record.partition(), record.offset());
            });

            consumer.commitSync();
        }
        consumer.close();
        System.err.println("DONE");
    }

 

 

 

 

consumer group

 

- 카프카의 장점은 하나의 토픽에 여러 consumer가 동시에 접속해 메시지를 가져올 수 있다.

- 컨슈머 그룹의 각각의 컨슈머는 토픽 데이터를 적절히 나누어 처리가 가능하다.

- 컨슈머 그룹 내 컨슈머는 토픽의 파티션에 대해 소유권을 공유하며 하나의 토픽에 대해 책임을 갖는다.

- 만일 p0 partiotion의 소유권을 갖는 consumer가 down 되거나, 메시지가 너무 많아 partition을 소비할 수 없는 상황이 생긴다면 소유권을 추가된 consumer에게 넘겨줘야 하는데 이를 리밸런스라고 한다.

- 단점은 컨슈머가 리밸런스를 수행할 때, 메시지를 일시적으로 가져오지 않는다. 

 

 


https://coding-start.tistory.com/137 [코딩스타트]

https://kafka.apache.org/documentation.html#consumerapi

http://cloudurable.com/blog/kafka-tutorial-kafka-consumer/index.html

 

Kafka Tutorial: Creating a Kafka Consumer in Java

Tweet                                                                             Kafka Tutorial: Writing a Kafka Consumer in Java In this tutorial, you are going to create simple Kafka Consumer. This consumer consumes messages from the Kafka Producer you

cloudurable.com

 

'Cloud & NoSQL & Middleware > Kafka' 카테고리의 다른 글

Kafka - Producer API  (0) 2019.11.03
Kafka Overview  (0) 2019.11.02
Zookeeper api 테스트  (0) 2019.11.02
Zookeeper  (0) 2019.11.02

Producer

 

- 카프카 클러스터에 토픽으로 메시지를 저장하게끔 전달하는 생산자(애플리케이션, 서버)

- 데이터 레코드를 파티션에 매핑하고 파티션 리더(broker)에 요청을 보냄

- 요청 받은 메시지는 라운드 로빈 방식으로 각 파티션에 분배

 

 

특징

 

bootstrap url : producer는 cluster에 대한 metadata를 가져오기 위해 최소 하나 이상의 broker에 연결한다. failover를 위해 1개 이상의 url 리스트를 설정한다.

 

serializer: 카프카는 tcp 기반의 byte array 포맷을 가진 데이터를 송수신한다. 데이터를 write할 때 producer는 미리 설정한 직렬화 클래스를 사용하여 byte array로 직렬화 한 후 전송한다. 

 

matching partition : 파티션을 매칭하여 데이터를 전송하는 것은 producer의 책임이다. 만일 파티션을 명시하지 않으면 key의 해쉬 값을 이용하여 파티션을 결정하고 데이터를 보낸다. 만일 키값도 존재하지 않는다면 라운드 로빈 방식으로 저장한다.

 

retry : 처리 실패 응답이나 재시도를 결정할 수 있다.

 

batch: 효율적인 메시지 전송을 위해 batch를 사용할 수 있다.

 

 

 

 

의존성 추가

dependencies {
	compile('org.apache.kafka:kafka-clients:2.3.0')
   }

 

 

 

Producer properties 설정

private final static String TOPIC = "my-example-topic";
private final static String BOOTSTRAP_SERVERS = "localhost:9092";


private static Producer<Long, String> createProducer() {
        Properties props = new Properties();
        //카프카 서버의 부트스트랩 주소를 설정한다.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer");

       
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG,  1);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 20000);
//        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
//        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 24568545);

                return new KafkaProducer<>(props);
    }

 

Kafka의 Producer의 Config는 java.utils.Properties를 통해 바인딩되며 옵션은 아래 링크에서 확인 가능하다.

https://kafka.apache.org/documentation/#producerconfigs

 

 

 

Producer Sync & Async Send Message

 

ublic static void runProducerSync(final int sendMessageCount) throws Exception {

        final Producer<Long, String> producer = createProducer();
        long time = System.currentTimeMillis();

        try {
            for (long index = time; index < time + sendMessageCount; index++) {
                final ProducerRecord<Long, String> record =
                        new ProducerRecord<>(TOPIC, index,
                                "Hello Mom " + index);

                // record 기록 시 key value, timestamp 기록
                RecordMetadata metadata = producer.send(record).get();

                long elapsedTime = System.currentTimeMillis() - time;
                System.out.printf("sent record(key=%s value=%s) " +
                                "meta(partition=%d, offset=%d) time=%d\n",
                        record.key(), record.value(), metadata.partition(),
                        metadata.offset(), elapsedTime);

            }
        } finally {
            producer.flush();
            producer.close();
        }
    }

    public static void runProducerAsync(final int sendMessageCount) throws InterruptedException {
        final Producer<Long, String> producer = createProducer();
        long time = System.currentTimeMillis();
        final CountDownLatch countDownLatch = new CountDownLatch(sendMessageCount);

        try {
            for (long index = time; index < time + sendMessageCount; index++) {
                final ProducerRecord<Long, String> record =
                        new ProducerRecord<>(TOPIC, index, "Hello Mom " + index);

                producer.send(record, (metadata, exception) -> {
                    long elapsedTime = System.currentTimeMillis() - time;
                    if (metadata != null) {
                        System.out.printf("sent record(key=%s value=%s) " +
                                        "meta(partition=%d, offset=%d) time=%d\n",
                                record.key(), record.value(), metadata.partition(),
                                metadata.offset(), elapsedTime);
                    } else {
                        exception.printStackTrace();
                    }
                });
            }
            countDownLatch.await(25, TimeUnit.SECONDS);
        } finally {
            producer.flush();
            producer.close();
        }
    }

 

 

 

카프카 프로듀서를 사용할 때 유의해야 하는 사항

  • 잘못된 파티셔닝 방식의 사용방지 : 파티션은 카프카에서 병렬 처리의 단위다. 메시지가 모든 토픽 파티션으로 균일하게 분배되도록 올바른 파티셔닝 전략을 선택해야한다. 만약 잘못된 파티셔닝 방식을 도입하게 되면 특정 파티션에만 요청이 집중되는 최악의 상황이 발생할 수 있으니 메시지가 모든 가용한 파티션에 분배 되도록 올바른 파티셔닝 방식을 정의하자.
  • 기존 토픽에 새로운 파티션 추가 방지 : 메시지를 분산시키기 위해서 키 값을 사용하여 파티션을 선택하는 토픽에는 파티션을 추가하는 것을 피해야한다. 새로운 파티션 추가는 각 키에 대해 산출된 해시 코드를 변경시키며, 이는 파티션의 수도 산출에 필요한 입력 중 하나로 사용하기 때문이다.

 

 

 

 

https://kafka.apache.org/

http://cloudurable.com/blog/kafka-tutorial-kafka-producer/index.html

https://coding-start.tistory.com/193?category=790331

'Cloud & NoSQL & Middleware > Kafka' 카테고리의 다른 글

Kafka - Consumer API  (0) 2019.11.03
Kafka Overview  (0) 2019.11.02
Zookeeper api 테스트  (0) 2019.11.02
Zookeeper  (0) 2019.11.02

Kafka

카프카는 분산 스트리밍 플랫폼(streaming platform)이다.

- mom으로 구현한 MQ System

- message broker

- pub/sub 모델

- 높은 throughput(처리량)

- 빠른 처리 속도

- 파일에 데이터를 기록한다.

 

 

용어

 브로커(Broker) - 카프카 클러스터의 서버 또는 노드를 말합니다.
 토픽(Topic) - 프로튜서와 컨슈머들이 카프카로 보낸 자신들의 메시지를 구분하기 위한 카테고리

 파티션(Partition) - 토픽을 분할 한 단위. 파티션을 늘려 병렬처리를 통해 처리속도를 높일 수 있음
 프로듀서(Producer) - 메시지를 생산하여 토픽에 저장하는 개체
 컨슈머(Comsumer) - 토픽 이름으로 저장된 메시지를 가져가는 개체

 

Goal

- producer와 consumer 사이의 느슨한 연결

- binary 데이터 형식을 사용한 다양한 데이터 format 관리

- 기존의 클러스터에 영향을 주지 않고 서버 확장(scale out) 지원

Concept

- 카프카는 하나 이상의 서버로 구성되는 클러스터에서 작동한다.

- 카프카 클러스터는 topic이라고 불리는 카테고리에 데이터 레코드 스트림을 저장한다.

- 각 레코드는 key, value, timestamp로 구성된다.

Broker, Zookeper

- broker : 카프카 서버. broker.id=1..n으로 함으로써 동일한 노드내에서 여러개의 broker서버를 띄울 수도 있다.

- zookeeper: broker 노드의 상태 정보를 관리 한다.. zookeeker는 컨트롤러를 선정하는데 컨트롤러는 파티션 관리를 책임지는 브로커 중 하나이다. 파티션 관리는 리더 선정, 토픽 생성, 파티션 생성, 복제복 관리를 포함한다. 또한 리더 노드가 다운되면  컨트롤러는 팔로워 중 파티션 리더를 선정한다. 선정 방식에서 과반 수 투표방식으로 결정하기 때문에 홀수로 구성해야 하고, 과반수 이상 살아 있으면 정상 동작한다.

 

API

 

카프카의 주요 API들은 아래와 같다.

  • Producer API : 애플리케이션은 이 API를 이용해서 하나 이상의 카프카 토픽에 스트림 레코드를 게시할 수 있다.
  • Consumer API : 애플리케이션은 이 API를 이용해서 하나 이상의 카프카 토픽으로 부터 스트림 레코드를 구독 할 수 있다.
  • Streams API : 애플리케이션이 하나 이상의 토픽에서 입력 스트림을 읽고 변환해서 하나 이상의 출력 토픽으로 스트림을 보낼 수 있도록 한다.
  • Connector API : Connector를 이용해서 재 사용 가능한 Producer 혹은 Consumers를 카프카 토픽에 연결 할 수 있다. 예를 들어 관계형 데이터베이스 컨넥터는 테이블에 대한 변경 사항을 캡처할 수 있다.

 

Topic

 

- 카프카의 모든 메시지는  byte array로 표현된다.

- 모든 토픽은 하나 이상의 파티션으로 나뉘어져 있다.

- 파티션 내 한 칸은 로그라고 불린다.

- 카프카 클러스터는 파티션에 설정된 모든 레코드에 보관 기간에 따라 저장한다. 기본 값은 7일이다.

- 여러 개로 나뉘어진 파티션은 내부 record에 timestamp를 갖고 있기 때문에 도착한 순서에 맞게 저장한다.

 

 

 

 

- producer는 토픽에 메시지를 저장한다.

- consumer는 다른 MQ와는 다르게 topic으로 부터 메시지를 pull 방식으로 가져온다. 

- producer는 로그 선행 기입 파일 마지막에 메시지를 write한다.

- consumer는 주어진 토픽 파티션에 속한 로그 파일에서 메시지를 가져 온다. consumer 자체가 능등적으로 offset을 저장하고 있으며 offset으로 데이터를 가져 오기 때문에 가볍다.

 

 

 

파티션의 분산 및 복제

 

- 물리적으로 토픽은 하나 이상의 파티션을 다른 broker에게 분배될 수 있다.

- 만일 순서가 중요한 데이터라면, 파티션을 1개로 구성하는 것도 고려할 수 있다. 

- 파티션을 복제하면 roundRobin 방식으로 쓰여 진다. 

- 파티션을 여러개로 구성하면 여러 쓰레드에서 병렬로 처리되기 때문에 높은 처리량을 보장 받을 수 있지만 순서 보장 고려, producer 메모리 증가, 장애 복구 시간 증가 등 여러가지를 고려해야 한다.

- 파티션은 지정된 복제 팩터에 따라 카프카 클러스터 전역에 복제되는데 각 파티션은 leader 브로커와 follower 브로커를 가지며, 파티션에 대한 모든 읽기와 쓰기 요청은 리더를 통해서만 진행된다.

 

 

 

 

Comsumer Group

 

 

 

 

- 카프카는 큐(queue) 발행/구독(Pub/Sub) 두 가지 모델을 지원한다. 큐 모델 에서 특정 메시지는 하나의 인스턴스(C1,C2..)에서만 꺼내갈 수 있지만, 발행/구독 모델에서는 특정 메시지를 하나 이상의 인스턴스에서 꺼내갈 수 있다.

 

- pub/sub 모델을 통해, 복수의 Consumer로 이루어진 Consumer group을 구성하여 1 topic의 데이터를 분산하여 처리가 가능하다.

이 때 토픽의 파티션 수가 컨슈머 그룹 내 수보다 클 때 모두 처리가 가능한데, 만일 컨슈머 그룹이 더 크다면 1개 이상의 컨슈머는  idle(유휴) 상태가 된다.

 

 

설치 및 QuickStart

 

 

 

참고:

https://coding-start.tistory.com/192?category=790331 [코딩스타트]

https://www.joinc.co.kr/w/man/12/Kafka/about

https://kafka.apache.org/introhttps://www.joinc.co.kr/w/man/12/Kafka/about

 

Apache Kafka

Apache Kafka: A Distributed Streaming Platform.

kafka.apache.org

 

 

 

'Cloud & NoSQL & Middleware > Kafka' 카테고리의 다른 글

Kafka - Consumer API  (0) 2019.11.03
Kafka - Producer API  (0) 2019.11.03
Zookeeper api 테스트  (0) 2019.11.02
Zookeeper  (0) 2019.11.02

1. 아래 페이지에서 gz파일 다운로드 및 압축 해제

https://www-us.apache.org/dist/zookeeper/

 

 

2. $ ./conf/zookeeperzoo.cfg 파일 생성 및 설정

tickTime=2000
dataDir=/tmp/zookeeper
clientPort=2181

 

 

3. 서버 구동(단일 서버 standalone)

$ ./bin/zkServer.sh start

 

 

 

 

4. 클라이언트 연결

$ ./bin/zKcli.sh --server 127.0.0.1:2181

 

 

 

5. node 및 데이터 생성 테스트

 

 

 

1) create: /zk_test 노드 및 my_data String 데이터 생성

2) get: /zk_test 노드 데이터 read

3) set: /zk_test 노드 데이터 write

4) delete: /zk_test s 노드 삭제

 

 

 

6. 서버 중지

$ ./bin/zkServer.sh stop

'Cloud & NoSQL & Middleware > Kafka' 카테고리의 다른 글

Kafka - Consumer API  (0) 2019.11.03
Kafka - Producer API  (0) 2019.11.03
Kafka Overview  (0) 2019.11.02
Zookeeper  (0) 2019.11.02

Zookeeper는 안정적인 분산 조정(distributed cordination) 처리를 가능하게 해주는 오픈소스 서버

 

- 설정 정보 유지, 이름 지정, 분산 동기화 및 그룹 서비스 제공을 위한 중앙 집중식 서비스

- 분산 처리 시스템을 구성할 때 서버 간의 정보 공유, 서버 상태 체크, 동기화에 대한 락 처리를 수행하는 대표적인 오픈소스

 

 

특징

 

1) zookeeper는 심플하다. 주키퍼는 분산된 프로세스가 znode로 불리는 네임스페이스가 cordinate(조정)할 수 있다. 높은 성능과 고가용성으로 대규모 분산처리 시스템에 적합하다. 

2) zookeeper는 복제된다. 조정 된 분산 프로세스와 마찬가지로 ZooKeeper 자체는 ensembl이라는 일련의 호스트를 통해 복제된다. 

 

클라이언트는 단일 zookeeper 서버와 연결하여 데이터를 주고받으며 감시 이벤트를 통해 연결 유지 중이라는 것을 알리는 heatbeat를 주고 받는다. 만일 특정 상황으로 인해 연결이 끊어지면, 다른 서버에 연결될 것이다. 이를 통해 SPOF(SinglePoint of Failure)가 되는 일을 막는다.

 

3) zookeeper는 순서화된다. ZooKeeper는 모든 ZooKeeper transaction의 order를 반영하는 숫자로 각 업데이트에 도장을 찍는다.

4) zookeeper는 빠르다. 쓰기보단 읽기 작업 성능에서 빠르다.

 

 

 

5) 네임스페이스는 파일시스템 계층과 유사하며, 디렉토리 구조의 각 노드에 데이터는 저장한다. 파일 시스템과 다른 점은 각 노드에 자식 노드의 데이터 혹은 관련된 데이터가 포함된다.

 

Znode

- Ephemeral Node(임시노드): 해당 노드는 노드를 생성한 클라이언트의 연결 세션이 유지될 때만 유효하다. 

- Persistance Node(영구노드): 데이터를 강제로 삭제하지 않는 이상, 영구적으로 저장되는 노드이다.

- Sequence Node(시퀀스 노드): 노드를 생성할 때 자동으로 sequence 번호가 붙는 노드. 주로 분산 락을 구현하는데 이용한다.

 

 

Watches

- 주키퍼는 watch라는 콘셉을 제공하는데, 클라이언트는 노드에 watch를 설정할 수 있다. znode가 업데이트되면 watch는 트리거가되고 제거되며 클라이언트는 노드가 변경되었다는 패킷을 수신하게 된다. 예를 들어 watch를 설정한 상태에서 클라이언트와 서버의 연결이 끊어지면 연결이 끊어졌다는 local 알림을 받을 수 있을 것이다.

 

API

- API는 다음과 같은 점을 지원한다.

 

1) create: 노드 생성

2) delete: 노드 삭제

3) exsists: 노드가 존재하는지 테스트

4) get data: 노드로부터 데이터 read

5) set data: 노드에 데이터 wrtie

6) get children: 자식 노드 검색

7) sync: 데이터가 전파될 때까지 대기

 

 

 

 

 

 

http://zookeeper.apache.org/

 

Apache ZooKeeper

http://zookeeper.apache.org/doc/r3.5.6/zookeeperOver.html

 

ZooKeeper: Because Coordinating Distributed Systems is a Zoo

https://bcho.tistory.com/1016

 

분산 코디네이터 Zookeeper(주키퍼) 소개

ZooKeeper란 무엇인가? 조대협 (http://bcho.tistory.com) 소개 분산 시스템을 설계 하다보면, 가장 문제점 중의 하나가 분산된 시스템간의 정보를 어떻게 공유할것이고, 클러스터에 있는 서버들의 상태를 체크할..

bcho.tistory.com

http://helloworld.naver.com/helloworld/textyle/294797

불러오는 중입니다...

 

'Cloud & NoSQL & Middleware > Kafka' 카테고리의 다른 글

Kafka - Consumer API  (0) 2019.11.03
Kafka - Producer API  (0) 2019.11.03
Kafka Overview  (0) 2019.11.02
Zookeeper api 테스트  (0) 2019.11.02

+ Recent posts