Kafka Administration

This section describes managing a Kafka cluster in production, including:

  • How to manage Kafka's sub-components
  • Using Cloudera Manager and/or the command line to administer a Kafka cluster

Kafka Administration Basics

Broker Log Management

As mentioned previously, Kafka brokers save their data as log segments in a directory. The logs are rotated depending on the size and time settings.

The most common log retention settings to adjust for your cluster are shown below. These are accessible in Cloudera Manager via the Kafka > Configuration tab.

  • log.dirs: The location for the Kafka data (that is, topic directories and log segments).
  • log.retention.{ms|minutes|hours}: The retention period for the entire log. Any older log segments are removed.
  • log.retention.bytes: The retention size for the entire log.
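
As a rough illustration of how the time and size settings interact, the following Python sketch models the retention check for one log. It is a simplified model, not Kafka's implementation: the Segment class, the segments_to_delete function, and the rule that the newest (active) segment is never removed are assumptions made for the example.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Segment:
    base_offset: int       # first offset stored in the segment
    size_bytes: int        # size of the segment on disk
    newest_record_ms: int  # timestamp of the newest record in the segment


def segments_to_delete(segments, now_ms, retention_ms=None, retention_bytes=None):
    """Return the segments a simplified retention check would remove.

    `segments` is ordered oldest first. The last entry is treated as the
    active segment and is never removed in this model (an assumption).
    """
    doomed = []
    # Time-based check: remove closed segments older than the retention period.
    if retention_ms is not None:
        for seg in segments[:-1]:
            if now_ms - seg.newest_record_ms <= retention_ms:
                break  # segments are ordered, so the rest are newer
            doomed.append(seg)
    # Size-based check: remove the oldest segments while the log stays
    # at or above the configured size after each removal.
    if retention_bytes is not None:
        remaining = segments[len(doomed):]
        excess = sum(s.size_bytes for s in remaining) - retention_bytes
        for seg in remaining[:-1]:
            if excess < seg.size_bytes:
                break
            doomed.append(seg)
            excess -= seg.size_bytes
    return doomed


# Hypothetical log with three closed segments and one active segment.
log = [
    Segment(0, 100, 0),
    Segment(100, 100, 1000),
    Segment(200, 100, 2000),
    Segment(300, 50, 3000),
]
```

With this hypothetical log, either a 2000 ms retention period (checked at time 3500) or a 120-byte size limit selects the two oldest segments for removal.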

There are many more variables available for fine-tuning broker log management. For more detailed information, see the following variables in the Broker Configs topic of the Apache Kafka documentation:

  • log.dirs
  • log.flush.*
  • log.retention.*
  • log.roll.*
  • log.segment.*

Record Management

There are two parts to record management: log segments and the log cleaner.

As part of general data storage, Kafka rolls logs periodically based on size and/or time limits. Once either limit is hit, a new log segment is created and all new data is placed there, while older log segments should generally no longer change. This helps limit the risk of data loss or corruption to a single segment instead of the entire log.

  • log.roll.{ms|hours}: The time period for each log segment. Once the current segment is older than this value, it goes through log segment rotation.
  • log.segment.bytes: The maximum size for a single log segment.
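
The "either limit" rule can be sketched as a small Python check. This is only a model of the decision described above; the function name and parameters are hypothetical, and the real broker takes additional factors into account.

```python
def should_roll(segment_bytes, segment_age_ms, next_record_bytes,
                log_segment_bytes, log_roll_ms):
    """Return True if a new segment should be started before the next write."""
    # Size limit: the next record would push the segment past log.segment.bytes.
    too_big = segment_bytes + next_record_bytes > log_segment_bytes
    # Time limit: the segment is older than log.roll.ms.
    too_old = segment_age_ms > log_roll_ms
    return too_big or too_old
```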

As an alternative to simply removing whole log segments for a partition, you can use the log cleaner. When the log cleaner is enabled, individual records in older log segments can be managed differently:

  • log.cleaner.enable: This is a global setting in Kafka to enable the log cleaner.
  • cleanup.policy: This is a per-topic property that is usually set at topic creation time. There are two valid values for this property, delete and compact.
  • log.cleaner.min.compaction.lag.ms: This is the retention period for the “head” of the log. Only records outside of this retention period will be compacted by the log cleaner.

The compact policy, also called log compaction, assumes that the "most recent Kafka record is important." Some examples include tracking a current email address or tracking a current mailing address. With log compaction, older records with the same key are removed from a log segment and the latest one kept. This effectively removes some offsets from the partition.
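
The effect of compaction on offsets can be seen in a short Python sketch. It models only the end result (the latest record per key survives and keeps its original offset); the compact function and the sample records are made up for illustration and do not reflect how the log cleaner is implemented.

```python
def compact(records):
    """Keep only the latest record for each key.

    `records` is a list of (offset, key, value) tuples ordered by offset.
    Surviving records keep their original offsets, so gaps appear.
    """
    latest_offset = {}
    for offset, key, _value in records:
        latest_offset[key] = offset  # later records overwrite earlier ones
    return [r for r in records if latest_offset[r[1]] == r[0]]


# Hypothetical records tracking a current email address per user.
records = [
    (0, "alice", "alice@old.example"),
    (1, "bob", "bob@example.com"),
    (2, "alice", "alice@new.example"),
]
print(compact(records))
```

In this example, offset 0 disappears from the partition because a newer record with the key "alice" exists at offset 2.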

Broker Garbage Collection

To set the JVM Garbage Collection log path on the brokers:

  1. In Cloudera Manager, go to Kafka > Configuration.
  2. Find the property Kafka Broker Environment Advanced Configuration Snippet (Safety Valve) and add the line:
    KAFKA_GC_LOG_OPTS="-Xloggc:/var/logs/kafka/gc-broker.log"
  3. Restart the Kafka service to apply the new configuration.

To set JVM garbage collection log rotation on the brokers:

  1. In Cloudera Manager, go to Kafka > Configuration.
  2. Find the property Kafka Broker Environment Advanced Configuration Snippet (Safety Valve) and add the following lines (modified as appropriate):
    -XX:+UseGCLogFileRotation
    -XX:NumberOfGCLogFiles=10
    -XX:GCLogFileSize=100M
  3. Restart the Kafka service to apply the new configuration.

Migrating Brokers in a Cluster

Brokers can be moved to a new host in a Kafka cluster. This might be needed in the case of catastrophic hardware failure. Make sure the following are true before starting:

  • Make sure the cluster is healthy.
  • Make sure all replicas are in sync.
  • Perform the migration when there is minimal load on the cluster.

Brokers need to be moved one-by-one. There are two techniques available:

Using kafka-reassign-partitions tool

This method involves more manual work to modify JSON, but does not require manual edits to configuration files. See details in kafka-reassign-partitions Command.

Modify the broker IDs in meta.properties

This technique involves less manual work, but requires modifying an internal configuration file.

  1. Start up the new broker as a member of the old cluster.

    This creates files in the data directory.

  2. Stop both the new broker and the old broker that it is replacing.
  3. Change broker.id of the new broker to the broker.id of the old one both in Cloudera Manager and in data directory/meta.properties.
  4. (Optional) Run rsync to copy files from one broker to another.

    See Using rsync to Copy Files from One Broker to Another.

  5. Start up the new broker.

    It re-replicates data from the other nodes.

Note that data intensive administration operations such as rebalancing partitions, adding a broker, removing a broker, or bootstrapping a new machine can cause significant additional load on the cluster.

To avoid performance degradation of business workloads, you can limit the resources that these background processes can consume by specifying the --throttle parameter when running kafka-reassign-partitions.

Using rsync to Copy Files from One Broker to Another

You can run the rsync command to copy all data from an old broker to a new broker, preserving modification times and permissions. Using rsync allows you to avoid having to re-replicate the data from the leader. You do need to ensure that the disk structures match between the two brokers, or you have to verify the meta.properties file between the source and destination brokers (because there is one meta.properties file for each data directory).

Run the following command on the destination broker:

rsync -avz src_broker:src_data_dir dest_data_dir

If you plan to change the broker ID, edit dest_data_dir/meta.properties.
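
Editing meta.properties by hand is easy to get wrong, so the change could be scripted. The following Python sketch rewrites the broker.id entry in the text of a meta.properties file. The set_broker_id function and the sample file content are assumptions for illustration; check the actual contents of your file, and stop the broker before modifying it.

```python
def set_broker_id(meta_properties_text, new_broker_id):
    """Return the file text with the broker.id entry replaced."""
    lines = []
    replaced = False
    for line in meta_properties_text.splitlines():
        if line.strip().startswith("broker.id="):
            lines.append(f"broker.id={new_broker_id}")
            replaced = True
        else:
            lines.append(line)  # keep comments and other entries untouched
    if not replaced:
        raise ValueError("no broker.id entry found")
    return "\n".join(lines) + "\n"


# Illustrative file content only; a real file may contain other entries.
sample = "version=0\nbroker.id=64\n"
print(set_broker_id(sample, 62))
```

To apply it, read dest_data_dir/meta.properties, pass the text through the function, and write the result back, once for each data directory.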

Quotas

For a quick video introduction to quotas, see Quotas.

In Cloudera Distribution of Kafka 2.0 and higher, Kafka can enforce quotas on produce and fetch requests. Producers and consumers can use very high volumes of data. This can monopolize broker resources, cause network saturation, and generally deny service to other clients and the brokers themselves. Quotas protect against these issues and are important for large, multi-tenant clusters where a small set of clients using high volumes of data can degrade the user experience.

Quotas are byte-rate thresholds, defined per client ID. A client ID logically identifies an application making a request. A single client ID can span multiple producer and consumer instances. The quota is applied for all instances as a single entity: For example, if a client ID has a produce quota of 10 MB/s, that quota is shared across all instances with that same ID.

When running Kafka as a service, quotas can enforce API limits. By default, each unique client ID receives a fixed quota in bytes per second, as configured by the cluster (quota.producer.default, quota.consumer.default). This quota is defined on a per-broker basis. Each client can publish or fetch a maximum of X bytes per second per broker before it gets throttled.

The broker does not return an error when a client exceeds its quota, but instead attempts to slow the client down. The broker computes the amount of delay needed to bring a client under its quota and delays the response for that amount of time. This approach keeps the quota violation transparent to clients (outside of client-side metrics). This also prevents clients from having to implement special backoff and retry behavior.
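
One simple way to model the delay calculation is shown in the Python sketch below. The formula and the measurement window parameter are assumptions chosen for illustration (the text above does not specify them): the delay is sized so that the bytes observed in the window, spread over the window plus the delay, average out to the quota.

```python
def throttle_delay_ms(observed_bytes_per_s, quota_bytes_per_s, window_ms):
    """Delay needed to bring the observed rate back down to the quota."""
    if observed_bytes_per_s <= quota_bytes_per_s:
        return 0.0  # client is within its quota, no delay
    overshoot = observed_bytes_per_s - quota_bytes_per_s
    return overshoot / quota_bytes_per_s * window_ms


MB = 1024 * 1024
# A client producing at 15 MB/s against a 10 MB/s quota, measured over 1 second.
print(throttle_delay_ms(15 * MB, 10 * MB, 1000))
```

In this model, a client that sent 15 MB in a one-second window is delayed by half a second, so that its average over 1.5 seconds is 10 MB/s.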

Setting Quotas

You can override the default quota for client IDs that need a higher or lower quota. The mechanism is similar to per-topic log configuration overrides. Write your client ID overrides to ZooKeeper under /config/clients. All brokers read the overrides, which are effective immediately. You can change quotas without having to do a rolling restart of the entire cluster.

By default, each client ID receives an unlimited quota. The following configuration sets the default quota per producer and consumer client ID to 10 MB/s.

quota.producer.default=10485760
quota.consumer.default=10485760
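
The value 10485760 is 10 MB expressed in bytes (10 × 1024 × 1024). A small Python helper, hypothetical and shown only to make the arithmetic explicit, can generate the two lines for any rate:

```python
def default_quota_lines(mb_per_second):
    """Build the default quota settings for a rate given in MB/s."""
    bytes_per_second = mb_per_second * 1024 * 1024
    return [
        f"quota.producer.default={bytes_per_second}",
        f"quota.consumer.default={bytes_per_second}",
    ]


print("\n".join(default_quota_lines(10)))
```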

To set quotas using Cloudera Manager, open the Kafka Configuration page and search for Quota. Use the fields provided to set the Default Consumer Quota or Default Producer Quota. For more information, see Modifying Configuration Properties Using Cloudera Manager.

Setting User Limits for Kafka

Kafka opens many files at the same time. The default setting of 1024 for the maximum number of open files on most Unix-like systems is insufficient. Any significant load can result in failures and cause error messages such as java.io.IOException...(Too many open files) to be logged in the Kafka or HDFS log files. You might also notice errors such as this:
ERROR Error in acceptor (kafka.network.Acceptor)
java.io.IOException: Too many open files

Cloudera recommends setting the value to a relatively high starting point, such as 32,768.
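
As a quick check, the following Python sketch compares an open-file limit against the recommended starting point. It uses the standard resource module (available on Unix-like systems) and reports the limit of the process that runs it, which is not necessarily the limit applied to the Kafka broker process. The function name is hypothetical.

```python
import resource

RECOMMENDED_NOFILE = 32768  # starting point recommended above


def nofile_limit_ok(soft_limit, recommended=RECOMMENDED_NOFILE):
    """Return True if the soft open-file limit meets the recommendation."""
    if soft_limit == resource.RLIM_INFINITY:
        return True  # no limit configured
    return soft_limit >= recommended


if __name__ == "__main__":
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    status = "ok" if nofile_limit_ok(soft) else "too low"
    print(f"soft limit: {soft}, hard limit: {hard} ({status})")
```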

You can monitor the number of file descriptors in use on the Kafka Broker dashboard. In Cloudera Manager:
  1. Go to the Kafka service.
  2. Select a Kafka Broker.
  3. Open Charts Library > Process Resources and scroll down to the File Descriptors chart.
See Viewing Charts for Cluster, Service, Role, and Host Instances.

Kafka Administration using Command Line Tools

In some situations, it is convenient to use the command line tools available in Kafka to administer your cluster.

kafka-topics Command

Use the kafka-topics command to generate a snapshot of topics in the Kafka cluster.

$ kafka-topics --zookeeper zkhost --describe
Topic: topic-a1    PartitionCount:3    ReplicationFactor:3    Configs:
    Topic: topic-a1    Partition: 0    Leader: 64    Replicas: 64,62,63    Isr: 64,62,63
    Topic: topic-a1    Partition: 1    Leader: 62    Replicas: 62,63,64    Isr: 62,63,64
    Topic: topic-a1    Partition: 2    Leader: 63    Replicas: 63,64,62    Isr: 63,64,62
Topic: topic-a2    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: topic-a2    Partition: 0    Leader: 64    Replicas: 64,62,63    Isr: 64,62,63

In this clean output, you can see each topic and the basic partition information. Note the following about the output:

  • Partition count: The more partitions, the higher the possible parallelism among consumers and producers.
  • Replication factor: Shows 1 for no redundancy and higher for more redundancy.
  • Replicas and in-sync replicas (ISR): Shows which broker IDs have the partitions and which replicas are current.

There are situations where this command shows an invalid value for the leader broker ID or the number of ISRs is fewer than the number of replicas. In those cases, there may be something wrong with those specific topics.
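
Spotting these cases by eye is tedious on a large cluster. The Python sketch below scans the text produced by kafka-topics --describe and reports partitions whose leader is not one of the replicas or whose ISR list is shorter than the replica list. The regular expression assumes the field labels shown in the output above, and the sample text is fabricated to include two unhealthy partitions.

```python
import re

# Matches one partition entry; tolerant of spaces, tabs, or line breaks
# between the fields.
PARTITION_ENTRY = re.compile(
    r"Topic:\s*(?P<topic>\S+)\s+Partition:\s*(?P<partition>\d+)\s+"
    r"Leader:\s*(?P<leader>-?\d+)\s+Replicas:\s*(?P<replicas>[\d,]+)\s+"
    r"Isr:\s*(?P<isr>[\d,]*)"
)


def suspect_partitions(describe_output):
    """Return (topic, partition) pairs that look unhealthy."""
    suspects = []
    for match in PARTITION_ENTRY.finditer(describe_output):
        replicas = match["replicas"].split(",")
        isr = [broker for broker in match["isr"].split(",") if broker]
        if match["leader"] not in replicas or len(isr) < len(replicas):
            suspects.append((match["topic"], int(match["partition"])))
    return suspects


# Fabricated sample: partition 1 has a short ISR, partition 2 has no leader.
SAMPLE = """\
Topic: topic-a1 PartitionCount:3 ReplicationFactor:3 Configs:
    Topic: topic-a1 Partition: 0 Leader: 64 Replicas: 64,62,63 Isr: 64,62,63
    Topic: topic-a1 Partition: 1 Leader: 62 Replicas: 62,63,64 Isr: 62,63
    Topic: topic-a1 Partition: 2 Leader: -1 Replicas: 63,64,62 Isr: 64
"""
print(suspect_partitions(SAMPLE))
```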

Note that it is possible to change topic configuration properties using this command. As mentioned earlier, increasing the partition count and/or the replication factor is not recommended.

kafka-configs Command

The kafka-configs command allows you to set and unset properties on topics. It is recommended that you use Cloudera Manager instead of this command to change properties on brokers, because this command bypasses any Cloudera Manager safety checks.

Setting a topic property:

$ kafka-configs --zookeeper zkhost --entity-type topics --entity-name topic --alter --add-config property=value

Checking a topic property:

$ kafka-configs --zookeeper zkhost --entity-type topics --entity-name topic --describe

Unsetting a topic property:

$ kafka-configs --zookeeper zkhost --entity-type topics --entity-name topic --alter --delete-config property

The Apache Kafka documentation includes a complete list of topic properties.

kafka-console-consumer Command

The kafka-console-consumer command can be useful in a couple of ways:

  • Acting as an independent consumer of particular topics. This can be useful to compare results against a consumer program that you’ve written.
  • Testing general topic consumption without the need to write any consumer code.

Examples of usage:

$ kafka-console-consumer --bootstrap-server <broker1>,<broker2>... --topic <topic> --from-beginning
<record-earliest-offset>
<record-earliest-offset+1>

Note the following about the command:

  • This command prints all records and keeps outputting as more records are written to the topic.
  • If the kafka-console-consumer command is given no flags, it displays the full help message.
  • In older versions of Kafka, it may have been necessary to use the --new-consumer flag. As of Apache Kafka version 0.10.2, this is no longer necessary.

kafka-console-producer Command

This command is used to write messages to a topic. It is typically not as useful as the console consumer, but it can be useful when the messages are in a text-based format. In general, the usage will be something like:

$ cat file | kafka-console-producer args

kafka-consumer-groups Command

The basic usage of the kafka-consumer-groups command is:

$ kafka-consumer-groups.sh --bootstrap-server broker1,broker2... --describe --group GROUP_ID

This command is primarily useful for debugging consumer offset issues. The output from the command shows the log and consumer offsets for each partition connected to the consumer group corresponding to GROUP_ID. You can see at a glance which consumers are current with their partition and which ones are behind. From there, you can determine which partitions (and likely the corresponding brokers) are slow.
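
The "behind" measure is simply the difference between the log end offset and the consumer offset for each partition. The Python sketch below computes it from two dictionaries keyed by partition number; the function name and the offsets are hypothetical, and in practice the numbers would come from the command output.

```python
def consumer_lag(log_end_offsets, consumer_offsets):
    """Return (partition, lag) pairs, largest lag first."""
    lag = {
        partition: log_end_offsets[partition] - consumer_offsets[partition]
        for partition in log_end_offsets
    }
    return sorted(lag.items(), key=lambda item: item[1], reverse=True)


# Hypothetical offsets for a three-partition topic.
log_end = {0: 1500, 1: 1500, 2: 1500}
consumed = {0: 1500, 1: 900, 2: 1450}
print(consumer_lag(log_end, consumed))
```

Here partition 1 is 600 records behind, which makes it the first place to look for a slow consumer or broker.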

Beyond this debugging usage, there are other more advanced options to this command:

  • --execute --reset-offsets SCENARIO_OPTION: Resets the offsets for a consumer group to a particular value based on the SCENARIO_OPTION flag given.

    Valid flags for SCENARIO_OPTION are:

    • --to-datetime
    • --by-period
    • --to-earliest
    • --to-latest
    • --shift-by
    • --from-file
    • --to-current

    You will likely want to set the --topic flag to restrict this change to a specific topic or a specific set of partitions within that topic.

This command can be used to reset all offsets on all topics; this is something you probably won’t ever want to do. It is highly recommended that you use this command carefully.

kafka-reassign-partitions Command

This script allows a great deal of control over partitions in a Kafka cluster. You can do either or both of the following:

  • Change the ordering of the partition assignment list. This is usually done to control leader imbalances between brokers.
  • Move replicas from one broker to another. The most common usage of this is after adding new brokers to the cluster.

To reassign partitions:

  1. Create a list of topics you want to move.
    topics-to-move.json
    {"topics":  [{"topic": "foo1"},
                 {"topic": "foo2"}],
     "version":1
    }
  2. Use the --generate option in kafka-reassign-partitions.sh to list the distribution of partitions and replicas on your current brokers, followed by a list of suggested locations for partitions on your new broker.
    $ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
            --topics-to-move-json-file topics-to-move.json \
            --broker-list "4" \
            --generate
    
    Current partition replica assignment
    
    {"version":1,
     "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
                   {"topic":"foo1","partition":0,"replicas":[3,1]},
                   {"topic":"foo2","partition":2,"replicas":[1,2]},
                   {"topic":"foo2","partition":0,"replicas":[3,2]},
                   {"topic":"foo1","partition":1,"replicas":[2,3]},
                   {"topic":"foo2","partition":1,"replicas":[2,3]}]
    }
    
    {"version":1,
     "partitions":[{"topic":"foo1","partition":3,"replicas":[4]},
                   {"topic":"foo1","partition":1,"replicas":[4]},
                   {"topic":"foo2","partition":2,"replicas":[4]}]
    }
  3. Revise the suggested list if required, and then save it as a JSON file.
  4. Use the --execute option in kafka-reassign-partitions.sh to start the redistribution process, which can take several hours in some cases.
    $ bin/kafka-reassign-partitions.sh \
            --zookeeper localhost:2181 \
            --reassignment-json-file expand-cluster-reassignment.json \
            --execute
  5. Use the --verify option in kafka-reassign-partitions.sh to check the status of your partitions.

There are several caveats to using this command:

  • It is highly recommended that you minimize the volume of replica changes. For example, instead of moving ten replicas with a single command, move two at a time to keep the cluster healthy.
  • It is not possible to use this command to make an out-of-sync replica into the leader partition.
  • Given the earlier restrictions, it is best to use this command only when all brokers and topics are healthy.
Although reassigning partitions is labor-intensive, you should anticipate system growth and redistribute the load when your system is at 70% capacity. If you wait until you are forced to redistribute because you have reached the limit of your resources, the redistribution process can be extremely slow.
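
To illustrate the first caveat above (keeping each change small), the following Python sketch splits one reassignment plan into several smaller plans that can be executed and verified one at a time. It batches by partition entry, which is only an approximation of "replicas moved". The split_reassignment function is hypothetical; the sample plan is the suggested list from step 2.

```python
import json


def split_reassignment(plan, batch_size=2):
    """Split a reassignment plan into plans of at most batch_size partitions."""
    partitions = plan["partitions"]
    return [
        {"version": plan["version"], "partitions": partitions[i:i + batch_size]}
        for i in range(0, len(partitions), batch_size)
    ]


plan = {
    "version": 1,
    "partitions": [
        {"topic": "foo1", "partition": 3, "replicas": [4]},
        {"topic": "foo1", "partition": 1, "replicas": [4]},
        {"topic": "foo2", "partition": 2, "replicas": [4]},
    ],
}

for number, batch in enumerate(split_reassignment(plan), start=1):
    # Each batch would be saved to its own file and passed to
    # --reassignment-json-file; the separator line is just for display.
    print(f"--- batch {number} ---")
    print(json.dumps(batch, indent=1))
```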

kafka-*-perf-test Scripts

The kafka-*-perf-test scripts can be used in several ways. In general, it is expected that these tools should be used on a test or development cluster.

  • Measuring read and/or write throughput.
  • Stress testing the cluster based on specific parameters (such as message size).
  • Load testing for the purpose of evaluating specific metrics or determining the impact of cluster configuration changes.

The kafka-producer-perf-test script can either create a randomly generated byte record:

$ kafka-producer-perf-test --topic TOPIC --record-size SIZE_IN_BYTES

or randomly read from a set of provided records:

$ kafka-producer-perf-test --topic TOPIC --payload-delimiter DELIMITER --payload-file INPUT_FILE

where INPUT_FILE is a concatenated set of pre-generated messages separated by DELIMITER. The script keeps producing messages, or stops at the limit set by the --num-records flag.

The basic usage of the kafka-consumer-perf-test script is:

$ kafka-consumer-perf-test --broker-list host1:port1,host2:port2,... --zookeeper zk1:port1,zk2:port2,... --topic TOPIC

The flags of most interest for this command are:

  • --group gid: If you run more than one instance of this test, you will want to set different ids for each instance.
  • --num-fetch-threads: Defaults to 1. Increase if higher throughput testing is needed.
  • --from-latest: To start consuming from the latest offset. May be needed for certain types of testing.

Enabling DEBUG or TRACE in command line scripts

In some cases, you may find it useful to produce extra debugging output from the Kafka client API. The DEBUG (shown below) or TRACE levels can be set by replacing the setting in the log4j properties file as follows:
cp /etc/kafka/conf/tools-log4j.properties /var/tmp
sed -i -e 's/WARN/DEBUG/g' /var/tmp/tools-log4j.properties

export KAFKA_OPTS="-Dlog4j.configuration=file:/var/tmp/tools-log4j.properties"

Unsupported Command Line Tools

Beyond the scripts described earlier, the following scripts can be found as part of the Kafka distribution, but their use is generally discouraged for various reasons as documented here.

Command or Script: Notes

  • connect-distributed, connect-standalone: Kafka Connect is not currently supported.
  • kafka-acls: Use Sentry to manage ACLs instead of this script.
  • kafka-broker-api-versions: Primarily useful for client-to-broker protocol related development.
  • kafka-configs: Use Cloudera Manager to adjust any broker or security properties instead of the kafka-configs command. This command should only be used to modify topic properties.
  • kafka-delete-records: Do not use with CDH.
  • kafka-log-dirs: For querying log directory information. You can use the Cloudera Manager console instead: Kafka > Instances > Active Controller > Log Files > Role Log File
  • kafka-mirror-maker: Use Cloudera Manager to create any CDH Mirror Maker instance.
  • kafka-preferred-replica-election: This tool causes leadership for each partition to be transferred back to the 'preferred replica'; it can be used to balance leadership among the servers. It is recommended to use kafka-reassign-partitions instead of kafka-preferred-replica-election.
  • kafka-replay-log-producer: Can be used to “rename” a topic. Generally not more useful than that.
  • kafka-replica-verification: Validates that all replicas for a set of topics have the same data. This command is a “heavy duty” version of the ISR column of the kafka-topics command.
  • kafka-server-start, kafka-server-stop: Use Cloudera Manager to manage any Kafka host.
  • kafka-simple-consumer-shell: Deprecated in Apache Kafka.
  • kafka-streams-application-reset: Kafka Streams is currently not supported.
  • kafka-verifiable-consumer, kafka-verifiable-producer: These scripts are intended for system testing.
  • zookeeper-security-migration, zookeeper-server-start, zookeeper-server-stop: Use Cloudera Manager to manage any ZooKeeper host.
  • zookeeper-shell: Limit usage of this script to reading information from ZooKeeper.

Understanding the kafka-run-class Bash Script

Almost all the provided Kafka scripts eventually call the kafka-run-class script. This script generally won’t be called directly. However, if you are proficient with bash, it is good to know about this script to understand some of the features available in all scripts as well as some potential debugging situations.

For example, there are some useful environment variables that affect all the command line scripts:

  • KAFKA_DEBUG allows a Java debugger to attach to the JVM launched by the particular script. Setting KAFKA_DEBUG also allows some further debugging customization:
    • JAVA_DEBUG_PORT sets the JVM debugging port.
    • JAVA_DEBUG_OPTS can be used to override the default debugging arguments being passed to the JVM.
  • KAFKA_HEAP_OPTS can be used to pass memory setting arguments to the JVM.
  • KAFKA_JVM_PERFORMANCE_OPTS can be used to pass garbage collection flags to the JVM.

Notes on Kafka CLI Administration

Here are some extra things to be aware of about Kafka administration:

  • Use Cloudera Manager to start and stop Kafka and Zookeeper services. Do not use the kafka-server-start, kafka-server-stop, zookeeper-server-start, or zookeeper-server-stop commands.
  • For a parcel installation, all Kafka command-line tools are located in /opt/cloudera/parcels/KAFKA/lib/kafka/bin/. For a package installation, all such commands can be found in /usr/bin/.
  • Ensure that the JAVA_HOME environment variable is set to your JDK installation directory before using the command-line tools. For example:
    export JAVA_HOME=/usr/java/jdk1.8.0_144-cloudera
    
  • Running ZooKeeper commands manually is error-prone when it comes to interaction with Kafka. It is highly recommended that you avoid doing any write operations or ACL modifications in ZooKeeper.

Adding Users as Kafka Administrators

In some cases, additional users besides the kafka account need administrator access. This can be done in Cloudera Manager by going to Kafka > Configuration > Super users.