Firbase Cloud Messaging Http v1 api 를 사용하려면 Google API 대쉬보드에 API 사용을 등록해야 하는데 방법은 다음과 같다.

 

 

1. url 접속 

https://console.developers.google.com/apis/dashboard

 

 

1. Firebase Console에서 생성한 프로젝트를 선택합니다.

2. Firebase Cloud messaging API를 검색합니다.

 

 

 

 

3. 사용설정을 선택합니다.

해당 글에서는 FCM 서버로 메시지를 전송하는 protocol에 대해서만 살펴볼 것이며,

 

SDK를 이용한 클라이언트 작업은 https://firebase.google.com/docs/cloud-messaging 에서 자세히 확인할 수 있다.

 

FCM Provider 에 대한 예제는 https://github.com/chulman/firebase-cloud-messaging 에 작성하였다.

 

Protocol

- 기본적으로 HTTP와 xmpp를 통해 메시지를 전송할 수 있다.

 

HTTP

XMPP

UpStream/DownStream

다운스트림 전용

최대 4kb의 데이터

업스트림/다운스트림

최대 4kb의 데이터

메시징 방식

Request & Response

-> 응답받기 전까지 다른 메시지를 보내지 못하도록 차단.

승인 또는 실패(ACK or NACK Json 인코딩 XMPP 메시지 형태)를 비동기적으로 보냄

JSON

HTTP POST로 전송된 

JSon message

JSON MESSAGE가 XMPP 메시지로 캡슐화 됨.

일반텍스트

HTTP POST로 전송된

일반 텍스트 메시지

지원하지 않음.

멀티캐스트 다운스트림이 여러 등록 토큰으로 전송

JSON Message 형식에서 지원

지원하지 않음.

 

HTTP API

-  Http API로 메시지를 전송하는 포맷은 http api 와 http v1 api 2가지가 존재한다. 

 

-  http v1 api를 사용하려면 google api 대쉬보드를 통해 등록해야 한다. (참조: Google API 대쉬보드에 FCM API 설정하기)

 

 

1. http api VS http v1 api

 

- http api는 가장 심플한 레거시 방식으로 오랫동안 많은 사람들이 써왔던 방식이다. 특정 url로 규격에 맞는 json 포맷을 통해 전송하기 만하면 된다.

 

- http v1 api는 http api를 보안한 api라고 생각하면 될 것 같다. 가장 크게 강조하는 2가지는 보안과 멀티 플랫폼이다.

 

   + (1) 보안 : http v1 api 는 oauth 방식을 통해 fcm 서버와 연결하고 인증한다. (때문에 좀 더 까다롭다..) 

   + (2) 멀티 플랫폼 : 기존 http api를 통해 메시지를 전송할 때는 보내는 형식이 기기 중심적이었다. 여러 기기를 묶어서 공통된 메시지를 보낼 수 밖에 없었는데, 이를 보완하고 플랫폼 별(ios, android..) 다양한 메시지 내용과 포맷을 하나의 json 포맷으로 구성할 수 있고 때문에 한 번의 전송으로 여러 플랫폼에 메시지 전송이 가능하다.

 

(해당 fcm 블로그에서 자세한 사항을 확인할 수 있다.) - https://firebase.googleblog.com/2017/11/whats-new-with-fcm-customizing-messages.html

 

 

 

 

2. request header 

 

- request option : 메시지를 전송 할 때는 POST 방식이어야 한다.

- url : 

   + http api : https://fcm.googleapis.com/fcm/send    

   + http v1 api: https://fcm.googleapis.com/v1/{parent=projects/*}/messages:send 

 

- port : 주의할 점은 htps를 통한 연결이 때문에 443 포트에 대한 방화벽 정책이 허용되어야 한다.

 

- content type : application / json

- parameter : 

      + http api : "authrization" :  key = "api 서버 키"

      + http v1 api: "authrization" :  bearer + "api 서버 키"

 

 

 

3. request body

 

- 메시지를 전송할 때 body에 json 형식의 payload를 보낼 수 있는데 format은 아래에서 확인할 수 있다.

   +  https://firebase.google.com/docs/cloud-messaging/concept-options?hl=ko

 

FCM 메시지 정보  |  Firebase

