Collecting Kafka JMX Monitoring Data

We set up a new Kafka cluster and need to monitor it. The cluster also needs some tuning, and monitoring data will serve as the basis for those decisions, hence the need to collect Kafka metrics.

Configuring JMX

Kafka exposes its runtime state mainly through JMX. To enable JMX:

Add the following to the appropriate place in Kafka's systemd unit file:

Environment="KAFKA_JMX_OPTS=-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=127.0.0.1 -Djava.net.preferIPv4Stack=true"
Environment="JMX_PORT=4321"

Change 127.0.0.1 and 4321 to the internal IP of the machine running Kafka and the port you want to use.
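Rather than editing the packaged unit file directly, these settings can also live in a systemd drop-in; a minimal sketch, assuming the service is named kafka (the service name, IP, and port here are assumptions to adjust for your setup):

```shell
# Sketch: supply the JMX environment via a systemd drop-in.
mkdir -p /etc/systemd/system/kafka.service.d
cat > /etc/systemd/system/kafka.service.d/jmx.conf <<'EOF'
[Service]
Environment="KAFKA_JMX_OPTS=-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=192.168.32.167 -Djava.net.preferIPv4Stack=true"
Environment="JMX_PORT=4321"
EOF
systemctl daemon-reload
systemctl restart kafka
# Verify the JMX port is now listening:
ss -ltn | grep 4321
```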

Installing Logstash

Run the following commands in a terminal to install it:

wget https://artifacts.elastic.co/downloads/logstash/logstash-6.2.4.rpm
yum install logstash-6.2.4.rpm

Installing the JMX input plugin

Check whether the plugin is already installed:

/usr/share/logstash/bin/logstash-plugin list

If it is not listed, install it:

/usr/share/logstash/bin/logstash-plugin install logstash-input-jmx

If the command returns the following, the installation succeeded:

Validating logstash-input-jmx
Installing logstash-input-jmx
Installation successful

For more on managing plugins, see the official Logstash plugin documentation.

Adding the configuration

Configure kafka-status.conf

cd /etc/logstash/conf.d
touch kafka-status.conf
vim kafka-status.conf

Put the following into kafka-status.conf:

input {
  jmx {
    path => "/etc/logstash/conf.d/kafka-jmx"
    type => "jmx"
    polling_frequency => 60
    nb_thread => 4
    tags => ["kafka"]
  }
}

output {
  if "kafka" in [tags] {
    elasticsearch {
      hosts => ["192.168.32.221:24702", "192.168.32.222:24702", "192.168.32.223:24702"]
      index => "logstash-kafka-%{+YYYY.MM.dd}"
    }
    #stdout { codec => rubydebug }
  }
}

In the input section, path points to a directory holding the JMX query files for Kafka, one file per Kafka broker (described below); polling_frequency is the collection interval, set here to 60 seconds; nb_thread is the number of threads used for collection, set here to 4, which is also the default. In the elasticsearch output, hosts should point to whichever ES cluster you actually use, and index should likewise be adjusted to your situation.
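Before starting the service, the pipeline can be syntax-checked from the command line; a sketch, assuming the install and config paths used in this guide:

```shell
# Validate the pipeline config without starting Logstash
# (--config.test_and_exit is available in Logstash 6.x).
/usr/share/logstash/bin/logstash --config.test_and_exit \
  -f /etc/logstash/conf.d/kafka-status.conf
# If validation passes, start and enable the service:
systemctl start logstash
systemctl enable logstash
```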

Configure kafka-jmx

The example below collects from the Kafka broker at 192.168.32.167.

mkdir /etc/logstash/conf.d/kafka-jmx
cd /etc/logstash/conf.d/kafka-jmx
touch kafka-167.json
vim kafka-167.json

Put the following into kafka-167.json:

{
  "host" : "192.168.32.167",
  "port" : 33082,
  "alias" : "kafka.jmx",
  "queries": [{
    "object_name" : "java.lang:type=Memory",
    "object_alias" : "Memory"
  }, {
    "object_name" : "java.lang:type=Runtime",
    "attributes" : [ "Uptime", "StartTime" ],
    "object_alias" : "Runtime"
  }, {
    "object_name" : "java.lang:type=GarbageCollector,name=*",
    "attributes" : [ "CollectionCount", "CollectionTime" ],
    "object_alias" : "${type}.${name}"
  }, {
    "object_name" : "java.nio:type=BufferPool,name=*",
    "object_alias" : "${type}.${name}"
  }, {
    "object_name": "kafka.server:type=ZooKeeperClientMetrics,name=ZooKeeperRequestLatencyMs",
    "object_alias": "${type}.ZooKeeperRequestLatencyMs"
  }, {
    "object_name": "kafka.server:type=SessionExpireListener,name=*",
    "object_alias": "${type}.${name}"
  }, {
    "object_name": "kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec",
    "object_alias": "${type}.BytesInPerSec"
  }, {
    "object_name": "kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec",
    "object_alias": "${type}.BytesOutPerSec"
  }, {
    "object_name": "kafka.server:type=BrokerTopicMetrics,name=BytesRejectedPerSec",
    "object_alias": "${type}.BytesRejectedPerSec"
  }, {
    "object_name": "kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec",
    "object_alias": "${type}.FailedFetchRequestsPerSec"
  }, {
    "object_name": "kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec",
    "object_alias": "${type}.FailedProduceRequestsPerSec"
  }, {
    "object_name": "kafka.server:type=BrokerTopicMetrics,name=FetchMessageConversionsPerSec",
    "object_alias": "${type}.FetchMessageConversionsPerSec"
  }, {
    "object_name": "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec",
    "object_alias": "${type}.MessagesInPerSec"
  }, {
    "object_name": "kafka.server:type=BrokerTopicMetrics,name=ProduceMessageConversionsPerSec",
    "object_alias": "${type}.ProduceMessageConversionsPerSec"
  }, {
    "object_name": "kafka.server:type=BrokerTopicMetrics,name=TotalFetchRequestsPerSec",
    "object_alias": "${type}.TotalFetchRequestsPerSec"
  }, {
    "object_name": "kafka.server:type=BrokerTopicMetrics,name=TotalProduceRequestsPerSec",
    "object_alias": "${type}.TotalProduceRequestsPerSec"
  }, {
    "object_name": "kafka.server:type=ReplicaManager,name=*",
    "object_alias": "${type}.${name}"
  }, {
    "object_name": "kafka.controller:type=ControllerStats,name=*",
    "object_alias": "${type}.${name}"
  }, {
    "object_name": "kafka.controller:type=KafkaController,name=*",
    "object_alias": "${type}.${name}"
  }, {
    "object_name": "kafka.network:type=RequestMetrics,name=*,request=*",
    "object_alias": "${type}.${name}.${request}"
  }]
}
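A malformed query file will make the jmx input skip or error on that broker, so it is worth sanity-checking each file before deploying it. A minimal sketch (the validate_jmx_conf helper is hypothetical, not part of Logstash; it only checks the fields the plugin relies on):

```python
import json

def validate_jmx_conf(text):
    """Parse a kafka-jmx query file and check the fields the jmx input expects."""
    conf = json.loads(text)
    # host/port tell the plugin which JMX endpoint to poll.
    assert isinstance(conf["host"], str)
    assert isinstance(conf["port"], int)
    # Every query needs an object_name; attributes/object_alias are optional.
    for q in conf["queries"]:
        assert "object_name" in q
    return len(conf["queries"])

sample = """
{
  "host": "192.168.32.167",
  "port": 33082,
  "alias": "kafka.jmx",
  "queries": [
    {"object_name": "java.lang:type=Memory", "object_alias": "Memory"}
  ]
}
"""
print(validate_jmx_conf(sample))  # → 1
```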

The metrics collected here mainly cover: JVM memory, GC, runtime, and NIO buffer pools; ZooKeeper request latency and session events; broker topic throughput and request rates (bytes and messages in/out, rejected bytes, failed and total fetch/produce requests, message conversions); replica manager state; controller stats; and network request metrics.

Create one JSON file like this per Kafka broker.
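Once Logstash has been polling for a minute or two, you can confirm events are actually landing in ES; a sketch using the hosts and index pattern from the config above:

```shell
# Count documents across the logstash-kafka-* indices.
curl -s 'http://192.168.32.221:24702/logstash-kafka-*/_count?pretty'
# A non-zero "count" in the response means metrics are flowing.
```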

Summary

With the steps above in place, the collected metrics flow into ES. A follow-up post will cover building a Grafana dashboard for Kafka monitoring.
