We set up a new Kafka cluster and need to monitor it. The cluster also needs some tuning, and monitoring data will serve as the basis for those decisions, hence the need to collect Kafka metrics.
Configuring JMX
Kafka exposes its runtime information mainly through JMX. To enable JMX:
Add the following to the appropriate place in Kafka's systemd unit file:
Environment="KAFKA_JMX_OPTS=-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=127.0.0.1 -Djava.net.preferIPv4Stack=true"
Environment="JMX_PORT=4321"
Here 127.0.0.1 and 4321 should be replaced with the internal IP of the machine running Kafka and the port you want to use.
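After restarting the broker with these options, it helps to confirm that the JMX port actually accepts TCP connections before wiring up Logstash. Below is a minimal Python sketch of such a check; the demo connects to a throwaway local listener rather than a real broker, so substitute your broker's IP and JMX port in practice:

```python
import socket

def port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Demo against a throwaway local listener standing in for the JMX port.
listener = socket.socket()
listener.bind(("127.0.0.1", 0))          # let the OS pick a free port
listener.listen(1)
demo_port = listener.getsockname()[1]

reachable = port_open("127.0.0.1", demo_port)
print(reachable)  # True: the port accepts connections
listener.close()
```

For a real broker you would call, e.g., `port_open("192.168.32.167", 4321)`; a `False` result usually means the JMX options were not picked up or a firewall is in the way.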
Installing Logstash
Run the following commands in a terminal to install it:
wget https://artifacts.elastic.co/downloads/logstash/logstash-6.2.4.rpm
yum install logstash-6.2.4.rpm
Installing the JMX Input Plugin
Check whether the plugin is already installed:
/usr/share/logstash/bin/logstash-plugin list
If it is not listed, install it:
/usr/share/logstash/bin/logstash-plugin install logstash-input-jmx
If the command prints the following, the installation succeeded:
Validating logstash-input-jmx
Installing logstash-input-jmx
Installation successful
For more on managing plugins, see: here
Adding the Configuration
Configuring kafka-status.conf
cd /etc/logstash/conf.d
touch kafka-status.conf
vim kafka-status.conf
Put the following content into kafka-status.conf:
input {
  jmx {
    path => "/etc/logstash/conf.d/kafka-jmx"
    type => "jmx"
    polling_frequency => 60
    nb_thread => 4
    tags => ["kafka"]
  }
}

output {
  if "kafka" in [tags] {
    elasticsearch {
      hosts => ["192.168.32.221:24702", "192.168.32.222:24702", "192.168.32.223:24702"]
      index => "logstash-kafka-%{+YYYY.MM.dd}"
    }
    # stdout { codec => rubydebug }
  }
}
In the input section, the jmx plugin's path points to the directory holding the JMX query files used to collect from Kafka, one file per Kafka broker (this file is described below); polling_frequency is the collection interval, set here to 60 seconds; nb_thread is the number of threads used for collection, set here to 4, which is also the default. Under elasticsearch, the hosts value depends on which ES cluster you actually want the data in, and index should likewise be adjusted to your setup.
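A note on the index setting: %{+YYYY.MM.dd} makes Logstash derive one index per day from each event's @timestamp (formatted in UTC), so the data rolls into daily indices that are easy to curate. As a rough illustration of the naming, a small Python sketch using the prefix configured above:

```python
from datetime import datetime, timezone

def daily_index(when, prefix="logstash-kafka"):
    # Mirrors the logstash-kafka-%{+YYYY.MM.dd} pattern:
    # one index name per UTC day.
    return f"{prefix}-{when:%Y.%m.%d}"

print(daily_index(datetime(2018, 6, 1, tzinfo=timezone.utc)))
# logstash-kafka-2018.06.01
```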
Configuring kafka-jmx
As an example, we collect metrics from the Kafka broker on 192.168.32.167.
mkdir /etc/logstash/conf.d/kafka-jmx
cd /etc/logstash/conf.d/kafka-jmx
touch kafka-167.json
vim kafka-167.json
Put the following content into kafka-167.json:
{
  "host": "192.168.32.167",
  "port": 33082,
  "alias": "kafka.jmx",
  "queries": [{
    "object_name": "java.lang:type=Memory",
    "object_alias": "Memory"
  }, {
    "object_name": "java.lang:type=Runtime",
    "attributes": ["Uptime", "StartTime"],
    "object_alias": "Runtime"
  }, {
    "object_name": "java.lang:type=GarbageCollector,name=*",
    "attributes": ["CollectionCount", "CollectionTime"],
    "object_alias": "${type}.${name}"
  }, {
    "object_name": "java.nio:type=BufferPool,name=*",
    "object_alias": "${type}.${name}"
  }, {
    "object_name": "kafka.server:type=ZooKeeperClientMetrics,name=ZooKeeperRequestLatencyMs",
    "object_alias": "${type}.ZooKeeperRequestLatencyMs"
  }, {
    "object_name": "kafka.server:type=SessionExpireListener,name=*",
    "object_alias": "${type}.${name}"
  }, {
    "object_name": "kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec",
    "object_alias": "${type}.BytesInPerSec"
  }, {
    "object_name": "kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec",
    "object_alias": "${type}.BytesOutPerSec"
  }, {
    "object_name": "kafka.server:type=BrokerTopicMetrics,name=BytesRejectedPerSec",
    "object_alias": "${type}.BytesRejectedPerSec"
  }, {
    "object_name": "kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec",
    "object_alias": "${type}.FailedFetchRequestsPerSec"
  }, {
    "object_name": "kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec",
    "object_alias": "${type}.FailedProduceRequestsPerSec"
  }, {
    "object_name": "kafka.server:type=BrokerTopicMetrics,name=FetchMessageConversionsPerSec",
    "object_alias": "${type}.FetchMessageConversionsPerSec"
  }, {
    "object_name": "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec",
    "object_alias": "${type}.MessagesInPerSec"
  }, {
    "object_name": "kafka.server:type=BrokerTopicMetrics,name=ProduceMessageConversionsPerSec",
    "object_alias": "${type}.ProduceMessageConversionsPerSec"
  }, {
    "object_name": "kafka.server:type=BrokerTopicMetrics,name=TotalFetchRequestsPerSec",
    "object_alias": "${type}.TotalFetchRequestsPerSec"
  }, {
    "object_name": "kafka.server:type=BrokerTopicMetrics,name=TotalProduceRequestsPerSec",
    "object_alias": "${type}.TotalProduceRequestsPerSec"
  }, {
    "object_name": "kafka.server:type=ReplicaManager,name=*",
    "object_alias": "${type}.${name}"
  }, {
    "object_name": "kafka.controller:type=ControllerStats,name=*",
    "object_alias": "${type}.${name}"
  }, {
    "object_name": "kafka.controller:type=KafkaController,name=*",
    "object_alias": "${type}.${name}"
  }, {
    "object_name": "kafka.network:type=RequestMetrics,name=*,request=*",
    "object_alias": "${type}.${name}.${request}"
  }]
}
The metrics collected here are mainly:
- JVM garbage-collection metrics
- Java Runtime metrics
- Kafka Server metrics
- Kafka Controller metrics
- Kafka network metrics
- Kafka ZooKeeper metrics
Create one such JSON file per Kafka broker.
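Since every broker file repeats the same query list with only host and port changing, generating the files from a broker inventory avoids copy-paste drift. A minimal sketch follows; the broker names and addresses are made-up examples, it writes into a temp directory (in production you would target /etc/logstash/conf.d/kafka-jmx), and the queries list is truncated to two entries for brevity:

```python
import json
import os
import tempfile

# Hypothetical broker inventory: name -> (host, jmx_port).
BROKERS = {
    "kafka-167": ("192.168.32.167", 33082),
    "kafka-168": ("192.168.32.168", 33082),
}

def render_jmx_conf(host, port):
    """Build one per-broker file for logstash-input-jmx.

    Only two queries are shown here; extend the list with the full
    set of object_name entries from the config above.
    """
    return {
        "host": host,
        "port": port,
        "alias": "kafka.jmx",
        "queries": [
            {"object_name": "java.lang:type=Memory",
             "object_alias": "Memory"},
            {"object_name": "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec",
             "object_alias": "${type}.MessagesInPerSec"},
        ],
    }

out_dir = tempfile.mkdtemp()  # in production: /etc/logstash/conf.d/kafka-jmx
for name, (host, port) in BROKERS.items():
    with open(os.path.join(out_dir, name + ".json"), "w") as f:
        json.dump(render_jmx_conf(host, port), f, indent=2)

print(sorted(os.listdir(out_dir)))
# ['kafka-167.json', 'kafka-168.json']
```

Rerunning the script after editing the query list keeps all broker files in sync.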
Summary
With the steps above done, the collected metrics end up in ES; a later post will cover setting up a Grafana dashboard for Kafka monitoring.
This article was written by Chakhsu Lau and is licensed under the Creative Commons Attribution 4.0 International License.
Unless marked as a reprint or with another source, all articles on this site are original or translated; please attribute when reprinting.