- boxed primitive type에서는 자주 사용하는 값에 대해 Caching 기능을 갖고 있다.
Long 객체의 내부 코드를 살펴보면, -128~127 사이의 값을 Caching 해두고 있는데 범위 내의 값을 비교할 땐 미리 생성해 둔 객체로 비교를 한다.
public static void main(String[] args){
Long a = 127l;
Long b = 128l;
Long boxA = Long.valueOf(127);
Long boxB = Long.valueOf(128);
System.err.println(a==boxA); //TRUE
System.err.println(b==boxB); //FALSE
}
위의 예제를 통해 살펴보면, 해당 값의 범위를 벗어 나면 비교연산에 실패하게 되므로 주의 해야한다.
ublic static void runProducerSync(final int sendMessageCount) throws Exception {
final Producer<Long, String> producer = createProducer();
long time = System.currentTimeMillis();
try {
for (long index = time; index < time + sendMessageCount; index++) {
final ProducerRecord<Long, String> record =
new ProducerRecord<>(TOPIC, index,
"Hello Mom " + index);
// record 기록 시 key value, timestamp 기록
RecordMetadata metadata = producer.send(record).get();
long elapsedTime = System.currentTimeMillis() - time;
System.out.printf("sent record(key=%s value=%s) " +
"meta(partition=%d, offset=%d) time=%d\n",
record.key(), record.value(), metadata.partition(),
metadata.offset(), elapsedTime);
}
} finally {
producer.flush();
producer.close();
}
}
public static void runProducerAsync(final int sendMessageCount) throws InterruptedException {
final Producer<Long, String> producer = createProducer();
long time = System.currentTimeMillis();
final CountDownLatch countDownLatch = new CountDownLatch(sendMessageCount);
try {
for (long index = time; index < time + sendMessageCount; index++) {
final ProducerRecord<Long, String> record =
new ProducerRecord<>(TOPIC, index, "Hello Mom " + index);
producer.send(record, (metadata, exception) -> {
long elapsedTime = System.currentTimeMillis() - time;
if (metadata != null) {
System.out.printf("sent record(key=%s value=%s) " +
"meta(partition=%d, offset=%d) time=%d\n",
record.key(), record.value(), metadata.partition(),
metadata.offset(), elapsedTime);
} else {
exception.printStackTrace();
}
});
}
countDownLatch.await(25, TimeUnit.SECONDS);
} finally {
producer.flush();
producer.close();
}
}
카프카 프로듀서를 사용할 때 유의해야 하는 사항
잘못된 파티셔닝 방식의 사용방지 : 파티션은 카프카에서 병렬 처리의 단위다. 메시지가 모든 토픽 파티션으로 균일하게 분배되도록 올바른 파티셔닝 전략을 선택해야한다. 만약 잘못된 파티셔닝 방식을 도입하게 되면 특정 파티션에만 요청이 집중되는 최악의 상황이 발생할 수 있으니 메시지가 모든 가용한 파티션에 분배 되도록 올바른 파티셔닝 방식을 정의하자.
기존 토픽에 새로운 파티션 추가 방지 : 메시지를 분산시키기 위해서 키 값을 사용하여 파티션을 선택하는 토픽에는 파티션을 추가하는 것을 피해야한다. 새로운 파티션 추가는 각 키에 대해 산출된 해시 코드를 변경시키며, 이는 파티션의 수도 산출에 필요한 입력 중 하나로 사용하기 때문이다.
브로커(Broker) - 카프카 클러스터의 서버 또는 노드를 말합니다. 토픽(Topic) - 프로튜서와 컨슈머들이 카프카로 보낸 자신들의 메시지를 구분하기 위한 카테고리
파티션(Partition) - 토픽을 분할 한 단위. 파티션을 늘려 병렬처리를 통해 처리속도를 높일 수 있음 프로듀서(Producer) - 메시지를 생산하여 토픽에 저장하는 개체 컨슈머(Comsumer) - 토픽 이름으로 저장된 메시지를 가져가는 개체
Goal
- producer와 consumer 사이의 느슨한 연결
- binary 데이터 형식을 사용한 다양한 데이터 format 관리
- 기존의 클러스터에 영향을 주지 않고 서버 확장(scale out) 지원
Concept
- 카프카는 하나 이상의 서버로 구성되는 클러스터에서 작동한다.
- 카프카 클러스터는 topic이라고 불리는 카테고리에 데이터 레코드 스트림을 저장한다.
- 각 레코드는 key, value, timestamp로 구성된다.
Broker, Zookeper
- broker : 카프카 서버.broker.id=1..n으로 함으로써 동일한 노드내에서 여러개의 broker서버를 띄울 수도 있다.
- zookeeper: broker 노드의 상태 정보를 관리 한다.. zookeeker는 컨트롤러를 선정하는데 컨트롤러는 파티션 관리를 책임지는 브로커 중 하나이다. 파티션 관리는 리더 선정, 토픽 생성, 파티션 생성, 복제복 관리를 포함한다. 또한 리더 노드가 다운되면 컨트롤러는 팔로워 중 파티션 리더를 선정한다. 선정 방식에서 과반 수 투표방식으로 결정하기 때문에 홀수로 구성해야 하고, 과반수 이상 살아 있으면 정상 동작한다.
API
카프카의 주요 API들은 아래와 같다.
Producer API : 애플리케이션은 이 API를 이용해서 하나 이상의 카프카 토픽에 스트림 레코드를 게시할 수 있다.
Consumer API : 애플리케이션은 이 API를 이용해서 하나 이상의 카프카 토픽으로 부터 스트림 레코드를 구독 할 수 있다.
Streams API : 애플리케이션이 하나 이상의 토픽에서 입력 스트림을 읽고 변환해서 하나 이상의 출력 토픽으로 스트림을 보낼 수 있도록 한다.
Connector API : Connector를 이용해서 재 사용 가능한 Producer 혹은 Consumers를 카프카 토픽에 연결 할 수 있다. 예를 들어 관계형 데이터베이스 컨넥터는 테이블에 대한 변경 사항을 캡처할 수 있다.
Topic
- 카프카의 모든 메시지는 byte array로 표현된다.
- 모든 토픽은 하나 이상의 파티션으로 나뉘어져 있다.
- 파티션 내 한 칸은 로그라고 불린다.
- 카프카 클러스터는 파티션에 설정된 모든 레코드에 보관 기간에 따라 저장한다. 기본 값은 7일이다.
- 여러 개로 나뉘어진 파티션은 내부 record에 timestamp를 갖고 있기 때문에 도착한 순서에 맞게 저장한다.
- producer는 토픽에 메시지를 저장한다.
- consumer는 다른 MQ와는 다르게 topic으로 부터 메시지를 pull 방식으로 가져온다.
- producer는 로그 선행 기입 파일 마지막에 메시지를 write한다.
- consumer는 주어진 토픽 파티션에 속한 로그 파일에서 메시지를 가져 온다. consumer 자체가 능등적으로 offset을 저장하고 있으며 offset으로 데이터를 가져 오기 때문에 가볍다.
파티션의 분산 및 복제
- 물리적으로 토픽은 하나 이상의 파티션을 다른 broker에게 분배될 수 있다.
- 만일 순서가 중요한 데이터라면, 파티션을 1개로 구성하는 것도 고려할 수 있다.
- 파티션을 복제하면 roundRobin 방식으로 쓰여 진다.
- 파티션을 여러개로 구성하면 여러 쓰레드에서 병렬로 처리되기 때문에 높은 처리량을 보장 받을 수 있지만 순서 보장 고려, producer 메모리 증가, 장애 복구 시간 증가 등 여러가지를 고려해야 한다.
- 파티션은 지정된 복제 팩터에 따라 카프카 클러스터 전역에 복제되는데 각 파티션은 leader 브로커와 follower 브로커를 가지며, 파티션에 대한 모든 읽기와 쓰기 요청은 리더를 통해서만 진행된다.
Comsumer Group
- 카프카는큐(queue)와발행/구독(Pub/Sub)두 가지 모델을 지원한다. 큐 모델 에서 특정 메시지는 하나의 인스턴스(C1,C2..)에서만 꺼내갈 수 있지만, 발행/구독 모델에서는 특정 메시지를 하나 이상의 인스턴스에서 꺼내갈 수 있다.
- pub/sub 모델을 통해, 복수의 Consumer로 이루어진 Consumer group을 구성하여 1 topic의 데이터를 분산하여 처리가 가능하다.
이 때 토픽의 파티션 수가 컨슈머 그룹 내 수보다 클 때 모두 처리가 가능한데, 만일 컨슈머 그룹이 더 크다면 1개 이상의 컨슈머는 idle(유휴) 상태가 된다.
Zookeeper는 안정적인 분산 조정(distributed cordination) 처리를 가능하게 해주는 오픈소스 서버
- 설정 정보 유지, 이름 지정, 분산 동기화 및 그룹 서비스 제공을 위한 중앙 집중식 서비스
- 분산 처리 시스템을 구성할 때 서버 간의 정보 공유, 서버 상태 체크, 동기화에 대한 락 처리를 수행하는 대표적인 오픈소스
특징
1) zookeeper는 심플하다. 주키퍼는 분산된 프로세스가 znode로 불리는 네임스페이스가 cordinate(조정)할 수 있다. 높은 성능과 고가용성으로 대규모 분산처리 시스템에 적합하다.
2) zookeeper는 복제된다. 조정 된 분산 프로세스와 마찬가지로 ZooKeeper 자체는 ensembl이라는 일련의 호스트를 통해 복제된다.
클라이언트는 단일 zookeeper 서버와 연결하여 데이터를 주고받으며 감시 이벤트를 통해 연결 유지 중이라는 것을 알리는 heatbeat를 주고 받는다. 만일 특정 상황으로 인해 연결이 끊어지면, 다른 서버에 연결될 것이다. 이를 통해SPOF(SinglePoint of Failure)가 되는 일을 막는다.
3) zookeeper는 순서화된다. ZooKeeper는 모든 ZooKeeper transaction의 order를 반영하는 숫자로 각 업데이트에 도장을 찍는다.
4) zookeeper는 빠르다. 쓰기보단 읽기 작업 성능에서 빠르다.
5) 네임스페이스는 파일시스템 계층과 유사하며, 디렉토리 구조의 각 노드에 데이터는 저장한다. 파일 시스템과 다른 점은 각 노드에 자식 노드의 데이터 혹은 관련된 데이터가 포함된다.
Znode
- Ephemeral Node(임시노드): 해당 노드는 노드를 생성한 클라이언트의 연결 세션이 유지될 때만 유효하다.
- Persistance Node(영구노드): 데이터를 강제로 삭제하지 않는 이상, 영구적으로 저장되는 노드이다.
- Sequence Node(시퀀스 노드): 노드를 생성할 때 자동으로 sequence 번호가 붙는 노드. 주로 분산 락을 구현하는데 이용한다.
Watches
- 주키퍼는 watch라는 콘셉을 제공하는데, 클라이언트는 노드에 watch를 설정할 수 있다. znode가 업데이트되면 watch는 트리거가되고 제거되며 클라이언트는 노드가 변경되었다는 패킷을 수신하게 된다. 예를 들어 watch를 설정한 상태에서 클라이언트와 서버의 연결이 끊어지면 연결이 끊어졌다는 local 알림을 받을 수 있을 것이다.