Firebase 클라우드 메시징(FCM)은 다양한 메시징 옵션과 기능을 제공합니다. 이 페이지의 정보는 다양한 유형의 FCM 메시지에 관한 이해를 돕고 FCM으로 구현할 수 있는 기능을 소개하기 위한 내용입니다. 메시지 유형 FCM을 통해 2가지 유형의 메시지를 클라이언트에 보낼 수 있습니다. 알림 메시지: 종종 '표시 메시지'로 간주됩니다. FCM SDK에서 자동으로 처리합니다. 데이터 메시지: 클라이언트 앱에서 처리합니다. 알림 메시지에는 사용자에게

firebase.google.com

 

3.1 http api 요청 포맷 예

 

아래 포맷을 보면 특정 토큰(기기)에 notification이라는 메시지 형식을 통해 알림 형태를 지정하며, 커스텀 데이터를 추가해서 사용자가 메시지를 좀 더 다양하게 파악 할 수 있다.

{
  "message":{
    "token":"bk3RNwTe3H0:CI2k_HHwgIpoDKCIZvvDMExUdFQ3P1...",
    "notification":{
      "title":"Portugal vs. Denmark",
      "body":"great match!"
    },
    "data" : {
      "Nick" : "Mario",
      "Room" : "PortugalVSDenmark"
    }
  }
}

 

- 여러 토큰에 타겟해서 보낼 경우 registration_ids를 설정해서 보낼 수 있는데, 배열 갯수의 제한은 1000개 이다. 즉 한 번 전송시 1000개까지 밖에 전송할 수 없다.

 

{
  "message":{
    "notification":{
      "title":"Portugal vs. Denmark",
      "body":"great match!"
    },
    "data" : {
      "Nick" : "Mario",
      "Room" : "PortugalVSDenmark"
    },
    "registration_ids": ["bk3RNwTe3H0:CI2k_HHwgIpoDKCIZvvDMExUdFQ3P1...", ...] 
  }
}

 

- 때문에 FCM에서는 TOPIC이라는 키를 활용해서 좀 더 많은 디바이스에 한번에 전송할 수 있는 방법을 제공한다.  다음과 같이 메시지를 전송하면 "NEWS"라는 토픽을 등록한 기기에 모두 전송한다.

 

{
  "message":{
    "topic" : "NEWS",
    "notification":{
      "title":"Portugal vs. Denmark",
      "body":"great match!"
    },
    "data" : {
      "Nick" : "Mario",
      "Room" : "PortugalVSDenmark"
    }
  }
}

 

3.2 http v1 api 요청 포맷 예

- 여러 플랫폼에 전송할 수 있는 형식을 사용할 수 있다.

 

{
  "message":{
    "topic":"subscriber-updates",
    "notification":{
      "body" : "This week's edition is now available.",
      "title" : "NewsMagazine.com",
    },
    "data" : {
      "volume" : "3.21.15",
      "contents" : "http://www.news-magazine.com/world-week/21659772"
    },
    "android":{
      "priority":"normal"
    },
    "apns":{
      "headers":{
        "apns-priority":"5"
      }
    },
    "webpush": {
      "headers": {
        "Urgency": "high"
      }
    }
  }
}

 

 

 

4. response header & Error Code

- 참조: https://firebase.google.com/docs/cloud-messaging/http-server-ref?hl=ko#error-codes

 

 

 

5. response body

- 마찬가지로 http api와 http v1 api의 response body는 다르다.

 

 

5.1  response body

 

- 전송 시 registration_ids 키값 에 5개의 기기를 포함했을 때, body 다음과 같은 형식이 될 것이고 5기기의 결과 값이 result 배열에 포함된다.

 

 

- http v1 api로 전송시 성공한다면 다음과 같은 형식으로 응답을 내 뱉는다.

 

{"name": "projects/{project-name}/messages/2280331808525169080"}

 

 

- 가장 큰 문제점은 에러가 발생했을 때 처리 부분인데, 에러코드가 너무 빈약하다는 점이다. 자세하게 확인하려면 admin sdk를 통해 따로 요청하는 등 방법이 존재하는데 이에 따른 비용이 더 생길 우려가 있다.

 

구조

 

https://firebase.google.com/docs/cloud-messaging/fcm-architecture

 

 

 

