张子阳的博客

首页 读书 技术 店铺 关于
张子阳的博客 首页 读书 技术 店铺 关于

Kafka分布式消息系统(通过控制台访问) - Part.4

2018-10-15 作者: 张子阳 分类: 分布式系统

在前面的三篇文章中,介绍了Kafka的相关概念,并且搭建了一个3节点的Kafka和Zookeeper集群。接下来,就可以对集群进行访问。访问的方式通常为各种编程语言的Kafka库,或者是使用命令行脚本。各种语言的编程库,可以在这个页面获得:https://cwiki.apache.org/confluence/display/KAFKA/Clients。这篇文章主要讲述如何通过命令行来访问Kafka集群,包括查看所有Topic(主题)、创建Topic、查看Topic详情、写入数据到Topic、读取Topic内容,以及删除Topic。

本文假设你已经搭建好了kafka集群和zookeeper集群,主机名分别为kafka1、kafka2、kafka3(zookeeper1、zookeeper2、zookeeper3)。只有一台主机也是可以的,其名称为kafka1/zookeeper1。

bin目录脚本

建议将kafka的默认目录设置为环境变量:KAFKA_HOME,以后再访问时方便一些,只要 cd $KAFKA_HOME就可以了(实际安装目录可能为/opt/kafka_2.11-1.1.0)。本文如无特别说明,所有的操作都是在$KAFKA_HOME文件夹下执行的。

在bin目录下包含了很多默认脚本。除了前面接触过的,启动和停止kafka、zookeeper集群的 kafka-server-start.sh、kafka-server-stop.sh、zookeeper-server-start.sh、zookeeper-server-stop.sh,还有其他二十几个。如果想要了解每个脚本的用途和用法,只需要不带参数执行一下就可以了,例如执行 bin/kafka-console-producer.sh:

# bin/kafka-console-producer.sh
Read data from standard input and publish it to Kafka.
Option                                       Description
------                                           -----------
--batch-size <Integer: size>       Number of messages to send in a single
                                               batch if they are not being sent
                                               synchronously. (default: 200)
--broker-list <String: broker-list>      REQUIRED: The broker list string in
                                               the form HOST1:PORT1,HOST2:PORT2.

... 以下省略100行

上面的第一句话描述了该脚本的用途:Read data from standard input and publish it to Kafka.(从标准输入读取数据并发布到Kafka中)。下面则列出了必须和可选的参数。接下来,我们先来看下如何创建一个topic。

创建topic

使用kafka-topics.sh可以创建一个名为 test.user 的Topic:

# bin/kafka-topics.sh --zookeeper zookeeper1:2181/kafka --create --topic test.user --replication-factor 2 --partitions 6
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "test.user".

主要的几个参数含义如下:

这里有一个警告,意思是Topic的名称要么使用'.',要么使用'_',但是不要两个都用,否则可能出现名称冲突问题。因为本处只使用了'.',因此可以不用理会。

查看topic详情

kafka-topics.sh 同样可以用来查看刚刚创建的topic:

# bin/kafka-topics.sh --zookeeper zookeeper1:2181/kafka --topic test.user --describe
Topic:test.user	PartitionCount:6	ReplicationFactor:2	Configs:
	Topic: test.user	Partition: 0	Leader: 3	Replicas: 3,1	Isr: 3,1
	Topic: test.user	Partition: 1	Leader: 1	Replicas: 1,2	Isr: 1,2
	Topic: test.user	Partition: 2	Leader: 2	Replicas: 2,3	Isr: 2,3
	Topic: test.user	Partition: 3	Leader: 3	Replicas: 3,2	Isr: 3,2
	Topic: test.user	Partition: 4	Leader: 1	Replicas: 1,3	Isr: 1,3
	Topic: test.user	Partition: 5	Leader: 2	Replicas: 2,1	Isr: 2,1    

可以看到test.user的主要信息都罗列了出来,包括分区数、副本数、Leader等。

查看topic列表

使用下面的命令,可以查看到所有的主题列表:

# bin/kafka-topics.sh --zookeeper zookeeper1:2181/kafka --list
__consumer_offsets
cdc.tgstat_ddztest.test_spark
connect-configs-aw
connect-offsets-aw
connect-status-aw
... 省略几十行
test.user

这里面会看到一些特殊的topic:__consumer_offsets,可以称作“系统topic”,用来记录Consumer提交的offsets,可见kafka也利用自身来记录一些系统数据。

connect-configs-aw、connect-offsets-aw、connect-status-aw是kafka connect组件用到的3个topic,也是“系统topic”,分别记录配置、偏移量和状态。

在最后一行,看到了我们前面创建的test.user主题。在我的主机上,还有几十个其他的topic,这里进行了省略。如果你是在一个新的集群上做测试,那么应该只有__consumer_offsets和test.user两个topic。

读/写 topic

创建好了topic之后,就可以对topic进行读写,此时需要同时打开两个控制台,一个用于写,一个用于读。

控制台(写)的命令如下:

# bin/kafka-console-producer.sh --broker-list kafka1:9092 --topic test.user

>

此时会进入交互模式,光标位于“>”右侧,可以输入文本,按下回车后,输入的内容将会发布到kafka集群的test.user主题当中。

控制台(读)的命令如下:

# bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic test.user --from-beginning

此时,如果topic中包含有数据,会将数据输出到控制台,直到队列的末尾,然后等待新数据;如果没有数据(我们当前的情况),那么就会直接等待新数据。

现在在控制台(写)当中输入文本,可以在控制台(读)中立即看到。如下图所示:

控制台显示效果

值得注意的是上面的次序,因为1~6这几个数我输入的非常快,又因为有6个partition,因此这6条消息会发送到不同的partition当中,同一个partition中的消息是有序的,不同partition中的消息是无序的。因此,可以看到消息的顺序在写入端和读取端是不一致的。

删除topic

使用下面的命令,删除topic:

# bin/kafka-topics.sh --zookeeper zookeeper1:2181/kafka --delete --topic test.user
Topic test.user is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

上面的提示是说:只有当delete.topic.enable开启时,删除topic才有效。而这个delete.topic.enable选项,是在搭建kafka集群时设置的,这里可以参看前面的文章。

删除完成后,再使用前面的命令查看该topic,会看到控制台返回空,说明没有test.user这个topic了。

至此,我们就完成了对topic的最常见操作:创建、删除、查看,以及读取和写入数据。使用控制台的方式进行操作,可以对集群的状态进行快速的验证,非常的方便。在实际生产中,还是通过各种语言的kafka库来进行访问。对集群进行查看,除了使用控制台,也可以通过第三方的一些UI工具,例如Yahoo的Kafka-Manager、Landoop的Kafka-topics-ui等。这个以后再介绍。

感谢阅读,希望这篇文章能给你带来帮助!