티스토리 뷰

donaricano-btn
반응형

 

https://webfirewood.tistory.com/128

 

Kafka(카프카) 개관

 분산 서비스가 대세가된 지금 대부분의 현대 웹애플리케이션의 아키텍쳐는 위 그림과 같은 복잡한 모습을 가지게 됩니다. 보기만 해도 머리가 아플 정도로 복잡한 의존성 관계 때문에 시스템

webfirewood.tistory.com

 

위 포스트에서 카프카에 대한 개략적인 설명을 드렸었습니다. 이제 실제로 로컬에 카프카를 설치해서 Java 로 메세지를 발행하고 받아보는 코드를 만들어 보도록 하겠습니다. Mac OS 기준의 설명인 점 양해 부탁드립니다.

다운로드

일단 아래 링크에서 카프카를 다운로드 받습니다. 이 포스트에서는 2.8.0 버전을 사용하도록 하겠습니다.

 

https://www.apache.org/dyn/closer.cgi?path=/kafka/2.8.0/kafka_2.13-2.8.0.tgz 

 

Apache Download Mirrors

Copyright © 2020 The Apache Software Foundation, Licensed under the Apache License, Version 2.0. Apache and the Apache feather logo are trademarks of The Apache Software Foundation.

www.apache.org

Zookeeper 실행 

위 링크의 파일을 다운로드 받아서 압축을 해제하면 준비는 끝입니다. 압축을 해제한 디렉토리로 이동해서 먼저 Zookeeper를 실행합니다.

bin/zookeeper-server-start.sh config/zookeeper.properties

Kafka 실행

이제 Kafka를 실행합니다. 

bin/kafka-server-start.sh config/server.properties

잘 실행 되었는지 확인하기 위해서 netstat 커맨드를 사용할 수 있습니다.

netstat -an | grep 2181

netstat 커맨드의 결과

Kafka 토픽 생성

Kafka에 토픽을 생성해 보겠습니다.

bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092

quickstart-events 라는 topic을 localhost:9092 에 생성하였습니다. 해당 토픽이 잘 생성 되었는지 확인해 보기 위해서 다음 커맨드를 입력합니다.

bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092

Producer

Producer 를 실행해 메세지를 발행해 보도록 하겠습니다.

bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092

이후 발행하고자 하는 메세지를 입력해 보면 됩니다.

Consumer

Consumer 를 실행해서 발행한 메세지를 제대로 받는지 확인해 보겠습니다.

bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092

잘 전송되는 것이 확인 되면 이제 Java 애플리케이션을 작성해 보도록 하겠습니다.

Java 프로젝트 생성 및 dependency 추가

Gradle 프로젝트를 생성하고 build.gradle 파일에 dependency 를 추가합니다.

dependencies {
    ...

    implementation 'org.apache.kafka:kafka-clients:3.0.0'
    implementation 'org.apache.kafka:kafka-streams:3.0.0'
    implementation 'org.apache.kafka:kafka_2.13:3.0.0'
}

Consumer 작성

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class Consumer {

    public static void main(String[] args) {

        Properties configs = new Properties();
        configs.put("bootstrap.servers", "localhost:9092"); // kafka server host 및 port
        configs.put("session.timeout.ms", "10000"); // session 설정
        configs.put("group.id", "quickstart-events");   // topic 설정
        configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");    // key deserializer
        configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  // value deserializer

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);    // consumer 생성
        consumer.subscribe(Arrays.asList("quickstart-events")); // topic 설정

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(500);
            for (ConsumerRecord<String, String> record : records) {
                String input = record.topic();
                if ("quickstart-events".equals(input)) {
                    System.out.println(record.value());
                } else {
                    throw new IllegalStateException("get message on topic " + record.topic());
                }
            }
        }
    }
}

실행한 뒤 위에서 실행한 프로듀서를 통해 발행된 메세지를 제대로 수신하는지 확인합니다.

Producer 작성

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.IOException;
import java.util.Properties;

public class Producer {

    public static void main(String[] args) throws IOException {

        Properties configs = new Properties();
        configs.put("bootstrap.servers", "localhost:9092"); // kafka host 및 server 설정
        configs.put("acks", "all");                         // 자신이 보낸 메시지에 대해 카프카로부터 확인을 기다리지 않습니다.
        configs.put("block.on.buffer.full", "true");        // 서버로 보낼 레코드를 버퍼링 할 때 사용할 수 있는 전체 메모리의 바이트수
        configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");   // serialize 설정
        configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // serialize 설정

        // producer 생성
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);

        // message 전달
        for (int i = 0; i < 5; i++) {
            String v = "hello"+i;
            producer.send(new ProducerRecord<String, String>("quickstart-events", v));
        }

        // 종료
        producer.flush();
        producer.close();
    }
}

Producer 를 실행한 이후 Consumer 가 hello0 ~ 4 까지 메세지를 잘 받는지 확인합니다.

반응형

'Cloud' 카테고리의 다른 글

Kafka(카프카) 개관  (0) 2020.05.13
donaricano-btn
댓글
반응형
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함