개인적으로 Firebase 레퍼런스는 굉장히 친절하고 쉬운편이라고 생각해서 사이트를 조금만 참조해도 누구나 쉽게 이해할 수 있다고 생각하며, 개인적인 의견만 코멘트해 본다.

 

 

(1) : GUI 나 HTTP/XMPP 프로토콜을 사용할 수 있는 환경이라면 어디서든 메시지를 전송할 수 있다. 파이어베이스 sdk 라이브러리를 사용할 수 있는 환경은 https://firebase.google.com/docs/libraries에서 확인 가능하다.

 

(2) : 클라이언트에게 메시지 전송을 하려면 FCM 백엔드 서버에 클라이언트 정보를 같이 전달해줘야 하는데 토픽과 instance가 필요하다.

여기서 토픽은 클라이언트 정보를 묶는 topic (pub/sub 모델에서 자주 등장하는 topic 으로 생각해도 좋다) 을 전달해주거나 고유한 token(device 정보를 알 수 있는 uuid 값)을 함께 전달해야 한다.

 

(3) : FCM 백엔드 서버는 데이터를(json 혹은 xml) 분석하고 플랫폼에 맞게 전송 서버를 타겟할 것이다;.

 

(4) : 전송했던 타겟 정보와 일치한 클라이언트는 메시지를 수신한다.

 

 

 

LifeCycle

 

위의 플로우로 메시지를 전송하려면 반드시 선행해야하는 작업이 있는데, 당연하겠지만 메시지를 수신할 클라이언트는 자신의 정보를 FCM 서버에 등록해야 한다는 점이다. 순서는 다음과 같다.

 

 

1. 클라이언트는 자신의 정보( 토픽, 인스턴스) 를 FCM 백엔드 서버에 등록해야 한다.

2. 메시지를 전송할 주체 (3rd-parry 서버 혹은 모바일) 는 등록된 정보를 획득해야 하며, 해당 정보로 다운스트림 메시지를 전송한다.

 

 

Firebase

파이어베이스(Firebase)는 2011년 파이어베이스(Firebase, Inc)사가 개발하고 2014년 구글 인수된 모바일 및 웹 애플리케이션개발 플랫폼이다.

 

Firebase Cloud Messaging

Firebase 클라우드 메시징(FCM)은 무료로 메시지를 안정적으로 전송할 수 있는 교차 플랫폼 메시징 솔루션

구글에서는 GCM이라는 독자적인 플랫폼이 존재했었는데, 파이어베이스를 인수한 이후 GCM을 고도하여 FCM에 집중해오고 있다.

GCM은 2019년 5월 서비스 지원을 중단하게 되었다. 때문에 대부분 Push 솔루션을 구현한 회사들에서는 FCM으로 마이그레이션 작업을 준비했을 것이며 나 또한 그러했다.

