Kafka 설치 및 Java 예제 코드(Producer, Consumer) 작성
https://webfirewood.tistory.com/128
Kafka(카프카) 개관
분산 서비스가 대세가된 지금 대부분의 현대 웹애플리케이션의 아키텍쳐는 위 그림과 같은 복잡한 모습을 가지게 됩니다. 보기만 해도 머리가 아플 정도로 복잡한 의존성 관계 때문에 시스템
webfirewood.tistory.com
위 포스트에서 카프카에 대한 개략적인 설명을 드렸었습니다. 이제 실제로 로컬에 카프카를 설치해서 Java 로 메세지를 발행하고 받아보는 코드를 만들어 보도록 하겠습니다. Mac OS 기준의 설명인 점 양해 부탁드립니다.
다운로드
일단 아래 링크에서 카프카를 다운로드 받습니다. 이 포스트에서는 2.8.0 버전을 사용하도록 하겠습니다.
https://www.apache.org/dyn/closer.cgi?path=/kafka/2.8.0/kafka_2.13-2.8.0.tgz
Apache Download Mirrors
Copyright © 2020 The Apache Software Foundation, Licensed under the Apache License, Version 2.0. Apache and the Apache feather logo are trademarks of The Apache Software Foundation.
www.apache.org
Zookeeper 실행
위 링크의 파일을 다운로드 받아서 압축을 해제하면 준비는 끝입니다. 압축을 해제한 디렉토리로 이동해서 먼저 Zookeeper를 실행합니다.
bin/zookeeper-server-start.sh config/zookeeper.properties
Kafka 실행
이제 Kafka를 실행합니다.
bin/kafka-server-start.sh config/server.properties
잘 실행 되었는지 확인하기 위해서 netstat 커맨드를 사용할 수 있습니다.
netstat -an | grep 2181
Kafka 토픽 생성
Kafka에 토픽을 생성해 보겠습니다.
bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
quickstart-events 라는 topic을 localhost:9092 에 생성하였습니다. 해당 토픽이 잘 생성 되었는지 확인해 보기 위해서 다음 커맨드를 입력합니다.
bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Producer
Producer 를 실행해 메세지를 발행해 보도록 하겠습니다.
bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
이후 발행하고자 하는 메세지를 입력해 보면 됩니다.
Consumer
Consumer 를 실행해서 발행한 메세지를 제대로 받는지 확인해 보겠습니다.
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
잘 전송되는 것이 확인 되면 이제 Java 애플리케이션을 작성해 보도록 하겠습니다.
Java 프로젝트 생성 및 dependency 추가
Gradle 프로젝트를 생성하고 build.gradle 파일에 dependency 를 추가합니다.
dependencies {
...
implementation 'org.apache.kafka:kafka-clients:3.0.0'
implementation 'org.apache.kafka:kafka-streams:3.0.0'
implementation 'org.apache.kafka:kafka_2.13:3.0.0'
}
Consumer 작성
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class Consumer {
public static void main(String[] args) {
Properties configs = new Properties();
configs.put("bootstrap.servers", "localhost:9092"); // kafka server host 및 port
configs.put("session.timeout.ms", "10000"); // session 설정
configs.put("group.id", "quickstart-events"); // topic 설정
configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // key deserializer
configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // value deserializer
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs); // consumer 생성
consumer.subscribe(Arrays.asList("quickstart-events")); // topic 설정
while (true) {
ConsumerRecords<String, String> records = consumer.poll(500);
for (ConsumerRecord<String, String> record : records) {
String input = record.topic();
if ("quickstart-events".equals(input)) {
System.out.println(record.value());
} else {
throw new IllegalStateException("get message on topic " + record.topic());
}
}
}
}
}
실행한 뒤 위에서 실행한 프로듀서를 통해 발행된 메세지를 제대로 수신하는지 확인합니다.
Producer 작성
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.IOException;
import java.util.Properties;
public class Producer {
public static void main(String[] args) throws IOException {
Properties configs = new Properties();
configs.put("bootstrap.servers", "localhost:9092"); // kafka host 및 server 설정
configs.put("acks", "all"); // 자신이 보낸 메시지에 대해 카프카로부터 확인을 기다리지 않습니다.
configs.put("block.on.buffer.full", "true"); // 서버로 보낼 레코드를 버퍼링 할 때 사용할 수 있는 전체 메모리의 바이트수
configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // serialize 설정
configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // serialize 설정
// producer 생성
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);
// message 전달
for (int i = 0; i < 5; i++) {
String v = "hello"+i;
producer.send(new ProducerRecord<String, String>("quickstart-events", v));
}
// 종료
producer.flush();
producer.close();
}
}
Producer 를 실행한 이후 Consumer 가 hello0 ~ 4 까지 메세지를 잘 받는지 확인합니다.