CDH 6 includes Apache Kafka as part of the core package. The documentation includes improved contents for how to set up, install, and administer your Kafka ecosystem. For more information, see the Cloudera Enterprise 6.0.x Apache Kafka Guide. We look forward to your feedback on both the existing and new documentation.

Configuring Apache Kafka for Performance and Resource Management

Apache Kafka is optimized for small messages. According to benchmarks, the best performance occurs with 1 KB messages. Larger messages (for example, 10 MB to 100 MB) can decrease throughput and significantly impact operations.

Partitions and Memory Usage

For a quick video introduction to load balancing, see tl;dr: Balancing Apache Kafka Clusters.

Brokers allocate a buffer the size of replica.fetch.max.bytes for each partition they replicate. If replica.fetch.max.bytes is set to 1 MiB, and you have 1000 partitions, about 1 GiB of RAM is required. Ensure that the number of partitions multiplied by the size of the largest message does not exceed available memory.

The same consideration applies for the consumer fetch.message.max.bytes setting. Ensure that you have enough memory for the largest message for each partition the consumer replicates. With larger messages, you might need to use fewer partitions or provide more RAM.

Partition Reassignment

At some point you will likely exceed configured resources on your system. If you add a Kafka broker to your cluster to handle increased demand, new partitions are allocated to it (the same as any other broker), but it does not automatically share the load of existing partitions on other brokers. To redistribute the existing load among brokers, you must manually reassign partitions. You can do so using bin/kafka-reassign-partitions.sh script utilities.

To reassign partitions:
  1. Create a list of topics you want to move.
    topics-to-move.json
    {"topics": [{"topic": "foo1"},
                {"topic": "foo2"}],
     "version":1
    }
  2. Use the --generate option in kafka-reassign-partitions.sh to list the distribution of partitions and replicas on your current brokers, followed by a list of suggested locations for partitions on your new broker.
    bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 
        --topics-to-move-json-file topics-to-move.json 
        --broker-list "4" 
        --generate
    
    Current partition replica assignment
    
    {"version":1,
     "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
                   {"topic":"foo1","partition":0,"replicas":[3,1]},
                   {"topic":"foo2","partition":2,"replicas":[1,2]},
                   {"topic":"foo2","partition":0,"replicas":[3,2]},
                   {"topic":"foo1","partition":1,"replicas":[2,3]},
                   {"topic":"foo2","partition":1,"replicas":[2,3]}]
    }
    
    {"version":1,
     "partitions":[{"topic":"foo1","partition":3,"replicas":[4]},
                   {"topic":"foo1","partition":1,"replicas":[4]},
                   {"topic":"foo2","partition":2,"replicas":[4]}]
    }
    
  3. Revise the suggested list if required, and then save it as a JSON file.
  4. Use the --execute option in kafka-reassign-partitions.sh to start the redistirbution process, which can take several hours in some cases.
    > bin/kafka-reassign-partitions.sh \
        --zookeeper localhost:2181 \
        --reassignment-json-file expand-cluster-reassignment.json 
        --execute
  5. Use the --verify option in kafka-reassign-partitions.sh to check the status of your partitions.

Although reassigning partitions is labor-intensive, you should anticipate system growth and redistribute the load when your system is at 70% capacity. If you wait until you are forced to redistribute because you have reached the limit of your resources, the redistribution process can be extremely slow.

Garbage Collection

Large messages can cause longer garbage collection (GC) pauses as brokers allocate large chunks. Monitor the GC log and the server log. If long GC pauses cause Kafka to abandon the ZooKeeper session, you may need to configure longer timeout values for zookeeper.session.timeout.ms.

Handling Large Messages

Before configuring Kafka to handle large messages, first consider the following options to reduce message size:

  • The Kafka producer can compress messages. For example, if the original message is a text-based format (such as XML), in most cases the compressed message will be sufficiently small.

    Use the compression.codec and compressed.topics producer configuration parameters to enable compression. Gzip and Snappy are supported.

  • If shared storage (such as NAS, HDFS, or S3) is available, consider placing large files on the shared storage and using Kafka to send a message with the file location. In many cases, this can be much faster than using Kafka to send the large file itself.
  • Split large messages into 1 KB segments with the producing client, using partition keys to ensure that all segments are sent to the same Kafka partition in the correct order. The consuming client can then reconstruct the original large message.