(서비스 중단에 따라 반드시 변경해야 될 작업은 https://developers.google.com/cloud-messaging에서 확인해 볼 수 있다.)

 

그래서 어떻게 동작할까?

네티 인 액션

출처:https://firebase.google.com/docs/cloud-messaging/images/messaging-overview.png

 

 

 

다시 정리하면 FCM은 메시지 전송 플랫폼이다. 위 그림은 FCM이 어떻게 메시지를 클라이언트에게 전송하는지 모든 걸 나타낸다.

 

flow

(1) Firebase Console GUI 에서 메시지를 전송

 

(2) 3rd-party-server 혹은 모바일 클라이언트 에서 http 혹은 xmpp 프로토콜을 통해 메시지를 전송 

      - firebase는 손쉽게 메시지를 전송하도록 library를 제공한다.

 

(3) firebase 서버에서는 특정 기기로 메시지 전송 요청을 받으면 장치(ios, android, web)으로 알림을 전송

 

(4) 클라이언트는 메시지 수신

 

 

정리

- FCM을 사용함으로써 얻는 장점은 간단하다. cloud 서비스를 이용함으로써 서버리스 아키텍쳐를 구성하고 메모리나 CPU의 오버헤드, 트래픽, 서버 등 관리포인트가 크게 줄어들기 때문에 비용을 크게 감소시킬 수 있다. 특히 FCM은 완전히 무료다.  (물론 완전히 관리하지 않아도 된다는 말은 아니다.. 사용자 관리, 토픽 등.. 여러가지 있다.. ) 

 

 

Comsumer

 

- 토픽에 저장된 메시지를 가져와서 소비하는 소비자

- 파티션 리더에게 메시지를 가져오기 요청을 한다.

- 컨슈머는 offset을 명시하고 그 위치로부터 메시지를 수신한다.

- offset을 재조정하여 과거 데이터를 가져올 수 있으며, 이 때문에 받았던 데이터를 재 수신할 수 있다.

 

 

Old Consumer vs New Consumer

 

- offset에 대한 zookeeper 사용 유무

- 기존에는 offset을 zookeeper의 znode에 저장했다면, kafka 0.9 버전 부터 토픽에 저장한다. 이 때문에 new consumer는 카프카의 토픽으로부터 offset을 가져온다.

 

 

 

의존성 추가

dependencies {
	compile('org.apache.kafka:kafka-clients:2.3.0')
   }

 

 

 

Producer properties 설정 및 consumer 코드

    private final static String TOPIC = "my-example-topic";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092,localhost:9093,localhost:9094";
    static Properties props = new Properties();
    static  {
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleConsumer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    }


    private static Consumer<Long, String> createConsumer() {
        // Create the consumer using props.
        final Consumer<Long, String> consumer = new KafkaConsumer<Long, String>(props);

        // Subscribe to the topic.
        consumer.subscribe(Collections.singletonList(TOPIC));
        return consumer;
    }

    public static void runConsumer()  {
        final Consumer<Long, String> consumer = createConsumer();

        final int giveUp = 100;   int noRecordsCount = 0;

        while (true) {
            final ConsumerRecords<Long, String> consumerRecords =
                    consumer.poll(Duration.ofMillis(1000l));

            if (consumerRecords.count()==0) {
                noRecordsCount++;
                if (noRecordsCount > giveUp) break;
                else continue;
            }

            consumerRecords.forEach(record -> {
                System.err.printf("Consumer Record:(%d, %s, %d, %d)\n",
                        record.key(), record.value(),
                        record.partition(), record.offset());
            });

            consumer.commitSync();
        }
        consumer.close();
        System.err.println("DONE");
    }

 

 

 

 

consumer group

 

- 카프카의 장점은 하나의 토픽에 여러 consumer가 동시에 접속해 메시지를 가져올 수 있다.

- 컨슈머 그룹의 각각의 컨슈머는 토픽 데이터를 적절히 나누어 처리가 가능하다.

- 컨슈머 그룹 내 컨슈머는 토픽의 파티션에 대해 소유권을 공유하며 하나의 토픽에 대해 책임을 갖는다.

- 만일 p0 partiotion의 소유권을 갖는 consumer가 down 되거나, 메시지가 너무 많아 partition을 소비할 수 없는 상황이 생긴다면 소유권을 추가된 consumer에게 넘겨줘야 하는데 이를 리밸런스라고 한다.

- 단점은 컨슈머가 리밸런스를 수행할 때, 메시지를 일시적으로 가져오지 않는다. 

 

 


https://coding-start.tistory.com/137 [코딩스타트]

https://kafka.apache.org/documentation.html#consumerapi

http://cloudurable.com/blog/kafka-tutorial-kafka-consumer/index.html

 

Kafka Tutorial: Creating a Kafka Consumer in Java

Tweet                                                                             Kafka Tutorial: Writing a Kafka Consumer in Java In this tutorial, you are going to create simple Kafka Consumer. This consumer consumes messages from the Kafka Producer you

cloudurable.com

 

'Cloud & NoSQL & Middleware > Kafka' 카테고리의 다른 글

Kafka - Producer API  (0) 2019.11.03
Kafka Overview  (0) 2019.11.02
Zookeeper api 테스트  (0) 2019.11.02
Zookeeper  (0) 2019.11.02

Producer

 

- 카프카 클러스터에 토픽으로 메시지를 저장하게끔 전달하는 생산자(애플리케이션, 서버)

- 데이터 레코드를 파티션에 매핑하고 파티션 리더(broker)에 요청을 보냄

- 요청 받은 메시지는 라운드 로빈 방식으로 각 파티션에 분배

 

 

특징

 

bootstrap url : producer는 cluster에 대한 metadata를 가져오기 위해 최소 하나 이상의 broker에 연결한다. failover를 위해 1개 이상의 url 리스트를 설정한다.

 

serializer: 카프카는 tcp 기반의 byte array 포맷을 가진 데이터를 송수신한다. 데이터를 write할 때 producer는 미리 설정한 직렬화 클래스를 사용하여 byte array로 직렬화 한 후 전송한다. 

 

matching partition : 파티션을 매칭하여 데이터를 전송하는 것은 producer의 책임이다. 만일 파티션을 명시하지 않으면 key의 해쉬 값을 이용하여 파티션을 결정하고 데이터를 보낸다. 만일 키값도 존재하지 않는다면 라운드 로빈 방식으로 저장한다.

 

retry : 처리 실패 응답이나 재시도를 결정할 수 있다.

 

batch: 효율적인 메시지 전송을 위해 batch를 사용할 수 있다.

 

 

 

 

의존성 추가

dependencies {
	compile('org.apache.kafka:kafka-clients:2.3.0')
   }

 

 

 

Producer properties 설정

private final static String TOPIC = "my-example-topic";
private final static String BOOTSTRAP_SERVERS = "localhost:9092";


private static Producer<Long, String> createProducer() {
        Properties props = new Properties();
        //카프카 서버의 부트스트랩 주소를 설정한다.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer");

       
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG,  1);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 20000);
//        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
//        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 24568545);

                return new KafkaProducer<>(props);
    }

 

