Kafka Administration Using Command Line Tools

In some situations, it is convenient to use the command line tools available in Kafka to administer your cluster. However, not all tools available for Kafka are supported by Cloudera, and certain administration tasks can be carried out more easily and conveniently using Cloudera Manager. Therefore, before you continue, make sure to review Unsupported Command Line Tools and Notes on Kafka CLI Administration.

Unsupported Command Line Tools

The following tools can be found as part of the Kafka distribution, but their use is generally discouraged for various reasons as documented here.

  • connect-distributed, connect-standalone: Kafka Connect is currently not supported.
  • kafka-acls: Cloudera recommends using Sentry to manage ACLs instead of this tool.
  • kafka-broker-api-versions: Primarily useful for Client-to-Broker protocol related development.
  • kafka-configs: Use Cloudera Manager to adjust any broker or security properties instead of the kafka-configs tool. This tool should only be used to modify topic properties.
  • kafka-delete-records: Do not use with CDH.
  • kafka-log-dirs: Queries log directory information. You can use the Cloudera Manager console instead: Kafka > Instances > Active Controller > Log Files > Role Log File
  • kafka-mirror-maker: Use Cloudera Manager to create any CDH Mirror Maker instance.
  • kafka-preferred-replica-election: This tool causes leadership for each partition to be transferred back to the 'preferred replica'. It can be used to balance leadership among the servers. It is recommended to use kafka-reassign-partitions instead of kafka-preferred-replica-election.
  • kafka-replay-log-producer: Can be used to “rename” a topic.
  • kafka-replica-verification: Validates that all replicas for a set of topics have the same data. This tool is a “heavy duty” version of the ISR column of kafka-topics tool.
  • kafka-server-start, kafka-server-stop: Use Cloudera Manager to manage any Kafka host.
  • kafka-simple-consumer-shell: Deprecated in Apache Kafka.
  • kafka-streams-application-reset: Kafka Streams is currently not supported.
  • kafka-verifiable-consumer, kafka-verifiable-producer: These scripts are intended for system testing.
  • zookeeper-security-migration, zookeeper-server-start, zookeeper-server-stop: Use Cloudera Manager to manage any Zookeeper host.
  • zookeeper-shell: Limit usage of this script to reading information from Zookeeper.

Notes on Kafka CLI Administration

Here are some additional points to be aware of regarding Kafka administration:

  • Use Cloudera Manager to start and stop Kafka and Zookeeper services. Do not use the kafka-server-start, kafka-server-stop, zookeeper-server-start, or zookeeper-server-stop commands.
  • For a parcel installation, all Kafka command line tools are located in /opt/cloudera/parcels/KAFKA/lib/kafka/bin/. For a package installation, all such tools can be found in /usr/bin/.
  • Ensure that the JAVA_HOME environment variable is set to your JDK installation directory before using the command-line tools. For example:
    export JAVA_HOME=/usr/java/jdk1.8.0_144-cloudera
    
  • Manual Zookeeper commands are very difficult to get right where they interact with Kafka. Cloudera recommends that you avoid any write operations or ACL modifications in Zookeeper.
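
The location and JAVA_HOME notes above can be combined into a small setup step. The following is a minimal sketch, assuming the default parcel and package locations quoted above. The function name kafka_bin_dir and its optional root-prefix argument are hypothetical and exist only for illustration.

```shell
# Print the directory that holds the Kafka command line tools.
# $1 (optional) is a filesystem root prefix, useful only for testing.
kafka_bin_dir() {
    root="${1:-}"
    parcel_dir="$root/opt/cloudera/parcels/KAFKA/lib/kafka/bin"
    if [ -d "$parcel_dir" ]; then
        echo "$parcel_dir"      # parcel installation
    else
        echo "$root/usr/bin"    # package installation
    fi
}

# Typical use before running any tool (the JDK path is the example from the notes):
# export JAVA_HOME=/usr/java/jdk1.8.0_144-cloudera
# export PATH="$(kafka_bin_dir):$PATH"
```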

kafka-topics

Use the kafka-topics tool to generate a snapshot of topics in the Kafka cluster.

kafka-topics --zookeeper zkhost --describe
Topic: topic-a1    PartitionCount:3    ReplicationFactor:3    Configs:
    Topic: topic-a1    Partition: 0    Leader: 64    Replicas: 64,62,63    Isr: 64,62,63
    Topic: topic-a1    Partition: 1    Leader: 62    Replicas: 62,63,64    Isr: 62,63,64
    Topic: topic-a1    Partition: 2    Leader: 63    Replicas: 63,64,62    Isr: 63,64,62
Topic: topic-a2    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: topic-a2    Partition: 0    Leader: 64    Replicas: 64,62,63    Isr: 64,62,63

The output lists each topic and basic partition information. Note the following about the output:

  • Partition count: The more partitions, the higher the possible parallelism among consumers and producers.
  • Replication factor: Shows 1 for no redundancy and higher for more redundancy.
  • Replicas and in-sync replicas (ISR): Shows which broker IDs have the partitions and which replicas are current.

In some situations this tool shows an invalid value for the leader broker ID, or fewer ISRs than replicas. Either symptom suggests that something is wrong with the affected topics.
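
The ISR check described above can be automated by filtering the describe output. This is a minimal sketch, assuming each partition is printed on one line with Replicas: and Isr: fields. The function name under_replicated is hypothetical.

```shell
# Print "topic partition" for every partition whose ISR list is shorter
# than its replica list. Reads `kafka-topics --describe` output on stdin.
under_replicated() {
    awk '
    /Partition:/ && /Replicas:/ && /Isr:/ {
        topic = ""; part = ""; replicas = ""; isr = ""
        for (i = 1; i < NF; i++) {
            if ($i == "Topic:")     topic = $(i + 1)
            if ($i == "Partition:") part = $(i + 1)
            if ($i == "Replicas:")  replicas = $(i + 1)
            if ($i == "Isr:")       isr = $(i + 1)
        }
        # An empty ISR list counts as zero in-sync replicas.
        if (split(isr, a, ",") < split(replicas, b, ","))
            print topic, part
    }'
}

# Usage:
# kafka-topics --zookeeper zkhost --describe | under_replicated
```

The kafka-topics tool also accepts an --under-replicated-partitions option together with --describe, which may be sufficient when no further processing is needed.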

It is possible to change topic configuration properties using this tool. Increasing the partition count, the replication factor or both is not recommended.

kafka-configs

The kafka-configs tool allows you to set and unset properties on topics. Cloudera recommends that you use Cloudera Manager instead of this tool to change properties on brokers, because this tool bypasses any Cloudera Manager safety checks.

Setting a topic property:

kafka-configs --zookeeper zkhost --entity-type topics --entity-name topic --alter --add-config property=value

Checking a topic property:

$ kafka-configs --zookeeper zkhost --entity-type topics --entity-name topic --describe

Unsetting a topic property:

$ kafka-configs --zookeeper zkhost --entity-type topics --entity-name topic --alter --delete-config property

The Apache Kafka documentation includes a complete list of topic properties.
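
As a concrete illustration of the three operations, the sketch below wraps them in small helper functions. The helper names, the KAFKA_CONFIGS and ZK variables, and the choice of retention.ms as the example property are assumptions made for illustration. Setting KAFKA_CONFIGS=echo prints the command line instead of running it.

```shell
# Command to run; override KAFKA_CONFIGS (for example with "echo") for a dry run.
KAFKA_CONFIGS="${KAFKA_CONFIGS:-kafka-configs}"
# Zookeeper connection string; "zkhost" is the placeholder used in the examples above.
ZK="${ZK:-zkhost}"

# Set one property on one topic: $1 = topic, $2 = property, $3 = value
topic_config_set() {
    "$KAFKA_CONFIGS" --zookeeper "$ZK" --entity-type topics --entity-name "$1" \
        --alter --add-config "$2=$3"
}

# Show the properties set on one topic: $1 = topic
topic_config_show() {
    "$KAFKA_CONFIGS" --zookeeper "$ZK" --entity-type topics --entity-name "$1" --describe
}

# Remove one property from one topic: $1 = topic, $2 = property
topic_config_unset() {
    "$KAFKA_CONFIGS" --zookeeper "$ZK" --entity-type topics --entity-name "$1" \
        --alter --delete-config "$2"
}

# Example: keep messages on topic-a1 for one day (retention.ms is in milliseconds).
# topic_config_set topic-a1 retention.ms 86400000
```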

kafka-console-consumer

The kafka-console-consumer tool can be useful in a couple of ways:

  • Acting as an independent consumer of particular topics. This can be useful to compare results against a consumer program that you’ve written.
  • Testing general topic consumption without the need to write any consumer code.

Examples of usage:

$ kafka-console-consumer --bootstrap-server <broker1>,<broker2>... --topic <topic> --from-beginning
<record-earliest-offset>
<record-earliest-offset+1>

Note the following about the tool:

  • This tool prints all records and keeps outputting as more records are written to the topic.
  • If the kafka-console-consumer tool is given no flags, it displays the full help message.
  • In older versions of Kafka, it may have been necessary to use the --new-consumer flag. As of Apache Kafka version 0.10.2, this is no longer necessary.

kafka-console-producer

This tool is used to write messages to a topic. It is typically less useful than the console consumer, but it is handy when the messages are in a text-based format. In general, the usage will be something like:

cat file | kafka-console-producer args
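
A more concrete form of that pipeline is sketched below. It assumes a Kafka version whose console producer takes the --broker-list option, and it sends each line of the input file as one message. The function name produce_file and the PRODUCER variable are hypothetical.

```shell
# Command to run; override PRODUCER to substitute a stub for testing.
PRODUCER="${PRODUCER:-kafka-console-producer}"

# Send each line of a text file as one message to a topic.
# $1 = broker list (host:port,...), $2 = topic, $3 = input file
produce_file() {
    "$PRODUCER" --broker-list "$1" --topic "$2" < "$3"
}

# Usage:
# produce_file broker1:9092,broker2:9092 topic-a1 messages.txt
```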

kafka-consumer-groups

The basic usage of the kafka-consumer-groups tool is:

kafka-consumer-groups --bootstrap-server broker1,broker2... --describe --group GROUP_ID

This tool is primarily useful for debugging consumer offset issues. The output from the tool shows the log and consumer offsets for each partition connected to the consumer group corresponding to GROUP_ID. You can see at a glance which consumers are current with their partition and which ones are behind. From there, you can determine which partitions (and likely the corresponding brokers) are slow.
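
To pick out the lagging partitions, the describe output can be filtered. The sketch below assumes the output starts with a header row that names TOPIC, PARTITION, and LAG columns, which may vary between Kafka versions. The function name lagging_partitions is hypothetical.

```shell
# Print "topic partition lag" for partitions whose lag is at least $1 (default 1).
# Reads `kafka-consumer-groups --describe` output on stdin.
lagging_partitions() {
    awk -v min="${1:-1}" '
    # Header row: remember which column holds each field of interest.
    !seen && /LAG/ {
        for (i = 1; i <= NF; i++) col[$i] = i
        seen = 1
        next
    }
    # Data rows: skip rows where the lag is not a number (for example "-").
    seen && NF >= col["LAG"] {
        lag = $(col["LAG"])
        if (lag ~ /^[0-9]+$/ && lag + 0 >= min + 0)
            print $(col["TOPIC"]), $(col["PARTITION"]), lag
    }'
}

# Usage:
# kafka-consumer-groups --bootstrap-server broker1:9092 --describe --group GROUP_ID | lagging_partitions 1000
```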

Beyond this debugging usage, there are other more advanced options to this tool:

  • --execute --reset-offsets SCENARIO_OPTION: Resets the offsets for a consumer group to a particular value based on the SCENARIO_OPTION flag given.

    Valid flags for SCENARIO_OPTION are:

    • --to-datetime
    • --by-duration
    • --to-earliest
    • --to-latest
    • --shift-by
    • --from-file
    • --to-current

    You will likely want to set the --topic flag to restrict this change to a specific topic or a specific set of partitions within that topic.

This tool can reset all offsets on all topics, which is almost never what you want. Use the reset options with great care.
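
One way to reduce the risk is to preview a reset before applying it. The sketch below assumes a Kafka version in which kafka-consumer-groups supports --dry-run alongside --execute, and it restricts the reset to a single topic. The function name reset_to_earliest and the CONSUMER_GROUPS variable are hypothetical.

```shell
# Command to run; override CONSUMER_GROUPS to substitute a stub for testing.
CONSUMER_GROUPS="${CONSUMER_GROUPS:-kafka-consumer-groups}"

# Reset one topic of a consumer group to its earliest offsets.
# $1 = bootstrap servers, $2 = group ID, $3 = topic, $4 = --dry-run or --execute
reset_to_earliest() {
    case "$4" in
        --dry-run|--execute) ;;
        *) echo "mode must be --dry-run or --execute" >&2; return 2 ;;
    esac
    "$CONSUMER_GROUPS" --bootstrap-server "$1" --group "$2" --topic "$3" \
        --reset-offsets --to-earliest "$4"
}

# Usage: preview first, then apply once the preview looks right.
# reset_to_earliest broker1:9092 GROUP_ID topic-a1 --dry-run
# reset_to_earliest broker1:9092 GROUP_ID topic-a1 --execute
```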

kafka-reassign-partitions

This tool allows a great deal of control over partitions in a Kafka cluster. You can do either or both of the following:

  • Change the ordering of the partition assignment list. This is usually done to control leader imbalances between brokers.
  • Move replicas from one broker to another. The most common usage of this is after adding new brokers to the cluster.

To reassign partitions:

  1. Create a JSON file that lists the topics you want to move, for example topics-to-move.json:
    {"topics":  [{"topic": "foo1"},
                 {"topic": "foo2"}],
     "version":1
    }
  2. Use the --generate option in kafka-reassign-partitions.sh to list the distribution of partitions and replicas on your current brokers, followed by a list of suggested locations for partitions on your new broker.
    $ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
            --topics-to-move-json-file topics-to-move.json \
            --broker-list "4" \
            --generate
    
    Current partition replica assignment
    
    {"version":1,
     "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
                   {"topic":"foo1","partition":0,"replicas":[3,1]},
                   {"topic":"foo2","partition":2,"replicas":[1,2]},
                   {"topic":"foo2","partition":0,"replicas":[3,2]},
                   {"topic":"foo1","partition":1,"replicas":[2,3]},
                   {"topic":"foo2","partition":1,"replicas":[2,3]}]
    }
    
    Proposed partition reassignment configuration

    {"version":1,
     "partitions":[{"topic":"foo1","partition":3,"replicas":[4]},
                   {"topic":"foo1","partition":1,"replicas":[4]},
                   {"topic":"foo2","partition":2,"replicas":[4]}]
    }
  3. Revise the suggested list if required, and then save it as a JSON file.
  4. Use the --execute option in kafka-reassign-partitions.sh to start the redistribution process, which can take several hours in some cases.
    $ bin/kafka-reassign-partitions.sh \
            --zookeeper localhost:2181 \
            --reassignment-json-file expand-cluster-reassignment.json \
            --execute
  5. Use the --verify option in kafka-reassign-partitions.sh to check the status of your partitions.
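
The first and last steps above can be sketched as follows. The helper topics_to_move_json is hypothetical and simply prints the file format shown in step 1 for any list of topic names. The commented --verify call assumes the same Zookeeper address and reassignment file name used in the earlier steps.

```shell
# Write a topics-to-move JSON document for the given topic names to stdout.
topics_to_move_json() {
    printf '{"topics": ['
    sep=""
    for t in "$@"; do
        printf '%s{"topic": "%s"}' "$sep" "$t"
        sep=", "
    done
    printf '], "version": 1}\n'
}

# Step 1: generate the input file.
# topics_to_move_json foo1 foo2 > topics-to-move.json
#
# Step 5: after --execute, check progress with the same reassignment file.
# bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
#     --reassignment-json-file expand-cluster-reassignment.json --verify
```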

There are several caveats to using this command:

  • It is highly recommended that you minimize the volume of replica changes. For example, instead of moving ten replicas with a single command, move two at a time to keep the cluster healthy.
  • It is not possible to use this command to make an out-of-sync replica the partition leader.
  • Given the earlier restrictions, it is best to use this command only when all brokers and topics are healthy.

kafka-*-perf-test

The kafka-*-perf-test tools are generally expected to be used on a test or development cluster. They can be used in several ways:

  • Measuring read and/or write throughput.
  • Stress testing the cluster based on specific parameters (such as message size).
  • Load testing for the purpose of evaluating specific metrics or determining the impact of cluster configuration changes.

The kafka-producer-perf-test script can either create a randomly generated byte record:

kafka-producer-perf-test --topic TOPIC --record-size SIZE_IN_BYTES

or randomly read from a set of provided records:

kafka-producer-perf-test --topic TOPIC --payload-delimiter DELIMITER --payload-file INPUT_FILE

where INPUT_FILE is a concatenated set of pre-generated messages separated by DELIMITER. The script keeps producing messages until it reaches the limit set by the --num-records flag.
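
A fuller invocation is sketched below. It assumes a Kafka version in which the producer performance tool also expects --num-records, --throughput, and the broker address passed through --producer-props. A --throughput value of -1 disables throttling. The function name producer_perf and the PRODUCER_PERF variable are hypothetical.

```shell
# Command to run; override PRODUCER_PERF to substitute a stub for testing.
PRODUCER_PERF="${PRODUCER_PERF:-kafka-producer-perf-test}"

# Produce randomly generated records at unthrottled speed.
# $1 = bootstrap servers, $2 = topic, $3 = number of records, $4 = record size in bytes
producer_perf() {
    "$PRODUCER_PERF" --topic "$2" --num-records "$3" --record-size "$4" \
        --throughput -1 --producer-props "bootstrap.servers=$1"
}

# Usage: one million records of 1 KB each.
# producer_perf broker1:9092 TOPIC 1000000 1024
```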

The basic usage of kafka-consumer-perf-test is:

kafka-consumer-perf-test --broker-list host1:port1,host2:port2,... --zookeeper zk1:port1,zk2:port2,... --topic TOPIC

The flags of most interest for this command are:

  • --group gid: If you run more than one instance of this test, set a different group ID for each instance.
  • --num-fetch-threads: Defaults to 1. Increase if higher throughput testing is needed.
  • --from-latest: To start consuming from the latest offset. May be needed for certain types of testing.

Enabling DEBUG or TRACE in command line scripts

In some cases, you may find it useful to produce extra debugging output from the Kafka client API. To set the DEBUG level (shown below) or the TRACE level, replace the WARN setting in a copy of the log4j properties file and point KAFKA_OPTS at that copy:
cp /etc/kafka/conf/tools-log4j.properties /var/tmp
sed -i -e 's/WARN/DEBUG/g' /var/tmp/tools-log4j.properties

export KAFKA_OPTS="-Dlog4j.configuration=file:/var/tmp/tools-log4j.properties"
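
The same steps can be wrapped in a helper that takes the level as a parameter and leaves the original file untouched. This is a minimal sketch. The function name make_tools_log4j is hypothetical, and the commented usage assumes the file locations shown above.

```shell
# Copy a log4j properties file, replacing every WARN with the requested level.
# $1 = source file, $2 = destination file, $3 = level (DEBUG or TRACE)
make_tools_log4j() {
    sed -e "s/WARN/$3/g" "$1" > "$2"
}

# Usage: enable TRACE for a single command only, without exporting KAFKA_OPTS.
# make_tools_log4j /etc/kafka/conf/tools-log4j.properties /var/tmp/tools-log4j.properties TRACE
# KAFKA_OPTS="-Dlog4j.configuration=file:/var/tmp/tools-log4j.properties" \
#     kafka-topics --zookeeper zkhost --describe
```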

Understanding the kafka-run-class Bash Script

Almost all the provided Kafka tools eventually call the kafka-run-class script. This script is generally not called directly. However, if you are proficient with bash and want to understand certain features available in all Kafka scripts as well as some potential debugging scenarios, familiarity with the kafka-run-class script can prove highly beneficial.

For example, there are some useful environment variables that affect all the command line scripts:

  • KAFKA_DEBUG allows a Java debugger to attach to the JVM launched by the particular script. Setting KAFKA_DEBUG also allows some further debugging customization:
    • JAVA_DEBUG_PORT sets the JVM debugging port.
    • JAVA_DEBUG_OPTS can be used to override the default debugging arguments being passed to the JVM.
  • KAFKA_HEAP_OPTS can be used to pass memory setting arguments to the JVM.
  • KAFKA_JVM_PERFORMANCE_OPTS can be used to pass garbage collection flags to the JVM.
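
As an illustration, the variables can be exported before running any of the tools. This sketch assumes the behavior of the Apache version of kafka-run-class, where any non-empty KAFKA_DEBUG value enables the debugger hook. The heap size and port number are arbitrary example values.

```shell
# Give the tool JVM a fixed 1 GB heap (standard JVM memory flags).
export KAFKA_HEAP_OPTS="-Xms1g -Xmx1g"

# Allow a Java debugger to attach; the port is an arbitrary example.
export KAFKA_DEBUG=y
export JAVA_DEBUG_PORT=5005

# Any tool started from this shell now picks up the settings, for example:
# kafka-topics --zookeeper zkhost --describe
```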