Skip to main content

Apache Kafka

Apache Kafka is a distributed event store and stream-processing platform. Telegraf is a plug-in driven server agent for collecting and sending metrics and events from databases, systems and IoT sensors.

This integration lets you send metrics from your Apache Kafka HTTP server using the JMX to Prometheus exporter connected to Telegraf.

To send your Prometheus-format Apache Kafka metrics to Logz.io, you need to add the inputs.prometheus and outputs.http plug-ins to your Telegraf configuration file.

Configure Apache Kafka to send your metrics to Logz.io

Before you begin, you'll need: Apache Kafka running on your server

Download the JMX to Prometheus exporter

note

Run all commands for this integration from your Apache Kafka directory.

Navigate to the Kafka directory on your host and run the following command:

wget https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/0.16.1/jmx_prometheus_javaagent-0.16.1.jar

Create a configuration file for the JMX to Prometheus exporter

Navigate to the Kafka directory on your host, create a file named kafka.yml with the following content:

lowercaseOutputName: true


rules:
# Special cases and very specific rules
- pattern : kafka.server<type=(.+), name=(.+), clientId=(.+), topic=(.+), partition=(.*)><>Value
name: kafka_server_$1_$2
type: GAUGE
labels:
clientId: "$3"
topic: "$4"
partition: "$5"
- pattern : kafka.server<type=(.+), name=(.+), clientId=(.+), brokerHost=(.+), brokerPort=(.+)><>Value
name: kafka_server_$1_$2
type: GAUGE
labels:
clientId: "$3"
broker: "$4:$5"
- pattern : kafka.coordinator.(\w+)<type=(.+), name=(.+)><>Value
name: kafka_coordinator_$1_$2_$3
type: GAUGE

# Generic per-second counters with 0-2 key/value pairs
- pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, (.+)=(.+), (.+)=(.+)><>Count
name: kafka_$1_$2_$3_total
type: COUNTER
labels:
"$4": "$5"
"$6": "$7"
- pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, (.+)=(.+)><>Count
name: kafka_$1_$2_$3_total
type: COUNTER
labels:
"$4": "$5"
- pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*><>Count
name: kafka_$1_$2_$3_total
type: COUNTER

- pattern: kafka.server<type=(.+), client-id=(.+)><>([a-z-]+)
name: kafka_server_quota_$3
type: GAUGE
labels:
resource: "$1"
clientId: "$2"

- pattern: kafka.server<type=(.+), user=(.+), client-id=(.+)><>([a-z-]+)
name: kafka_server_quota_$4
type: GAUGE
labels:
resource: "$1"
user: "$2"
clientId: "$3"

# Generic gauges with 0-2 key/value pairs
- pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+), (.+)=(.+)><>Value
name: kafka_$1_$2_$3
type: GAUGE
labels:
"$4": "$5"
"$6": "$7"
- pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+)><>Value
name: kafka_$1_$2_$3
type: GAUGE
labels:
"$4": "$5"
- pattern: kafka.(\w+)<type=(.+), name=(.+)><>Value
name: kafka_$1_$2_$3
type: GAUGE

# Emulate Prometheus 'Summary' metrics for the exported 'Histogram's.
#
# Note that these are missing the '_sum' metric!
- pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+), (.+)=(.+)><>Count
name: kafka_$1_$2_$3_count
type: COUNTER
labels:
"$4": "$5"
"$6": "$7"
- pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.*), (.+)=(.+)><>(\d+)thPercentile
name: kafka_$1_$2_$3
type: GAUGE
labels:
"$4": "$5"
"$6": "$7"
quantile: "0.$8"
- pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+)><>Count
name: kafka_$1_$2_$3_count
type: COUNTER
labels:
"$4": "$5"
- pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.*)><>(\d+)thPercentile
name: kafka_$1_$2_$3
type: GAUGE
labels:
"$4": "$5"
quantile: "0.$6"
- pattern: kafka.(\w+)<type=(.+), name=(.+)><>Count
name: kafka_$1_$2_$3_count
type: COUNTER
- pattern: kafka.(\w+)<type=(.+), name=(.+)><>(\d+)thPercentile
name: kafka_$1_$2_$3
type: GAUGE
labels:
quantile: "0.$4"

- pattern: 'java.lang<name=([\s\w]+), type=GarbageCollector, key=(\w+)>(.*): (\d+)'
name: jvm_gc_$3
labels:
name: $1
key: $2
value: $4
type: GAUGE