Kafka의 Producer의 Config는 java.utils.Properties를 통해 바인딩되며 옵션은 아래 링크에서 확인 가능하다.

https://kafka.apache.org/documentation/#producerconfigs

 

 

 

Producer Sync & Async Send Message

 

ublic static void runProducerSync(final int sendMessageCount) throws Exception {

        final Producer<Long, String> producer = createProducer();
        long time = System.currentTimeMillis();

        try {
            for (long index = time; index < time + sendMessageCount; index++) {
                final ProducerRecord<Long, String> record =
                        new ProducerRecord<>(TOPIC, index,
                                "Hello Mom " + index);

                // record 기록 시 key value, timestamp 기록
                RecordMetadata metadata = producer.send(record).get();

                long elapsedTime = System.currentTimeMillis() - time;
                System.out.printf("sent record(key=%s value=%s) " +
                                "meta(partition=%d, offset=%d) time=%d\n",
                        record.key(), record.value(), metadata.partition(),
                        metadata.offset(), elapsedTime);

            }
        } finally {
            producer.flush();
            producer.close();
        }
    }

    public static void runProducerAsync(final int sendMessageCount) throws InterruptedException {
        final Producer<Long, String> producer = createProducer();
        long time = System.currentTimeMillis();
        final CountDownLatch countDownLatch = new CountDownLatch(sendMessageCount);

        try {
            for (long index = time; index < time + sendMessageCount; index++) {
                final ProducerRecord<Long, String> record =
                        new ProducerRecord<>(TOPIC, index, "Hello Mom " + index);

                producer.send(record, (metadata, exception) -> {
                    long elapsedTime = System.currentTimeMillis() - time;
                    if (metadata != null) {
                        System.out.printf("sent record(key=%s value=%s) " +
                                        "meta(partition=%d, offset=%d) time=%d\n",
                                record.key(), record.value(), metadata.partition(),
                                metadata.offset(), elapsedTime);
                    } else {
                        exception.printStackTrace();
                    }
                });
            }
            countDownLatch.await(25, TimeUnit.SECONDS);
        } finally {
            producer.flush();
            producer.close();
        }
    }

 

 

 

카프카 프로듀서를 사용할 때 유의해야 하는 사항

  • 잘못된 파티셔닝 방식의 사용방지 : 파티션은 카프카에서 병렬 처리의 단위다. 메시지가 모든 토픽 파티션으로 균일하게 분배되도록 올바른 파티셔닝 전략을 선택해야한다. 만약 잘못된 파티셔닝 방식을 도입하게 되면 특정 파티션에만 요청이 집중되는 최악의 상황이 발생할 수 있으니 메시지가 모든 가용한 파티션에 분배 되도록 올바른 파티셔닝 방식을 정의하자.
  • 기존 토픽에 새로운 파티션 추가 방지 : 메시지를 분산시키기 위해서 키 값을 사용하여 파티션을 선택하는 토픽에는 파티션을 추가하는 것을 피해야한다. 새로운 파티션 추가는 각 키에 대해 산출된 해시 코드를 변경시키며, 이는 파티션의 수도 산출에 필요한 입력 중 하나로 사용하기 때문이다.

 

 

 

 