If you still need to send large messages with Kafka, modify the configuration parameters presented in the following sections to match your requirements.

Broker Configuration

Broker Configuration Properties
Property Default Value Description
message.max.bytes 1000000

(1 MB)

Maximum message size the broker accepts. When using the old consumer, this property must be lower than the consumer fetch.message.max.bytes, or the consumer cannot consume the message.
log.segment.bytes 1073741824

(1 GiB)

Size of a Kafka data file. Must be larger than any single message.
replica.fetch.max.bytes 1048576

(1 MiB)

Maximum message size a broker can replicate. Must be larger than message.max.bytes, or a broker can accept messages it cannot replicate, potentially resulting in data loss.

Consumer Configuration

Kafka offers two separate consumer implementations, the old consumer and the new consumer. The old consumer is the Consumer class written in Scala. The new consumer is the KafkaConsumer class written in Java. When configuring Kafka to handle large messages, different properties have to be configured for each consumer implementation.
Old Consumer
Old Consumer Configuration Properties
Property Default Value Description
fetch.message.max.bytes 52428800

(50 MiB)

The maximum amount of data the server should return for a fetch request. This is a hard limit. If a message batch is larger than this limit, the consumer will not be able to consume the message or any subsequent messages in a given partition.
New Consumer
New Consumer Configuration Properties
Property Default Value Description
max.partition.fetch.bytes 1048576

(10 MiB)

The maximum amount of data per-partition the server will return.
fetch.max.bytes 52428800

(50 MiB)

The maximum amount of data the server should return for a fetch request.

Tuning Kafka for Optimal Performance

For a quick video introduction to tuning Kafka, see tl;dr: Tuning Your Apache Kafka Cluster.

Performance tuning involves two important metrics: Latencymeasures how long it takes to process one event, and throughput measures how many events arrive within a specific amount of time. Most systems are optimized for either latency or throughput. Kafka is balanced for both. A well tuned Kafka system has just enough brokers to handle topic throughput, given the latency required to process information as it is received.

Tuning your producers, brokers, and consumers to send, process, and receive the largest possible batches within a manageable amount of time results in the best balance of latency and throughput for your Kafka cluster.

Tuning Kafka Producers

Kafka uses an asynchronous publish/subscribe model. When your producer calls the send() command, the result returned is a future. The future provides methods to let you check the status of the information in process. When the batch is ready, the producer sends it to the broker. The Kafka broker waits for an event, receives the result, and then responds that the transaction is complete.

If you do not use a future, you could get just one record, wait for the result, and then send a response. Latency is very low, but so is throughput. If each transaction takes 5 ms, throughput is 200 events per second.—slower than the expected 100,000 events per second.

When you use Producer.send(), you fill up buffers on the producer. When a buffer is full, the producer sends the buffer to the Kafka broker and begins to refill the buffer.

Two parameters are particularly important for latency and throughput: batch size and linger time.

Batch Size

batch.size measures batch size in total bytes instead of the number of messages. It controls how many bytes of data to collect before sending messages to the Kafka broker. Set this as high as possible, without exceeding available memory. The default value is 16384.

If you increase the size of your buffer, it might never get full. The Producer sends the information eventually, based on other triggers, such as linger time in milliseconds. Although you can impair memory usage by setting the buffer batch size too high, this does not impact latency.

If your producer is sending all the time, you are probably getting the best throughput possible. If the producer is often idle, you might not be writing enough data to warrant the current allocation of resources.

Linger Time

linger.ms sets the maximum time to buffer data in asynchronous mode. For example, a setting of 100 batches 100ms of messages to send at once. This improves throughput, but the buffering adds message delivery latency.

By default, the producer does not wait. It sends the buffer any time data is available.

Instead of sending immediately, you can set linger.ms to 5 and send more messages in one batch. This would reduce the number of requests sent, but would add up to 5 milliseconds of latency to records sent, even if the load on the system does not warrant the delay.

The farther away the broker is from the producer, the more overhead required to send messages. Increase linger.ms for higher latency and higher throughput in your producer.

Tuning Kafka Brokers

Topics are divided into partitions. Each partition has a leader. Most partitions are written into leaders with multiple replicas. When the leaders are not balanced properly, one might be overworked, compared to others. For more information on load balancing, see Partitions and Memory Usage.

Depending on your system and how critical your data is, you want to be sure that you have sufficient replication sets to preserve your data. Cloudera recommends starting with one partition per physical storage disk and one consumer per partition.

Tuning Kafka Consumers

Consumers can create throughput issues on the other side of the pipeline. The maximum number of consumers for a topic is equal to the number of partitions. You need enough partitions to handle all the consumers needed to keep up with the producers.

Consumers in the same consumer group split the partitions among them. Adding more consumers to a group can enhance performance. Adding more consumer groups does not affect performance.

How you use the replica.high.watermark.checkpoint.interval.ms property can affect throughput. When reading from a partition, you can mark the last point where you read information. That way, if you have to go back and locate missing data, you have a checkpoint from which to move forward without having to reread prior data. If you set the checkpoint watermark for every event, you will never lose a message, but it significantly impacts performance. If, instead, you set it to check the offset every hundred messages, you have a margin of safety with much less impact on throughput.

Configuring JMX Ephemeral Ports

Kafka uses two high-numbered ephemeral ports for JMX. These ports are listed when you view netstat -anp information for the Kafka Broker process.

You can change the number for the first port by adding a command similar to -Dcom.sun.management.jmxremote.rmi.port=<port number> to the field Additional Broker Java Options (broker_java_opts) in Cloudera Manager. The JMX_PORT configuration maps to com.sun.management.jmxremote.port by default.

The second ephemeral port used for JMX communication is implemented for the JRMP protocol and cannot be changed.

Quotas

For a quick video introduction to quotas, see tl;dr: Quotas.

In CDK 2.0 and higher Powered By Apache Kafka, Kafka can enforce quotas on produce and fetch requests. Producers and consumers can use very high volumes of data. This can monopolize broker resources, cause network saturation, and generally deny service to other clients and the brokers themselves. Quotas protect against these issues and are important for large, multi-tenant clusters where a small set of clients using high volumes of data can degrade the user experience.

Quotas are byte-rate thresholds, defined per client ID. A client ID logically identifies an application making a request. A single client ID can span multiple producer and consumer instances. The quota is applied for all instances as a single entity: For example, if a client ID has a produce quota of 10 MB/s, that quota is shared across all instances with that same ID.

When running Kafka as a service, quotas can enforce API limits. By default, each unique client ID receives a fixed quota in bytes per second, as configured by the cluster (quota.producer.default, quota.consumer.default). This quota is defined on a per-broker basis. Each client can publish or fetch a maximum of X bytes per second per broker before it gets throttled.

The broker does not return an error when a client exceeds its quota, but instead attempts to slow the client down. The broker computes the amount of delay needed to bring a client under its quota and delays the response for that amount of time. This approach keeps the quota violation transparent to clients (outside of client-side metrics). This also prevents clients from having to implement special backoff and retry behavior.

Setting Quotas

You can override the default quota for client IDs that need a higher or lower quota. The mechanism is similar to per-topic log configuration overrides. Write your client ID overrides to ZooKeeper under /config/clients. All brokers read the overrides, which are effective immediately. You can change quotas without having to do a rolling restart of the entire cluster.

By default, each client ID receives an unlimited quota. The following configuration sets the default quota per producer and consumer client ID to 10 MB/s.

quota.producer.default=10485760
quota.consumer.default=10485760

To set quotas using Cloudera Manager, open the Kafka Configuration page and search for Quota. Use the fields provided to set the Default Consumer Quota or Default Producer Quota. For more information, see Modifying Configuration Properties Using Cloudera Manager.

Setting User Limits for Kafka

Kafka opens many files at the same time. The default setting of 1024 for the maximum number of open files on most Unix-like systems is insufficient. Any significant load can result in failures and cause error messages such as java.io.IOException...(Too many open files) to be logged in the Kafka or HDFS log files. You might also notice errors such as this:
ERROR Error in acceptor (kafka.network.Acceptor)
java.io.IOException: Too many open files

Cloudera recommends setting the value to a relatively high starting point, such as 100,000.

You can monitor the number of file descriptors in use on the Kafka Broker dashboard. In Cloudera Manager:
  1. Go to the Kafka service.
  2. Select a Kafka Broker.
  3. Open Charts Library > Process Resources and scroll down to the File Descriptors chart.
See http://www.cloudera.com/documentation/enterprise/latest/topics/cm_dg_view_charts.html.