- pattern: 'java.lang<name=([\s\w]+), type=GarbageCollector>(.*): (\d+)'
name: jvm_gc_$2
labels:
name: $1
value: $3
type: GAUGE

- pattern: kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>Count
name: kafka_log_log_flush_rate_time
help: Log flush rate and time (rate in seconds, latency|time in ms)
type: UNTYPED

#kafka.connect:type=app-info,client-id="{clientid}"
#kafka.consumer:type=app-info,client-id="{clientid}"
#kafka.producer:type=app-info,client-id="{clientid}"
- pattern: 'kafka.(.+)<type=app-info, client-id=(.+)><>start-time-ms'
name: kafka_$1_start_time_seconds
labels:
clientId: "$2"
help: "Kafka $1 JMX metric start time seconds"
type: GAUGE
valueFactor: 0.001
- pattern: 'kafka.(.+)<type=app-info, client-id=(.+)><>(commit-id|version): (.+)'
name: kafka_$1_$3_info
value: 1
labels:
clientId: "$2"
$3: "$4"
help: "Kafka $1 JMX metric info version and commit-id"
type: GAUGE

#kafka.producer:type=producer-topic-metrics,client-id="{clientid}",topic="{topic}"", partition="{partition}"
#kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{clientid}",topic="{topic}"", partition="{partition}"
- pattern: kafka.(.+)<type=(.+)-metrics, client-id=(.+), topic=(.+), partition=(.+)><>(.+-total|compression-rate|.+-avg|.+-replica|.+-lag|.+-lead)
name: kafka_$2_$6
labels:
clientId: "$3"
topic: "$4"
partition: "$5"
help: "Kafka $1 JMX metric type $2"
type: GAUGE

#kafka.producer:type=producer-topic-metrics,client-id="{clientid}",topic="{topic}"
#kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{clientid}",topic="{topic}"", partition="{partition}"
- pattern: kafka.(.+)<type=(.+)-metrics, client-id=(.+), topic=(.+)><>(.+-total|compression-rate|.+-avg)
name: kafka_$2_$5
labels:
clientId: "$3"
topic: "$4"
help: "Kafka $1 JMX metric type $2"
type: GAUGE

#kafka.connect:type=connect-node-metrics,client-id="{clientid}",node-id="{nodeid}"
#kafka.consumer:type=consumer-node-metrics,client-id=consumer-1,node-id="{nodeid}"
- pattern: kafka.(.+)<type=(.+)-metrics, client-id=(.+), node-id=(.+)><>(.+-total|.+-avg)
name: kafka_$2_$5
labels:
clientId: "$3"
nodeId: "$4"
help: "Kafka $1 JMX metric type $2"
type: UNTYPED

#kafka.connect:type=kafka-metrics-count,client-id="{clientid}"
#kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{clientid}"
#kafka.consumer:type=consumer-coordinator-metrics,client-id="{clientid}"
#kafka.consumer:type=consumer-metrics,client-id="{clientid}"
- pattern: kafka.(.+)<type=(.+)-metrics, client-id=(.*)><>(.+-total|.+-avg|.+-bytes|.+-count|.+-ratio|.+-age|.+-flight|.+-threads|.+-connectors|.+-tasks|.+-ago)
name: kafka_$2_$4
labels:
clientId: "$3"
help: "Kafka $1 JMX metric type $2"
type: GAUGE

#kafka.connect:type=connector-task-metrics,connector="{connector}",task="{task}<> status"
- pattern: 'kafka.connect<type=connector-task-metrics, connector=(.+), task=(.+)><>status: ([a-z-]+)'
name: kafka_connect_connector_status
value: 1
labels:
connector: "$1"
task: "$2"
status: "$3"
help: "Kafka Connect JMX Connector status"
type: GAUGE

#kafka.connect:type=task-error-metrics,connector="{connector}",task="{task}"
#kafka.connect:type=source-task-metrics,connector="{connector}",task="{task}"
#kafka.connect:type=sink-task-metrics,connector="{connector}",task="{task}"
#kafka.connect:type=connector-task-metrics,connector="{connector}",task="{task}"
- pattern: kafka.connect<type=(.+)-metrics, connector=(.+), task=(.+)><>(.+-total|.+-count|.+-ms|.+-ratio|.+-avg|.+-failures|.+-requests|.+-timestamp|.+-logged|.+-errors|.+-retries|.+-skipped)
name: kafka_connect_$1_$4
labels:
connector: "$2"
task: "$3"
help: "Kafka Connect JMX metric type $1"
type: GAUGE