https://kafka.apache.org/

http://cloudurable.com/blog/kafka-tutorial-kafka-producer/index.html

https://coding-start.tistory.com/193?category=790331

'Cloud & NoSQL & Middleware > Kafka' 카테고리의 다른 글

Kafka - Consumer API  (0) 2019.11.03
Kafka Overview  (0) 2019.11.02
Zookeeper api 테스트  (0) 2019.11.02
Zookeeper  (0) 2019.11.02

Kafka

카프카는 분산 스트리밍 플랫폼(streaming platform)이다.

- mom으로 구현한 MQ System

- message broker

- pub/sub 모델

- 높은 throughput(처리량)

- 빠른 처리 속도

- 파일에 데이터를 기록한다.

 

 

용어

 브로커(Broker) - 카프카 클러스터의 서버 또는 노드를 말합니다.
 토픽(Topic) - 프로튜서와 컨슈머들이 카프카로 보낸 자신들의 메시지를 구분하기 위한 카테고리

 파티션(Partition) - 토픽을 분할 한 단위. 파티션을 늘려 병렬처리를 통해 처리속도를 높일 수 있음
 프로듀서(Producer) - 메시지를 생산하여 토픽에 저장하는 개체
 컨슈머(Comsumer) - 토픽 이름으로 저장된 메시지를 가져가는 개체

 

Goal

- producer와 consumer 사이의 느슨한 연결

- binary 데이터 형식을 사용한 다양한 데이터 format 관리

- 기존의 클러스터에 영향을 주지 않고 서버 확장(scale out) 지원

Concept

- 카프카는 하나 이상의 서버로 구성되는 클러스터에서 작동한다.

- 카프카 클러스터는 topic이라고 불리는 카테고리에 데이터 레코드 스트림을 저장한다.

- 각 레코드는 key, value, timestamp로 구성된다.

Broker, Zookeper

- broker : 카프카 서버. broker.id=1..n으로 함으로써 동일한 노드내에서 여러개의 broker서버를 띄울 수도 있다.

- zookeeper: broker 노드의 상태 정보를 관리 한다.. zookeeker는 컨트롤러를 선정하는데 컨트롤러는 파티션 관리를 책임지는 브로커 중 하나이다. 파티션 관리는 리더 선정, 토픽 생성, 파티션 생성, 복제복 관리를 포함한다. 또한 리더 노드가 다운되면  컨트롤러는 팔로워 중 파티션 리더를 선정한다. 선정 방식에서 과반 수 투표방식으로 결정하기 때문에 홀수로 구성해야 하고, 과반수 이상 살아 있으면 정상 동작한다.

 

API

 

카프카의 주요 API들은 아래와 같다.

  • Producer API : 애플리케이션은 이 API를 이용해서 하나 이상의 카프카 토픽에 스트림 레코드를 게시할 수 있다.
  • Consumer API : 애플리케이션은 이 API를 이용해서 하나 이상의 카프카 토픽으로 부터 스트림 레코드를 구독 할 수 있다.
  • Streams API : 애플리케이션이 하나 이상의 토픽에서 입력 스트림을 읽고 변환해서 하나 이상의 출력 토픽으로 스트림을 보낼 수 있도록 한다.
  • Connector API : Connector를 이용해서 재 사용 가능한 Producer 혹은 Consumers를 카프카 토픽에 연결 할 수 있다. 예를 들어 관계형 데이터베이스 컨넥터는 테이블에 대한 변경 사항을 캡처할 수 있다.

 

Topic

 

- 카프카의 모든 메시지는  byte array로 표현된다.

- 모든 토픽은 하나 이상의 파티션으로 나뉘어져 있다.

- 파티션 내 한 칸은 로그라고 불린다.

- 카프카 클러스터는 파티션에 설정된 모든 레코드에 보관 기간에 따라 저장한다. 기본 값은 7일이다.

