相关资料

安装&启动

macOS 上推荐使用 brew 安装,当然,前提要安装好 JDK,这个不多说。

由于 kafka 集群依赖 zookeeper,所以顺便也安装一下 zk。

brew install zookeeper
brew install kafka

我这边默认安装的是 2.0.0 版本。

启动服务

sudo brew services start kafka

生产者&消费者

进入 kafka 主目录 (PS: brew 安装的软件默认在 /usr/local/Cellar/ 目录下),可以看到 kafka 的一些命令脚本:

$ cd /usr/local/Cellar/kafka/2.0.0/libexec
$ ls -al bin/
total 256
drwxr-xr-x  32 tony  staff  1024  7 24 22:20 .
drwxr-xr-x   6 tony  staff   192 10 30 00:50 ..
-rwxr-xr-x   1 tony  staff  1421  7 24 22:20 connect-distributed.sh
-rwxr-xr-x   1 tony  staff  1418  7 24 22:20 connect-standalone.sh
-rwxr-xr-x   1 tony  staff   861  7 24 22:20 kafka-acls.sh
-rwxr-xr-x   1 tony  staff   873  7 24 22:20 kafka-broker-api-versions.sh
-rwxr-xr-x   1 tony  staff   864  7 24 22:20 kafka-configs.sh
-rwxr-xr-x   1 tony  staff   945  7 24 22:20 kafka-console-consumer.sh
-rwxr-xr-x   1 tony  staff   944  7 24 22:20 kafka-console-producer.sh
-rwxr-xr-x   1 tony  staff   871  7 24 22:20 kafka-consumer-groups.sh
-rwxr-xr-x   1 tony  staff   948  7 24 22:20 kafka-consumer-perf-test.sh
-rwxr-xr-x   1 tony  staff   871  7 24 22:20 kafka-delegation-tokens.sh
-rwxr-xr-x   1 tony  staff   869  7 24 22:20 kafka-delete-records.sh
-rwxr-xr-x   1 tony  staff   866  7 24 22:20 kafka-dump-log.sh
-rwxr-xr-x   1 tony  staff   863  7 24 22:20 kafka-log-dirs.sh
-rwxr-xr-x   1 tony  staff   862  7 24 22:20 kafka-mirror-maker.sh
-rwxr-xr-x   1 tony  staff   886  7 24 22:20 kafka-preferred-replica-election.sh
-rwxr-xr-x   1 tony  staff   959  7 24 22:20 kafka-producer-perf-test.sh
-rwxr-xr-x   1 tony  staff   874  7 24 22:20 kafka-reassign-partitions.sh
-rwxr-xr-x   1 tony  staff   874  7 24 22:20 kafka-replica-verification.sh
-rwxr-xr-x   1 tony  staff  9290  7 24 22:20 kafka-run-class.sh
-rwxr-xr-x   1 tony  staff  1376  7 24 22:20 kafka-server-start.sh
-rwxr-xr-x   1 tony  staff   997  7 24 22:20 kafka-server-stop.sh
-rwxr-xr-x   1 tony  staff   945  7 24 22:20 kafka-streams-application-reset.sh
-rwxr-xr-x   1 tony  staff   863  7 24 22:20 kafka-topics.sh
-rwxr-xr-x   1 tony  staff   958  7 24 22:20 kafka-verifiable-consumer.sh
-rwxr-xr-x   1 tony  staff   958  7 24 22:20 kafka-verifiable-producer.sh
-rwxr-xr-x   1 tony  staff  1722  7 24 22:20 trogdor.sh
-rwxr-xr-x   1 tony  staff   867  7 24 22:20 zookeeper-security-migration.sh
-rwxr-xr-x   1 tony  staff  1393  7 24 22:20 zookeeper-server-start.sh
-rwxr-xr-x   1 tony  staff  1001  7 24 22:20 zookeeper-server-stop.sh
-rwxr-xr-x   1 tony  staff   968  7 24 22:20 zookeeper-shell.sh

为了避免一些问题,先将默认的日志目录改为777:

sudo chmod -R 777 /usr/local/Cellar/kafka/2.0.0/libexec/logs/

开始操作~

我们使用一个叫 tony01 的 topic

# 发送一些消息(生产者)
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic tony01

# 启动消费者
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic tony01 --from-beginning

效果如图:

kafka 集群

现在测试一下有多个 kafka server 的情况。 首先复制配置文件

# 进入配置目录
cd /usr/local/Cellar/kafka/2.0.0/libexec/config

# 复制两份,分别对应各自的服务端
cp server.properties server01.properties
cp server.properties server02.properties

# 先创建好日志目录
mkdir /tmp/logs-kafka01
mkdir /tmp/logs-kafka02
chmod -R 777 /tmp/logs-kafka01
chmod -R 777 /tmp/logs-kafka02

# 配置不同的端口号和 broker.id,注意原配置文件有 broker.id 和 log.dirs 配置,需要注释或修改

vim server01.properties

broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/logs-kafka01
 
vim server02.properties:

broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/tmp/logs-kafka02

# 回到主目录,启动 kafka 服务
cd /usr/local/Cellar/kafka/2.0.0/libexec
bin/kafka-server-start.sh config/server01.properties &
bin/kafka-server-start.sh config/server02.properties &

最后看到这行

[2018-10-31 14:15:51,066] INFO [KafkaServer id=1] started (kafka.server.KafkaServer)

就说明启动成功了。

可以使用 ps aux | grep server01.properties 命令查看进程。

kafka 集群测试~~

# 进入 kafka 主目录
cd /usr/local/Cellar/kafka/2.0.0/libexec

# 创建一个新的 topic (3个副本),注意这里连接的是 zk
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 4 --topic my-replicated-topic

# 使用 describe topics 命令查看 broker 的状态
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

# 扔一些信息进去
bin/kafka-console-producer.sh --broker-list localhost:9093 --topic my-replicated-topic

# 消费
bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --from-beginning --topic my-replicated-topic

# 换一个节点消费
bin/kafka-console-consumer.sh --bootstrap-server localhost:9094 --from-beginning --topic my-replicated-topic

常见问题

  1. 修改 logs 目录权限后,仍然报错:
[2018-10-31 14:07:02,901] ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kafka.common.InconsistentBrokerIdException: Configured broker.id 1 doesn't match stored broker.id 0 in meta.properties. If you moved your data, make sure your configured broker.id matches. If you intend to create a new broker, you should remove all data in your data directories (log.dirs).

这是因为日志文件冲突导致的,检查一下 log.dirs 配置,如果配置文件里有两个,后面那一个生效。所以你需要注释掉一个。

参考文档

标签: none

添加新评论