티스토리 뷰
https://webfirewood.tistory.com/128
위 포스트에서 카프카에 대한 개략적인 설명을 드렸었습니다. 이제 실제로 로컬에 카프카를 설치해서 Java 로 메세지를 발행하고 받아보는 코드를 만들어 보도록 하겠습니다. Mac OS 기준의 설명인 점 양해 부탁드립니다.
다운로드
일단 아래 링크에서 카프카를 다운로드 받습니다. 이 포스트에서는 2.8.0 버전을 사용하도록 하겠습니다.
https://www.apache.org/dyn/closer.cgi?path=/kafka/2.8.0/kafka_2.13-2.8.0.tgz
Zookeeper 실행
위 링크의 파일을 다운로드 받아서 압축을 해제하면 준비는 끝입니다. 압축을 해제한 디렉토리로 이동해서 먼저 Zookeeper를 실행합니다.
bin/zookeeper-server-start.sh config/zookeeper.properties
Kafka 실행
이제 Kafka를 실행합니다.
bin/kafka-server-start.sh config/server.properties
잘 실행 되었는지 확인하기 위해서 netstat 커맨드를 사용할 수 있습니다.
netstat -an | grep 2181
Kafka 토픽 생성
Kafka에 토픽을 생성해 보겠습니다.
bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
quickstart-events 라는 topic을 localhost:9092 에 생성하였습니다. 해당 토픽이 잘 생성 되었는지 확인해 보기 위해서 다음 커맨드를 입력합니다.
bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Producer
Producer 를 실행해 메세지를 발행해 보도록 하겠습니다.
bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
이후 발행하고자 하는 메세지를 입력해 보면 됩니다.
Consumer
Consumer 를 실행해서 발행한 메세지를 제대로 받는지 확인해 보겠습니다.
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
잘 전송되는 것이 확인 되면 이제 Java 애플리케이션을 작성해 보도록 하겠습니다.
Java 프로젝트 생성 및 dependency 추가
Gradle 프로젝트를 생성하고 build.gradle 파일에 dependency 를 추가합니다.
dependencies {
...
implementation 'org.apache.kafka:kafka-clients:3.0.0'
implementation 'org.apache.kafka:kafka-streams:3.0.0'
implementation 'org.apache.kafka:kafka_2.13:3.0.0'
}
Consumer 작성
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class Consumer {
public static void main(String[] args) {
Properties configs = new Properties();
configs.put("bootstrap.servers", "localhost:9092"); // kafka server host 및 port
configs.put("session.timeout.ms", "10000"); // session 설정
configs.put("group.id", "quickstart-events"); // topic 설정
configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // key deserializer
configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // value deserializer
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs); // consumer 생성
consumer.subscribe(Arrays.asList("quickstart-events")); // topic 설정
while (true) {
ConsumerRecords<String, String> records = consumer.poll(500);
for (ConsumerRecord<String, String> record : records) {
String input = record.topic();
if ("quickstart-events".equals(input)) {
System.out.println(record.value());
} else {
throw new IllegalStateException("get message on topic " + record.topic());
}
}
}
}
}
실행한 뒤 위에서 실행한 프로듀서를 통해 발행된 메세지를 제대로 수신하는지 확인합니다.
Producer 작성
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.IOException;
import java.util.Properties;
public class Producer {
public static void main(String[] args) throws IOException {
Properties configs = new Properties();
configs.put("bootstrap.servers", "localhost:9092"); // kafka host 및 server 설정
configs.put("acks", "all"); // 자신이 보낸 메시지에 대해 카프카로부터 확인을 기다리지 않습니다.
configs.put("block.on.buffer.full", "true"); // 서버로 보낼 레코드를 버퍼링 할 때 사용할 수 있는 전체 메모리의 바이트수
configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // serialize 설정
configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // serialize 설정
// producer 생성
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);
// message 전달
for (int i = 0; i < 5; i++) {
String v = "hello"+i;
producer.send(new ProducerRecord<String, String>("quickstart-events", v));
}
// 종료
producer.flush();
producer.close();
}
}
Producer 를 실행한 이후 Consumer 가 hello0 ~ 4 까지 메세지를 잘 받는지 확인합니다.
'Cloud' 카테고리의 다른 글
Kafka(카프카) 개관 (0) | 2020.05.13 |
---|
- Total
- Today
- Yesterday
- 코딩의 기술
- Count
- DP
- 야근
- REST API
- RESTful
- 자바스크립트개론
- 디자인패턴
- 유지보수
- 경고
- GROUP BY
- 마르코프 연쇄
- 문장 생성기
- Warning
- 로그
- 크롬
- 마르코프
- 자바스크립트 개론
- CONVENTIONS
- html
- 클린코드
- markov chain
- restful api
- 동적계획법
- 몰라서망신
- java
- was
- Markov
- Spring in Action
- 전략패턴
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |