Setting Up Apache Kafka Using the Command Line

Minimum Required Role: Cluster Administrator (also provided by Full Administrator)

Kafka is distributed as a parcel, separate from the CDH parcel. It is also distributed as a package. The steps to install Kafka vary, depending on whether you choose to install from a parcel or a package.

General Information Regarding Installation and Upgrade

Cloudera Manager 5.4 and higher includes the Kafka service. To install, download Kafka using Cloudera Manager, distribute Kafka to the cluster, activate the new parcel, and add the service to the cluster. For a list of available parcels and packages, see CDK Powered by Apache Kafka Version and Packaging Information.

Cloudera recommends that you deploy Kafka on dedicated hosts that are not used for other cluster roles.

Rolling Upgrade to Kafka 3.0.x

Before upgrading from Kafka 2.x.x to 3.0.x, set inter.broker.protocol.version and log.message.format.version to the current Kafka version, and unset them after the upgrade. This is good practice because newer broker versions might write log entries that older brokers cannot read. If you have not set these two properties and you need to roll back to the older version, data loss might occur.

Based on the current version of Kafka, use the following three-digit values to set inter.broker.protocol.version and log.message.format.version:
  • To upgrade from CDK 2.0.x, use 0.9.0
  • To upgrade from CDK 2.1.x, use 0.10.0
  • To upgrade from CDK 2.2.x, use 0.10.2
From the Cloudera Manager Admin Console:
  1. Upgrade Kafka brokers to 3.0.x.
    1. Update the server.properties file on all brokers with the properties inter.broker.protocol.version = <current_Kafka_version> and log.message.format.version = <current_Kafka_version>, as follows:
    2. From the Clusters menu, select the Kafka cluster.
    3. Click the Configuration tab.
    4. Use the Search field to find the Kafka Broker Advanced Configuration Snippet (Safety Valve) configuration property.
    5. Add the following properties to the Kafka Broker Advanced Configuration Snippet (Safety Valve) for kafka.properties:

      To upgrade from Kafka 2.0.x to Kafka 3.0.x, enter:

      inter.broker.protocol.version=0.9.0
      log.message.format.version=0.9.0

      To upgrade from Kafka 2.1.x to Kafka 3.0.x, enter:

      inter.broker.protocol.version=0.10.0
      log.message.format.version=0.10.0

      To upgrade from Kafka 2.2.x to Kafka 3.0.x, enter:

      inter.broker.protocol.version=0.10.2
      log.message.format.version=0.10.2
      Enter the version with all three digits (for example, 0.10.0 rather than 0.10). Otherwise, the following error occurs:
      2017-12-14 14:25:47,818 FATAL kafka.Kafka$:
      java.lang.IllegalArgumentException: Version `0.10` is not a valid version
              at kafka.api.ApiVersion$$anonfun$apply$1.apply(ApiVersion.scala:72)
              at kafka.api.ApiVersion$$anonfun$apply$1.apply(ApiVersion.scala:72)
              at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
      
    6. Save your changes.
  2. Download, distribute, and activate the new parcel. Do not restart the Kafka service; select Activate Only and click OK.
  3. Restart the Kafka service. Select Rolling Restart or Restart, depending on how much downtime you can afford.
  4. Upgrade all Kafka 2.x.x clients to Kafka 3.0.x.
  5. After the whole cluster has restarted successfully, remove the settings that you added in step 1 and restart the cluster again.
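
The version mapping used in this procedure can be captured in a small helper, which avoids typing a two-digit value by mistake. The following is a minimal POSIX shell sketch: the function names are hypothetical, the values are the ones listed above, and the script only prints the lines to paste into the safety valve. It does not talk to Cloudera Manager.

```shell
#!/bin/sh
# Hypothetical helper names; the version values come from the list above.

# Map the CDK release you are upgrading from to the documented value.
protocol_version_for() {
  case "$1" in
    2.0.*) echo "0.9.0" ;;
    2.1.*) echo "0.10.0" ;;
    2.2.*) echo "0.10.2" ;;
    *) echo "no documented value for CDK $1" >&2; return 1 ;;
  esac
}

# Succeed only for a three-part version such as 0.10.2.
is_three_part_version() {
  printf '%s\n' "$1" | grep -Eq '^[0-9]+\.[0-9]+\.[0-9]+$'
}

# Print the two lines to paste into the safety valve.
safety_valve_lines() {
  value=$(protocol_version_for "$1") || return 1
  is_three_part_version "$value" || return 1
  printf 'inter.broker.protocol.version=%s\n' "$value"
  printf 'log.message.format.version=%s\n' "$value"
}
```

For example, safety_valve_lines 2.1.0 prints both properties set to 0.10.0, and a release that is not in the list above makes the function fail instead of guessing a value.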

Graceful Shutdown of Kafka Brokers

If the Kafka brokers do not shut down gracefully, subsequent restarts may take longer than expected. This can happen when the brokers take longer than 30 seconds to clear their backlog while stopping the Kafka service, stopping the Kafka Broker role, or stopping a cluster where the Kafka service is running. The Kafka brokers are also shut down as part of performing an upgrade. There are two configuration properties you can set to control whether Cloudera Manager waits for the brokers to shut down gracefully:
Kafka Shutdown Properties

  • Enable Controlled Shutdown
    Description: Enables controlled shutdown of the broker. If enabled, the broker moves all leaders on it to other brokers before shutting itself down. This reduces the unavailability window during shutdown.
    Default Value: Enabled
  • Graceful Shutdown Timeout
    Description: The timeout in milliseconds to wait for graceful shutdown to complete.
    Default Value: 30000 milliseconds (30 seconds)

To configure these properties, go to Clusters > Kafka Service > Configuration and search for "shutdown".

If Kafka is taking a long time for controlled shutdown to complete, consider increasing the value of Graceful Shutdown Timeout. Once this timeout is reached, Cloudera Manager issues a forced shutdown, which interrupts the controlled shutdown and could cause subsequent restarts to take longer than expected.

Disks and Filesystem

Cloudera recommends that you use multiple drives to get good throughput. To ensure good latency, do not share the drives used for Kafka data with application logs or other OS filesystem activity. You can either use RAID to combine these drives into a single volume, or format and mount each drive as its own directory. Because Kafka has replication, the redundancy that RAID provides can also be provided at the application level. This choice has several tradeoffs.

If you configure multiple data directories, partitions are assigned round-robin to data directories. Each partition is stored entirely in one of the data directories. This can lead to load imbalance between disks if data is not well balanced among partitions.
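
To see how this imbalance arises, the following POSIX shell sketch models the round-robin assignment described above. It is a simplified illustration, not Kafka's actual placement code: the function name and the partition sizes are made up, and each input line stands for the size of one partition in arbitrary units.

```shell
#!/bin/sh
# Simplified model of the round-robin behavior described above.
# $1 = number of data directories; stdin = one partition size per line.
# Prints the total size that lands in each directory.
round_robin_totals() {
  awk -v dirs="$1" '
    { total[(NR - 1) % dirs] += $1 }   # partition N goes to directory (N-1) mod dirs
    END { for (i = 0; i < dirs; i++) printf "dir%d %d\n", i, total[i] }
  '
}
```

Running printf '%s\n' 100 10 100 10 | round_robin_totals 2 prints "dir0 200" and "dir1 20": each directory holds two partitions, but one disk carries ten times the data of the other.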

RAID can potentially do a better job of balancing load between disks because it balances load at a lower level. The primary downside of RAID is that it is usually a big performance hit for write throughput, and it reduces the available disk space.

Another potential benefit of RAID is the ability to tolerate disk failures. However, rebuilding the RAID array is so I/O intensive that it can effectively disable the server, so this does not provide much improvement in availability.

The following table summarizes these pros and cons for RAID10 versus JBOD.

RAID10
  • Can survive single disk failure
  • Single log directory
  • Lower total I/O

JBOD
  • Single disk failure kills the broker
  • More available disk space
  • Higher write throughput
  • Broker is not smart about balancing partitions across disks

Installing or Upgrading Kafka from a Parcel

Minimum Required Role: Cluster Administrator (also provided by Full Administrator)

  1. In Cloudera Manager, select Hosts > Parcels.
  2. If you do not see Kafka in the list of parcels, you can add the parcel to the list.
    1. Find the parcel for the version of Kafka you want to use on CDK Powered by Apache Kafka Versions.
    2. Copy the parcel repository link.
    3. On the Cloudera Manager Parcels page, click Configuration.
    4. In the field Remote Parcel Repository URLs, click + next to an existing parcel URL to add a new field.
    5. Paste the parcel repository link.
    6. Save your changes.
  3. On the Cloudera Manager Parcels page, download the Kafka parcel, distribute the parcel to the hosts in your cluster, and then activate the parcel. See Managing Parcels. After you activate the Kafka parcel, Cloudera Manager prompts you to restart the cluster. You do not need to restart the cluster after installing Kafka. Click Close to ignore this prompt.
  4. Add the Kafka service to your cluster. See Adding a Service.

Installing or Upgrading Kafka from a Package

Minimum Required Role: Cluster Administrator (also provided by Full Administrator)

You install the Kafka package from the command line.

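  1. Navigate to the repository configuration directory for your operating system: /etc/yum.repos.d on RHEL-compatible systems, /etc/zypp/repos.d on SLES, or /etc/apt/sources.list.d on Ubuntu or Debian.
  2. Use wget to download the Kafka repository file. See CDK Powered by Apache Kafka Version and Packaging Information.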
  3. Install Kafka using the appropriate commands for your operating system.
    Kafka Installation Commands

    RHEL-compatible:

      $ sudo yum clean all
      $ sudo yum install kafka
      $ sudo yum install kafka-server

    SLES:

      $ sudo zypper clean --all
      $ sudo zypper install kafka
      $ sudo zypper install kafka-server

    Ubuntu or Debian:

      $ sudo apt-get update
      $ sudo apt-get install kafka
      $ sudo apt-get install kafka-server

  4. Edit /etc/kafka/conf/server.properties to ensure that broker.id is unique for each node and broker in the Kafka cluster, and that zookeeper.connect points to the same ZooKeeper for all nodes and brokers.
  5. Start the Kafka server with the following command:

    $ sudo service kafka-server start
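
The two settings from step 4 are easy to get wrong when you edit several hosts by hand. The following POSIX shell sketch prints the lines for one broker and checks a planned set of IDs for duplicates. The function names and the ZooKeeper address in the usage note are hypothetical; the script only prints text and does not modify server.properties.

```shell
#!/bin/sh
# Hypothetical helpers for planning the step 4 settings.

# Print the two settings for one broker.
# $1 = broker ID (must be unique), $2 = ZooKeeper connection string (same on every broker).
broker_settings() {
  printf 'broker.id=%s\n' "$1"
  printf 'zookeeper.connect=%s\n' "$2"
}

# Succeed only if every ID passed as an argument is distinct.
ids_are_unique() {
  duplicates=$(printf '%s\n' "$@" | sort | uniq -d)
  [ -z "$duplicates" ]
}
```

For example, ids_are_unique 1 2 3 succeeds and ids_are_unique 1 2 2 fails, and broker_settings 1 zk1.example.com:2181 prints the two lines for the first broker, where zk1.example.com:2181 is a placeholder address.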

To verify that all nodes are correctly registered to the same ZooKeeper, connect to ZooKeeper using zookeeper-client, and then list the broker IDs at the zookeeper-client prompt.

$ zookeeper-client
$ ls /brokers/ids

You should see all of the IDs for the brokers you have registered in your Kafka cluster.

To discover which node a particular ID is assigned to, run the following command at the zookeeper-client prompt:

$ get /brokers/ids/<ID>

This command returns the host name of the node assigned the ID you specify.
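
The data stored under /brokers/ids/<ID> is a JSON document that contains the host name along with other fields. If you want only the host name, a filter such as the following POSIX shell sketch can extract it. The function name is hypothetical, the exact set of fields varies by Kafka version, and the sample in the usage note is an illustrative value rather than output captured from a real cluster.

```shell
#!/bin/sh
# Hypothetical filter: read broker registration JSON on stdin
# and print the value of its "host" field.
broker_host() {
  sed -n 's/.*"host":"\([^"]*\)".*/\1/p'
}
```

For example, printf '%s\n' '{"jmx_port":-1,"host":"kafka1.example.com","port":9092}' | broker_host prints kafka1.example.com. The pattern assumes there is no whitespace between the key and its value, so adjust it if your output is formatted differently.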