- 여러 개로 나뉘어진 파티션은 내부 record에 timestamp를 갖고 있기 때문에 도착한 순서에 맞게 저장한다.

 

 

 

 

- producer는 토픽에 메시지를 저장한다.

- consumer는 다른 MQ와는 다르게 topic으로 부터 메시지를 pull 방식으로 가져온다. 

- producer는 로그 선행 기입 파일 마지막에 메시지를 write한다.

- consumer는 주어진 토픽 파티션에 속한 로그 파일에서 메시지를 가져 온다. consumer 자체가 능등적으로 offset을 저장하고 있으며 offset으로 데이터를 가져 오기 때문에 가볍다.

 

 

 

파티션의 분산 및 복제

 

- 물리적으로 토픽은 하나 이상의 파티션을 다른 broker에게 분배될 수 있다.

- 만일 순서가 중요한 데이터라면, 파티션을 1개로 구성하는 것도 고려할 수 있다. 

- 파티션을 복제하면 roundRobin 방식으로 쓰여 진다. 

- 파티션을 여러개로 구성하면 여러 쓰레드에서 병렬로 처리되기 때문에 높은 처리량을 보장 받을 수 있지만 순서 보장 고려, producer 메모리 증가, 장애 복구 시간 증가 등 여러가지를 고려해야 한다.

- 파티션은 지정된 복제 팩터에 따라 카프카 클러스터 전역에 복제되는데 각 파티션은 leader 브로커와 follower 브로커를 가지며, 파티션에 대한 모든 읽기와 쓰기 요청은 리더를 통해서만 진행된다.

 

 

 

 

Comsumer Group

 

 

 

 

- 카프카는 큐(queue) 발행/구독(Pub/Sub) 두 가지 모델을 지원한다. 큐 모델 에서 특정 메시지는 하나의 인스턴스(C1,C2..)에서만 꺼내갈 수 있지만, 발행/구독 모델에서는 특정 메시지를 하나 이상의 인스턴스에서 꺼내갈 수 있다.

 

- pub/sub 모델을 통해, 복수의 Consumer로 이루어진 Consumer group을 구성하여 1 topic의 데이터를 분산하여 처리가 가능하다.

이 때 토픽의 파티션 수가 컨슈머 그룹 내 수보다 클 때 모두 처리가 가능한데, 만일 컨슈머 그룹이 더 크다면 1개 이상의 컨슈머는  idle(유휴) 상태가 된다.

 

 

설치 및 QuickStart

 

 

 

참고:

https://coding-start.tistory.com/192?category=790331 [코딩스타트]

https://www.joinc.co.kr/w/man/12/Kafka/about

https://kafka.apache.org/introhttps://www.joinc.co.kr/w/man/12/Kafka/about

 

Apache Kafka

Apache Kafka: A Distributed Streaming Platform.

kafka.apache.org

 

 

 

'Cloud & NoSQL & Middleware > Kafka' 카테고리의 다른 글

Kafka - Consumer API  (0) 2019.11.03
Kafka - Producer API  (0) 2019.11.03
Zookeeper api 테스트  (0) 2019.11.02
Zookeeper  (0) 2019.11.02

1. 아래 페이지에서 gz파일 다운로드 및 압축 해제

https://www-us.apache.org/dist/zookeeper/

 

 

2. $ ./conf/zookeeperzoo.cfg 파일 생성 및 설정

tickTime=2000
dataDir=/tmp/zookeeper
clientPort=2181

 

 

3. 서버 구동(단일 서버 standalone)

$ ./bin/zkServer.sh start

 

 

 

 

4. 클라이언트 연결

$ ./bin/zKcli.sh --server 127.0.0.1:2181

 

 

 

5. node 및 데이터 생성 테스트

 

 

 

1) create: /zk_test 노드 및 my_data String 데이터 생성

2) get: /zk_test 노드 데이터 read

3) set: /zk_test 노드 데이터 write

4) delete: /zk_test s 노드 삭제

 

 

 

6. 서버 중지

$ ./bin/zkServer.sh stop

'Cloud & NoSQL & Middleware > Kafka' 카테고리의 다른 글

Kafka - Consumer API  (0) 2019.11.03
Kafka - Producer API  (0) 2019.11.03
Kafka Overview  (0) 2019.11.02
Zookeeper  (0) 2019.11.02

+ Recent posts