CDH 6 includes Apache Kafka as part of the core package. The documentation includes improved content on how to set up, install, and administer your Kafka ecosystem. For more information, see the Cloudera Enterprise 6.0.x Apache Kafka Guide. We look forward to your feedback on both the existing and new documentation.

Using Apache Kafka Command-line Tools

kafka-topics

The kafka-topics tool can be used to create, alter, list, and describe topics. For example:
$ kafka-topics --zookeeper zk01.example.com:2181 --list
sink1
t1
t2
$ kafka-topics --create --zookeeper hostname:2181/kafka --replication-factor 2 \
  --partitions 4 --topic topicname

kafka-console-consumer

The kafka-console-consumer tool can be used to read data from a Kafka topic and write it to standard output. For example:
$ kafka-console-consumer --zookeeper zk01.example.com:2181 --topic t1

kafka-console-producer

The kafka-console-producer tool can be used to read data from standard input and write it to a Kafka topic. For example:
$ kafka-console-producer --broker-list kafka02.example.com:9092,kafka03.example.com:9092 --topic t1

kafka-consumer-groups

The kafka-consumer-groups tool can be used to list all consumer groups, describe a consumer group, delete consumer group info, or reset consumer group offsets.

For example:
$ kafka-consumer-groups --zookeeper zk01.example.com:2181 --describe --group flume

GROUP   TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG     OWNER
flume   t1     0          1               3               2       test-consumer-group_postamac.local-1456198719410-29ccd54f-0

The output from the tool shows the log and consumer offsets for each partition connected to the consumer group that is being described. You can see at a glance which consumers are current with their partition and which ones are behind. From there, you can determine which partitions (and likely the corresponding brokers) are slow.
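The lag check described above can also be scripted. The following Python sketch is illustrative only: the function names are hypothetical, and it assumes the column layout shown in the sample output (a header row starting with GROUP that includes TOPIC, PARTITION, and LAG columns). Other Kafka versions may print different columns.

```python
def parse_describe_output(describe_output):
    """Parse `kafka-consumer-groups --describe` text into a list of row dicts."""
    header = None
    rows = []
    for line in describe_output.splitlines():
        fields = line.split()
        if not fields:
            continue
        if fields[0] == "GROUP":
            # The header row names the columns for all rows that follow.
            header = fields
            continue
        if header is None or len(fields) < len(header):
            continue  # skip banners and rows with missing columns
        rows.append(dict(zip(header, fields)))
    return rows


def lagging_partitions(describe_output, threshold=0):
    """Return (topic, partition, lag) tuples with lag above threshold, worst first."""
    behind = []
    for row in parse_describe_output(describe_output):
        lag = row.get("LAG", "")
        if not lag.isdigit():
            continue  # skip rows without a numeric lag value
        if int(lag) > threshold:
            behind.append((row["TOPIC"], int(row["PARTITION"]), int(lag)))
    return sorted(behind, key=lambda item: item[2], reverse=True)


# Sample text copied from the example output above.
SAMPLE = """\
GROUP   TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG     OWNER
flume   t1     0          1               3               2       test-consumer-group_postamac.local-1456198719410-29ccd54f-0
"""

print(lagging_partitions(SAMPLE))  # [('t1', 0, 2)]
```

Sorting by lag puts the slowest partitions first, which is usually where to start looking for a slow broker.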

kafka-reassign-partitions

This tool provides substantial control over partitions in a Kafka cluster. It is mainly used to balance storage loads across brokers through the following reassignment actions:

  • Change the ordering of the partition assignment list. Used to control leader imbalances between brokers.
  • Reassign partitions from one broker to another. Used to expand existing clusters.
  • Reassign partitions between log directories on the same broker. Used to resolve storage load imbalance among available disks in the broker.
  • Reassign partitions between log directories across multiple brokers. Used to resolve storage load imbalance across multiple brokers.
The tool uses two JSON files as input, both of which are created by the user:
Topics-to-Move JSON
This JSON file specifies the topics that you want to reassign. This is a simple file that tells the kafka-reassign-partitions tool which partitions it should look at when generating a proposal for the reassignment configuration. The user has to create the topics-to-move JSON file from scratch.
The format of the file is the following:
{"topics":  [{"topic": "mytopic1"},
             {"topic": "mytopic2"}],
 "version":1
}
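Because the topics-to-move file has to be written by hand, a small script can help avoid syntax mistakes. The following Python sketch builds the document in the format shown above from a list of topic names. The function name build_topics_to_move is hypothetical and the sketch is not part of the Kafka tooling.

```python
import json


def build_topics_to_move(topics):
    """Return a topics-to-move document as a JSON string."""
    if not topics:
        raise ValueError("at least one topic name is required")
    document = {
        "topics": [{"topic": name} for name in topics],
        "version": 1,
    }
    return json.dumps(document, indent=1)


# Topic names taken from the format example above.
print(build_topics_to_move(["mytopic1", "mytopic2"]))
```

The returned string can be written to a file and passed to the tool with the --topics-to-move-json-file option.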
Reassignment Configuration JSON
This JSON file is a configuration file that contains the parameters used in the reassignment process. This file is created by the user; however, a proposal for its contents is generated by the tool. When the kafka-reassign-partitions tool is executed with the --generate option, it generates a proposed configuration which can be fine-tuned and saved as a JSON file. The file created this way is the reassignment configuration JSON. To generate a proposal, the tool requires a topics-to-move file as input.
The format of the file is the following:
{"version":1,
 "partitions":
   [{"topic":"mytopic1","partition":3,"replicas":[4,5],"log_dirs":["any","any"]},
    {"topic":"mytopic1","partition":1,"replicas":[5,4],"log_dirs":["any","any"]},
    {"topic":"mytopic2","partition":2,"replicas":[6,5],"log_dirs":["any","any"]}]
}

The reassignment configuration contains multiple properties that each control and specify an aspect of the configuration. The Reassignment Configuration Properties table lists each property and its description.

Reassignment Configuration Properties
Property    Description
topic       Specifies the topic.
partition   Specifies the partition.
replicas    Specifies the brokers that the selected partition is assigned to. The brokers are listed in order, which means that the first broker in the list is always the leader for that partition. Change the order of brokers to resolve any leader balancing issues among brokers. Change the broker IDs to reassign partitions to different brokers.
log_dirs    Specifies the log directory of the brokers. The log directories are listed in the same order as the brokers. By default, any is specified as the log directory, which means that the broker is free to choose where it places the replica. The current broker implementation selects the log directory using a round-robin algorithm. An absolute path beginning with a / can be used to explicitly set where to store the partition replica.
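The constraints described in the table can be checked before the file is passed to the tool. The following Python sketch is illustrative only. The function name validate_reassignment is hypothetical, and two of its rules are assumptions of this sketch rather than documented tool behavior: a missing log_dirs property is accepted, and duplicate broker IDs in replicas are flagged.

```python
import json


def validate_reassignment(config_text):
    """Return a list of problems found in a reassignment configuration."""
    config = json.loads(config_text)
    problems = []
    for entry in config.get("partitions", []):
        name = "{}-{}".format(entry.get("topic"), entry.get("partition"))
        replicas = entry.get("replicas", [])
        if not replicas:
            problems.append(name + ": replicas is empty")
        if len(set(replicas)) != len(replicas):
            # Assumption: a broker should appear only once per partition.
            problems.append(name + ": duplicate broker ID in replicas")
        log_dirs = entry.get("log_dirs")
        if log_dirs is None:
            continue  # assumption: log_dirs may be omitted
        if len(log_dirs) != len(replicas):
            # log_dirs entries are matched to brokers by position.
            problems.append(name + ": log_dirs and replicas differ in length")
        for path in log_dirs:
            if path != "any" and not path.startswith("/"):
                problems.append(name + ": log_dirs entry must be any or an absolute path: " + path)
    return problems


good = '{"version":1,"partitions":[{"topic":"mytopic1","partition":3,"replicas":[4,5],"log_dirs":["any","any"]}]}'
print(validate_reassignment(good))  # []
```

An empty list means that none of the checks in this sketch failed. It does not guarantee that the tool accepts the file.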
Notes and Recommendations:
  • Cloudera recommends that you minimize the volume of replica changes per command instance. Instead of moving 10 replicas with a single command, move two at a time in order to save cluster resources.
  • This tool cannot be used to make an out-of-sync replica the leader of a partition.
  • Use this tool only when all brokers and topics are healthy.
  • Anticipate system growth. Redistribute the load when the system is at 70% capacity. Waiting until redistribution becomes necessary due to reaching resource limits can make the redistribution process extremely time consuming.
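One way to follow the first recommendation is to split a large reassignment configuration into several smaller files and run the tool once per file. The following Python sketch shows the idea. The function name split_reassignment is hypothetical, and the sketch batches by partition entry, which is only an approximation of replica count because a single entry can move more than one replica.

```python
import json


def split_reassignment(config_text, batch_size=2):
    """Split a reassignment configuration into smaller JSON documents."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    config = json.loads(config_text)
    entries = config["partitions"]
    batches = []
    for start in range(0, len(entries), batch_size):
        batch = {
            "version": config.get("version", 1),
            "partitions": entries[start:start + batch_size],
        }
        batches.append(json.dumps(batch))
    return batches


# Five hypothetical partition entries, split into batches of two.
example = json.dumps({
    "version": 1,
    "partitions": [
        {"topic": "mytopic1", "partition": n, "replicas": [4, 5], "log_dirs": ["any", "any"]}
        for n in range(5)
    ],
})
for number, text in enumerate(split_reassignment(example), start=1):
    print(number, len(json.loads(text)["partitions"]))
```

Each batch is a complete reassignment configuration, so it can be saved to its own file and executed and verified before the next batch is started.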

Tool Usage

To reassign partitions, complete the following steps:

  1. Create a topics-to-move JSON file that specifies the topics you want to reassign. Use the following format:
    {"topics":  [{"topic": "mytopic1"},
                 {"topic": "mytopic2"}],
     "version":1
    }
  2. Generate the content for the reassignment configuration JSON with the following command:
    kafka-reassign-partitions --zookeeper hostname:port --topics-to-move-json-file topics-to-move.json --broker-list broker1,broker2 --generate

    Running the command lists the distribution of partition replicas on your current brokers followed by a proposed partition reassignment configuration.

    Example output:

    Current partition replica assignment
    {"version":1,
     "partitions":
       [{"topic":"mytopic2","partition":1,"replicas":[2,3],"log_dirs":["any","any"]},
        {"topic":"mytopic1","partition":0,"replicas":[1,2],"log_dirs":["any","any"]},
        {"topic":"mytopic2","partition":0,"replicas":[1,2],"log_dirs":["any","any"]},
        {"topic":"mytopic1","partition":2,"replicas":[3,1],"log_dirs":["any","any"]},
        {"topic":"mytopic1","partition":1,"replicas":[2,3],"log_dirs":["any","any"]}]
    }
    
    Proposed partition reassignment configuration
    
    {"version":1,
     "partitions":
       [{"topic":"mytopic1","partition":0,"replicas":[4,5],"log_dirs":["any","any"]},
        {"topic":"mytopic1","partition":2,"replicas":[4,5],"log_dirs":["any","any"]},
        {"topic":"mytopic2","partition":1,"replicas":[4,5],"log_dirs":["any","any"]},
        {"topic":"mytopic1","partition":1,"replicas":[5,4],"log_dirs":["any","any"]},
        {"topic":"mytopic2","partition":0,"replicas":[5,4],"log_dirs":["any","any"]}]
    }

    In this example, the tool proposed a configuration which reassigns existing partitions on brokers 1, 2, and 3 to brokers 4 and 5.

  3. Copy and paste the proposed partition reassignment configuration into an empty JSON file.
  4. Review, and if required, modify the suggested reassignment configuration.
  5. Save the file.
  6. Start the redistribution process with the following command:
    kafka-reassign-partitions --zookeeper hostname:port --reassignment-json-file reassignment-configuration.json --bootstrap-server hostname:port --execute

    The tool prints a list containing the original replica assignment and a message that reassignment has started. Example output:

    Current partition replica assignment
    
    {"version":1,
     "partitions":
       [{"topic":"mytopic2","partition":1,"replicas":[2,3],"log_dirs":["any","any"]},
        {"topic":"mytopic1","partition":0,"replicas":[1,2],"log_dirs":["any","any"]},
        {"topic":"mytopic2","partition":0,"replicas":[1,2],"log_dirs":["any","any"]},
        {"topic":"mytopic1","partition":2,"replicas":[3,1],"log_dirs":["any","any"]},
        {"topic":"mytopic1","partition":1,"replicas":[2,3],"log_dirs":["any","any"]}]
    }
    
    Save this to use as the --reassignment-json-file option during rollback
    Successfully started reassignment of partitions.
  7. Verify the status of the reassignment with the following command:
    kafka-reassign-partitions --zookeeper hostname:port --reassignment-json-file reassignment-configuration.json --bootstrap-server hostname:port --verify
    The tool prints the reassignment status of all partitions. Example output:
    Status of partition reassignment: 
    Reassignment of partition mytopic2-1 completed successfully
    Reassignment of partition mytopic1-0 completed successfully
    Reassignment of partition mytopic2-0 completed successfully
    Reassignment of partition mytopic1-2 completed successfully
    Reassignment of partition mytopic1-1 completed successfully
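Steps 3 and 7 of the procedure can be partly automated by reading the text that the tool prints. The following Python sketch is illustrative only. The function names are hypothetical, and it assumes that the output has the shape shown in the examples above: a "Proposed partition reassignment configuration" heading followed by a JSON document, and one "Reassignment of partition ..." line per partition. The wording of lines for unfinished partitions is not shown in this document, so the sketch treats any line that does not end in "completed successfully" as unfinished.

```python
import json

PROPOSED_MARKER = "Proposed partition reassignment configuration"
STATUS_PREFIX = "Reassignment of partition "
DONE_SUFFIX = " completed successfully"


def extract_proposal(generate_output):
    """Return the proposed configuration from --generate output as a dict."""
    marker_at = generate_output.find(PROPOSED_MARKER)
    if marker_at < 0:
        raise ValueError("no proposed configuration found in the output")
    brace_at = generate_output.find("{", marker_at)
    if brace_at < 0:
        raise ValueError("no JSON document found after the heading")
    # raw_decode parses one JSON value and ignores any trailing text.
    proposal, _end = json.JSONDecoder().raw_decode(generate_output, brace_at)
    return proposal


def unfinished_partitions(verify_output):
    """Return partition names from --verify output that are not yet complete."""
    pending = []
    for line in verify_output.splitlines():
        line = line.strip()
        if line.startswith(STATUS_PREFIX) and not line.endswith(DONE_SUFFIX):
            pending.append(line[len(STATUS_PREFIX):].split()[0])
    return pending


verify_sample = """\
Status of partition reassignment:
Reassignment of partition mytopic2-1 completed successfully
Reassignment of partition mytopic1-0 completed successfully
"""
print(unfinished_partitions(verify_sample))  # []
```

An empty list from unfinished_partitions means that every partition listed in the output was reported as completed.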

Examples

There are multiple ways to modify the configuration file. The following examples show how a user can modify a proposed configuration and what each change does. Each example shows only the modified partition entry.

Suppose that the kafka-reassign-partitions tool generated the following proposed reassignment configuration:
{"version":1,
 "partitions":
   [{"topic":"mytopic1","partition":0,"replicas":[1,2],"log_dirs":["any","any"]}]}
Reassign partitions between brokers
To reassign partitions from one broker to another, change the broker ID specified in replicas. For example:
{"topic":"mytopic1","partition":0,"replicas":[5,2],"log_dirs":["any","any"]}

This reassignment configuration moves partition mytopic1-0 from broker 1 to broker 5.
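The same edit can be applied programmatically when many entries need the same change. The following Python sketch swaps one broker ID for another in a single partition entry. The function name replace_broker is hypothetical, and the rule that the new broker must not already host a replica is an assumption of this sketch.

```python
import copy


def replace_broker(entry, old_broker, new_broker):
    """Return a copy of a partition entry with old_broker replaced by new_broker."""
    replicas = entry["replicas"]
    if old_broker not in replicas:
        raise ValueError("broker {} holds no replica of this partition".format(old_broker))
    if new_broker in replicas:
        # Assumption: a broker should appear only once per partition.
        raise ValueError("broker {} already holds a replica".format(new_broker))
    updated = copy.deepcopy(entry)
    updated["replicas"] = [new_broker if b == old_broker else b for b in replicas]
    return updated


entry = {"topic": "mytopic1", "partition": 0, "replicas": [1, 2], "log_dirs": ["any", "any"]}
print(replace_broker(entry, 1, 5)["replicas"])  # [5, 2]
```

The position of the broker in the list is preserved, so the corresponding log_dirs entry still lines up with the new broker.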

Reassign partitions to another log directory on the same broker
To reassign partitions between log directories on the same broker, change the appropriate any entry to an absolute path. For example:
{"topic":"mytopic1","partition":0,"replicas":[1,2],"log_dirs":["/log/directory1","any"]}

This reassignment configuration moves partition mytopic1-0 to the /log/directory1 log directory.

Reassign partitions between log directories across multiple brokers
To reassign partitions between log directories across multiple brokers, change the broker ID specified in replicas and the appropriate any entry to an absolute path. For example:
{"topic":"mytopic1","partition":0,"replicas":[5,2],"log_dirs":["/log/directory1","any"]}

This reassignment configuration moves partition mytopic1-0 to /log/directory1 on broker 5.

Change partition assignment order (elect a new leader)
To change the ordering of the partition assignment list, change the order of the brokers in replicas. For example:
{"topic":"mytopic1","partition":0,"replicas":[2,1],"log_dirs":["any","any"]}

This reassignment configuration elects broker 2 as the new leader.
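When reordering replicas in an entry that uses explicit log directories, the log_dirs list has to be reordered in the same way, because its entries are matched to brokers by position. The following Python sketch moves a broker to the front of replicas and keeps log_dirs aligned. The function name make_first_replica is hypothetical.

```python
def make_first_replica(entry, broker):
    """Return a copy of a partition entry with broker first in replicas."""
    replicas = entry["replicas"]
    if broker not in replicas:
        raise ValueError("broker {} holds no replica of this partition".format(broker))
    index = replicas.index(broker)
    updated = dict(entry)
    updated["replicas"] = [broker] + replicas[:index] + replicas[index + 1:]
    log_dirs = entry.get("log_dirs")
    if log_dirs is not None:
        # Apply the same move to log_dirs so each path stays with its broker.
        updated["log_dirs"] = [log_dirs[index]] + log_dirs[:index] + log_dirs[index + 1:]
    return updated


original = {"topic": "mytopic1", "partition": 0, "replicas": [1, 2],
            "log_dirs": ["/log/directory1", "any"]}
reordered = make_first_replica(original, 2)
print(reordered["replicas"], reordered["log_dirs"])
```

In this usage example, /log/directory1 stays associated with broker 1 after broker 2 is moved to the front.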

kafka-log-dirs

The kafka-log-dirs tool lets users query a list of replicas per log directory on a broker. The tool provides information that is required for optimizing replica assignment across brokers.

On successful execution, the tool prints a list of partitions per log directory for the specified topics and brokers. The list contains information on topic partition, size, offset lag, and reassignment state. Example output:
{
    "brokers": [
        {
            "broker": 86,
            "logDirs": [
                {
                    "error": null,
                    "logDir": "/var/local/kafka/data",
                    "partitions": [
                        {
                            "isFuture": false,
                            "offsetLag": 0,
                            "partition": "mytopic1-2",
                            "size": 0
                        }
                    ]
                }
            ]
        },
        ...
    ],
    "version": 1
}

The Contents of the kafka-log-dirs Output table gives an overview of the information provided by the kafka-log-dirs tool.

Contents of the kafka-log-dirs Output
Property    Description
broker      Displays the ID of the broker.
error       Indicates if there is a problem with the disk that hosts the topic partition. If an error is detected, org.apache.kafka.common.errors.KafkaStorageException is displayed. If no error is detected, the value is null.
logDir      Specifies the location of the log directory. Returns an absolute path.
isFuture    The reassignment state of the partition. This property shows whether there is currently replica movement underway between the log directories.
offsetLag   Displays the offset lag of the partition.
partition   Displays the name of the partition.
size        Displays the size of the partition in bytes.
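The JSON output lends itself to scripted summaries. The following Python sketch totals the partition sizes per log directory and lists the log directories that report an error, using the properties described in the table. The function names are hypothetical, and the sketch assumes that the caller passes only the JSON document, without any other text that the tool may print around it.

```python
import json


def size_per_log_dir(log_dirs_json):
    """Return {(broker ID, log directory): total partition size in bytes}."""
    report = json.loads(log_dirs_json)
    totals = {}
    for broker in report["brokers"]:
        for log_dir in broker["logDirs"]:
            key = (broker["broker"], log_dir["logDir"])
            totals[key] = sum(p["size"] for p in log_dir["partitions"])
    return totals


def log_dirs_with_errors(log_dirs_json):
    """Return (broker ID, log directory, error) for log directories with a non-null error."""
    report = json.loads(log_dirs_json)
    failed = []
    for broker in report["brokers"]:
        for log_dir in broker["logDirs"]:
            if log_dir.get("error") is not None:
                failed.append((broker["broker"], log_dir["logDir"], log_dir["error"]))
    return failed


# Sample based on the example output above, with the elided brokers left out.
LOG_DIRS_SAMPLE = json.dumps({
    "brokers": [{
        "broker": 86,
        "logDirs": [{
            "error": None,
            "logDir": "/var/local/kafka/data",
            "partitions": [
                {"isFuture": False, "offsetLag": 0, "partition": "mytopic1-2", "size": 0}
            ],
        }],
    }],
    "version": 1,
})

print(size_per_log_dir(LOG_DIRS_SAMPLE))
print(log_dirs_with_errors(LOG_DIRS_SAMPLE))
```

Comparing the totals across the log directories of one broker gives a quick view of disk imbalance, which is the input needed when planning a move between log directories.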

Tool Usage

To retrieve replica assignment information, run the following command:
kafka-log-dirs --describe --bootstrap-server hostname:port --broker-list broker1,broker2 --topic-list topic1,topic2
If no topic is specified with the --topic-list option, all topics are queried. If no broker is specified with the --broker-list option, all brokers are queried. If a log directory is offline, it is marked offline in the script output. Error example:
"error":"org.apache.kafka.common.errors.KafkaStorageException"