#kafka.connect:type=connector-metrics,connector="{connector}"
#kafka.connect:type=connect-worker-metrics,connector="{connector}"
- pattern: kafka.connect<type=connect-worker-metrics, connector=(.+)><>([a-z-]+)
name: kafka_connect_worker_$2
labels:
connector: "$1"
help: "Kafka Connect JMX metric $1"
type: GAUGE

#kafka.connect:type=connect-worker-metrics
- pattern: kafka.connect<type=connect-worker-metrics><>([a-z-]+)
name: kafka_connect_worker_$1
help: "Kafka Connect JMX metric worker"
type: GAUGE

#kafka.connect:type=connect-worker-rebalance-metrics
- pattern: kafka.connect<type=connect-worker-rebalance-metrics><>([a-z-]+)
name: kafka_connect_worker_rebalance_$1
help: "Kafka Connect JMX metric rebalance information"
type: GAUGE

Start Zookeeper

Start the Zookeeper as follows:

bin/zookeeper-server-start.sh config/zookeeper.properties

Start Apache Kafka with the JMX to Prometheus exporter

Open a new terminal and run the following command:

KAFKA_OPTS="`$KAFKA_OPTS` -javaagent:`PWD`/jmx_prometheus_javaagent-0.16.1.jar=9100:`PWD`/kafka.yml" bin/kafka-server-start.sh config/server.properties

Set up Telegraf v1.17 on a dedicated machine

For Windows

wget https://dl.influxdata.com/telegraf/releases/telegraf-1.27.3_windows_amd64.zip

After downloading the archive, extract its content into C:\Program Files\Logzio\telegraf\.

The configuration file is located at C:\Program Files\Logzio\telegraf\.

For MacOS

brew install telegraf

The configuration file is located at /usr/local/etc/telegraf.conf.

For Linux

Ubuntu & Debian

sudo apt-get update && sudo apt-get install telegraf

The configuration file is located at /etc/telegraf/telegraf.conf.

RedHat and CentOS

sudo yum install telegraf

The configuration file is located at /etc/telegraf/telegraf.conf.

SLES & openSUSE

# add go repository
zypper ar -f obs://devel:languages:go/ go
# install latest telegraf
zypper in telegraf

The configuration file is located at /etc/telegraf/telegraf.conf.

FreeBSD/PC-BSD

sudo pkg install telegraf

The configuration file is located at /etc/telegraf/telegraf.conf.

Add the inputs.prometheus plug-in

First you need to configure the input plug-in to enable Telegraf to scrape data from your hosts. To do this, add the below code to the configuration file.

[[inputs.prometheus]]
urls = ["http://localhost:9100/metrics"]
response_timeout = "10s"

Add the outputs.http plug-in

After you create the configuration file, configure the output plug-in to enable Telegraf to send your data to Logz.io in Prometheus-format. To do this, add the following code to the configuration file:

[[outputs.http]]
url = "https://<<LISTENER-HOST>>:8053"
data_format = "prometheusremotewrite"
[outputs.http.headers]
Content-Type = "application/x-protobuf"
Content-Encoding = "snappy"
X-Prometheus-Remote-Write-Version = "0.1.0"
Authorization = "Bearer <<PROMETHEUS-METRICS-SHIPPING-TOKEN>>"

Replace the placeholders to match your specifics. (They are indicated by the double angle brackets << >>):

  • Replace <<LISTENER-HOST>> with the Logz.io Listener URL for your region, configured to use port 8052 for http traffic, or port 8053 for https traffic. For example, listener.logz.io if your account is hosted on AWS US East, or listener-nl.logz.io if hosted on Azure West Europe.
  • Replace <<PROMETHEUS-METRICS-SHIPPING-TOKEN>> with a token for the Metrics account you want to ship to.
    Here's how to look up your Metrics token.

Start Telegraf

On Windows:
telegraf.exe --service start
On MacOS:
telegraf --config telegraf.conf
On Linux:

Linux (sysvinit and upstart installations)

sudo service telegraf start

Linux (systemd installations)

systemctl start telegraf

Check Logz.io for your metrics

Give your data some time to get from your system to ours, then log in to your Logz.io Metrics account, and open the Logz.io Metrics tab.