Performance and Resource Considerations

Kafka is optimized for small messages. According to benchmarks, the best performance occurs with 1 KB messages. Larger messages (for example, 10 MB to 100 MB) can decrease throughput and significantly impact operations.

Partitions and Memory Usage

Brokers allocate a buffer the size of replica.fetch.max.bytes for each partition they replicate. If replica.fetch.max.bytes is set to 1 MiB, and you have 1000 partitions, about 1 GiB of RAM is required. Ensure that the number of partitions multiplied by the size of the largest message does not exceed available memory.
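
The sizing rule above can be checked with quick arithmetic. The following is a minimal sketch and not part of Kafka; the function name replication_buffer_bytes and the sample values are illustrative assumptions.

```python
MIB = 1024 ** 2
GIB = 1024 ** 3


def replication_buffer_bytes(partitions: int, replica_fetch_max_bytes: int) -> int:
    """Worst-case buffer memory: one buffer of replica.fetch.max.bytes
    for each partition the broker replicates."""
    return partitions * replica_fetch_max_bytes


# The example from the text: 1000 partitions with a 1 MiB fetch size.
needed = replication_buffer_bytes(partitions=1000, replica_fetch_max_bytes=1 * MIB)
print(f"{needed / GIB:.2f} GiB")  # prints "0.98 GiB", that is, about 1 GiB
```

Comparing the result with the memory actually available on the broker gives an early warning before you raise replica.fetch.max.bytes or add partitions.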

The same consideration applies to the consumer fetch.message.max.bytes setting. Ensure that you have enough memory for the largest message for each partition the consumer reads. With larger messages, you might need to use fewer partitions or provide more RAM.

Partition Reassignment

At some point you will likely exceed configured resources on your system. If you add a Kafka broker to your cluster to handle increased demand, new partitions are allocated to it (the same as any other broker), but it does not automatically share the load of existing partitions on other brokers. To redistribute the existing load among brokers, you must manually reassign partitions. You can do so using the bin/kafka-reassign-partitions.sh script.

To reassign partitions:
  1. Create a list of topics you want to move.
    topics-to-move.json
    {"topics": [{"topic": "foo1"},
                {"topic": "foo2"}],
     "version":1
    }
  2. Use the --generate option in kafka-reassign-partitions.sh to list the distribution of partitions and replicas on your current brokers, followed by a list of suggested locations for partitions on your new broker.
    bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
        --topics-to-move-json-file topics-to-move.json \
        --broker-list "4" \
        --generate
    
    Current partition replica assignment
    
    {"version":1,
     "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
                   {"topic":"foo1","partition":0,"replicas":[3,1]},
                   {"topic":"foo2","partition":2,"replicas":[1,2]},
                   {"topic":"foo2","partition":0,"replicas":[3,2]},
                   {"topic":"foo1","partition":1,"replicas":[2,3]},
                   {"topic":"foo2","partition":1,"replicas":[2,3]}]
    }
    
    Proposed partition reassignment configuration
    
    {"version":1,
     "partitions":[{"topic":"foo1","partition":3,"replicas":[4]},
                   {"topic":"foo1","partition":1,"replicas":[4]},
                   {"topic":"foo2","partition":2,"replicas":[4]}]
    }
    
  3. Revise the suggested list if required, and then save it as a JSON file (for example, expand-cluster-reassignment.json, the name used in the next step).
  4. Use the --execute option in kafka-reassign-partitions.sh to start the redistribution process, which can take several hours in some cases.
    bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
        --reassignment-json-file expand-cluster-reassignment.json \
        --execute
  5. Use the --verify option in kafka-reassign-partitions.sh, with the same --reassignment-json-file, to check the status of your partitions.
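
The JSON files used in these steps can also be produced and sanity-checked with a short script. The following is a minimal sketch, not part of the Kafka tooling; the helper names topics_to_move and check_plan are hypothetical, and the check assumes that each "replicas" value is a list of broker IDs, as in the current assignment output shown in step 2.

```python
import json


def topics_to_move(topics):
    """Build the document passed to --topics-to-move-json-file."""
    return {"topics": [{"topic": name} for name in topics], "version": 1}


def check_plan(plan):
    """Return a list of problems found in a reassignment plan (empty if none)."""
    problems = []
    for entry in plan.get("partitions", []):
        label = f'{entry.get("topic")}-{entry.get("partition")}'
        replicas = entry.get("replicas")
        if not isinstance(replicas, list) or not replicas:
            problems.append(f"{label}: replicas must be a non-empty list")
        elif len(set(replicas)) != len(replicas):
            problems.append(f"{label}: duplicate broker IDs")
    return problems


print(json.dumps(topics_to_move(["foo1", "foo2"])))

# A hand-edited plan in which one entry lost its list brackets.
plan = {"version": 1,
        "partitions": [{"topic": "foo1", "partition": 1, "replicas": [4]},
                       {"topic": "foo2", "partition": 2, "replicas": 4}]}
print(check_plan(plan))
```

Running a check like this before --execute catches hand-editing mistakes from step 3 while they are still cheap to fix.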

Although reassigning partitions is labor-intensive, anticipate system growth and redistribute the load when your system reaches 70% capacity. If you wait until you have reached the limits of your resources and are forced to redistribute, the process can be extremely slow.

Garbage Collection

Large messages can cause longer garbage collection (GC) pauses as brokers allocate large chunks of memory. Monitor the GC log and the server log. If long GC pauses cause Kafka to abandon the ZooKeeper session, you may need to increase the value of zookeeper.session.timeout.ms.

Handling Large Messages

Before configuring Kafka to handle large messages, first consider the following options to reduce message size:

  • The Kafka producer can compress messages. For example, if the original message is a text-based format (such as XML), in most cases the compressed message will be sufficiently small.

    Use the compression.codec and compressed.topics producer configuration parameters to enable compression. Gzip and Snappy are supported.

  • If shared storage (such as NAS, HDFS, or S3) is available, consider placing large files on the shared storage and using Kafka to send a message with the file location. In many cases, this can be much faster than using Kafka to send the large file itself.
  • Split large messages into 1 KB segments with the producing client, using partition keys to ensure that all segments are sent to the same Kafka partition in the correct order. The consuming client can then reconstruct the original large message.
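
The segmenting approach in the last option can be sketched without any Kafka client. This is an illustration under stated assumptions: the function names, the segment dictionary layout (index, total, data), and the message ID "order-42" are all hypothetical, and the returned key stands in for the partition key that keeps every segment on the same partition.

```python
from typing import List, Tuple

SEGMENT_SIZE = 1024  # 1 KB segments, as suggested in the text


def split_message(message_id: str, payload: bytes,
                  segment_size: int = SEGMENT_SIZE) -> List[Tuple[str, dict]]:
    """Return (key, segment) pairs; every segment shares the same key."""
    total = max(1, -(-len(payload) // segment_size))  # ceiling division
    return [
        (message_id, {"index": i, "total": total,
                      "data": payload[i * segment_size:(i + 1) * segment_size]})
        for i in range(total)
    ]


def reassemble(segments: List[dict]) -> bytes:
    """Rebuild the payload, refusing to continue if a segment is missing."""
    if not segments:
        raise ValueError("no segments")
    ordered = sorted(segments, key=lambda s: s["index"])
    if [s["index"] for s in ordered] != list(range(ordered[0]["total"])):
        raise ValueError("missing or duplicate segment")
    return b"".join(s["data"] for s in ordered)


payload = bytes(range(256)) * 10  # 2560 bytes
records = split_message("order-42", payload)
print(len(records))  # 3
print(reassemble([segment for _, segment in records]) == payload)  # True
```

Carrying the index and total in each segment lets the consumer detect gaps rather than silently rebuilding a truncated message.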

If you still need to send large messages with Kafka, modify the following configuration parameters to match your requirements:

Broker Configuration

  • message.max.bytes

    Maximum message size the broker will accept. Must not be larger than the consumer fetch.message.max.bytes, or the consumer cannot consume the message.

    Default value: 1000000 (1 MB)

  • log.segment.bytes

    Size of a Kafka data file. Must be larger than any single message.

    Default value: 1073741824 (1 GiB)

  • replica.fetch.max.bytes

    Maximum message size a broker can replicate. Must be larger than message.max.bytes, or a broker can accept messages it cannot replicate, potentially resulting in data loss.

    Default value: 1048576 (1 MiB)

Consumer Configuration

  • fetch.message.max.bytes

    Maximum message size a consumer can read. Must be at least as large as message.max.bytes.

    Default value: 1048576 (1 MiB)
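
The relationships among these four settings can be checked mechanically before you change any of them. The following is a minimal sketch; the function name check_message_size_settings is hypothetical, and it treats "at least as large as message.max.bytes" as the threshold for both fetch settings.

```python
def check_message_size_settings(message_max_bytes: int,
                                replica_fetch_max_bytes: int,
                                fetch_message_max_bytes: int,
                                log_segment_bytes: int) -> list:
    """Return the constraints from the text that the given values violate."""
    problems = []
    if replica_fetch_max_bytes < message_max_bytes:
        problems.append("replica.fetch.max.bytes is smaller than message.max.bytes")
    if fetch_message_max_bytes < message_max_bytes:
        problems.append("fetch.message.max.bytes is smaller than message.max.bytes")
    if log_segment_bytes <= message_max_bytes:
        problems.append("log.segment.bytes is not larger than message.max.bytes")
    return problems


# Default values as listed above.
defaults = dict(message_max_bytes=1000000,
                replica_fetch_max_bytes=1048576,
                fetch_message_max_bytes=1048576,
                log_segment_bytes=1073741824)
print(check_message_size_settings(**defaults))  # []

# Raising only message.max.bytes to 10 MiB breaks both fetch constraints.
raised = dict(defaults, message_max_bytes=10 * 1024 * 1024)
for problem in check_message_size_settings(**raised):
    print(problem)
```

The second call shows why these parameters have to be raised together: increasing message.max.bytes alone lets the broker accept messages that neither replicas nor consumers can fetch.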

Tuning Kafka for Optimal Performance

Performance tuning involves two important metrics: latency measures how long it takes to process one event, and throughput measures how many events arrive within a specific amount of time. Most systems are optimized for either latency or throughput. Kafka is balanced for both. A well-tuned Kafka system has just enough brokers to handle topic throughput, given the latency required to process information as it is received.

Tuning your producers, brokers, and consumers to send, process, and receive the largest possible batches within a manageable amount of time results in the best balance of latency and throughput for your Kafka cluster.

Tuning Kafka Producers

Kafka uses an asynchronous publish/subscribe model. When your producer calls the send() method, the result returned is a future. The future provides methods that let you check the status of the record in flight. When the batch is ready, the producer sends it to the broker. The Kafka broker waits for an event, receives the result, and then responds that the transaction is complete.

If you do not use a future, you send just one record, wait for the result, and only then send the next record. Latency is very low, but so is throughput. If each transaction takes 5 ms, throughput is 200 events per second, far slower than the expected 100,000 events per second.
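
The arithmetic behind these figures is short enough to write out. This is a back-of-the-envelope sketch, not a benchmark; the function names are hypothetical, and the batched model assumes (as a simplification) that a whole batch completes in the same 5 ms round trip as a single record. The batch of 500 records is an illustrative value.

```python
def sync_throughput(round_trip_ms: float) -> float:
    """Events per second when each send blocks until the broker responds."""
    return 1000.0 / round_trip_ms


def batched_throughput(round_trip_ms: float, records_per_batch: int) -> float:
    """Events per second when one round trip carries a whole batch."""
    return records_per_batch * 1000.0 / round_trip_ms


print(sync_throughput(5))          # 200.0
print(batched_throughput(5, 500))  # 100000.0
```

Under these assumptions, reaching 100,000 events per second at 5 ms per round trip takes about 500 records per batch, which is why batching dominates producer tuning.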

When you use Producer.send(), you fill up buffers on the producer. When a buffer is full, the producer sends the buffer to the Kafka broker and begins to refill the buffer.

Two parameters are particularly important for latency and throughput: batch size and lead time.

Batch Size

batch.size measures batch size in total bytes instead of the number of messages. It controls how many bytes of data to collect before sending messages to the Kafka broker. Set this as high as possible, without exceeding available memory. (Do not confuse it with send.buffer.bytes, which sizes the TCP socket send buffer.)

If you increase the size of your buffer, it might never get full. The producer sends the information eventually, based on other triggers, such as wait time in milliseconds. Setting the batch size too high can waste memory, but it does not affect latency.

If your producer is sending all the time, you are probably getting the best throughput possible. If the producer is often idle, you might not be writing enough data to warrant the current allocation of resources.

Lead Time

queue.buffering.max.ms sets the maximum time to buffer data in asynchronous mode. For example, a setting of 100 batches up to 100 ms of messages to send at once. This improves throughput, but the buffering adds message delivery latency.

By default, the producer does not wait. It sends the buffer any time data is available.

Instead of sending immediately, you can set queue.buffering.max.ms to 5 and send more messages in one batch.

The farther away the broker is from the producer, the more overhead is required to send messages. Increase queue.buffering.max.ms to trade higher latency for higher throughput in your producer.
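
The interaction between the size trigger and the time trigger can be seen in a small simulation. This is a toy model under stated assumptions, not the producer's actual implementation: the function simulate_batches and its parameters batch_bytes and max_wait_ms are hypothetical stand-ins for the batch size and lead time settings, and the time trigger is only evaluated when the next record arrives.

```python
from typing import List, Tuple


def simulate_batches(records: List[Tuple[int, int]],
                     batch_bytes: int, max_wait_ms: int) -> List[List[int]]:
    """records are (arrival_ms, size_bytes) in arrival order.
    Returns the arrival times of the records grouped into each batch."""
    batches, current, size, opened_at = [], [], 0, 0
    for arrival_ms, size_bytes in records:
        # Time trigger: the open batch has waited long enough, so send it.
        if current and arrival_ms - opened_at >= max_wait_ms:
            batches.append(current)
            current, size = [], 0
        if not current:
            opened_at = arrival_ms
        current.append(arrival_ms)
        size += size_bytes
        # Size trigger: the batch is full, so send it immediately.
        if size >= batch_bytes:
            batches.append(current)
            current, size = [], 0
    if current:
        batches.append(current)
    return batches


# One 100-byte record every 2 ms, at t = 0, 2, ..., 18.
arrivals = [(t, 100) for t in range(0, 20, 2)]
print(len(simulate_batches(arrivals, batch_bytes=10_000, max_wait_ms=0)))  # 10
print(len(simulate_batches(arrivals, batch_bytes=10_000, max_wait_ms=5)))  # 4
print(len(simulate_batches(arrivals, batch_bytes=500, max_wait_ms=100)))   # 2
```

With no wait, every record travels alone; a 5 ms wait cuts the number of requests from 10 to 4, and a batch size that fills within the wait window cuts it to 2.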

Tuning Kafka Brokers

Topics are divided into partitions. Each partition has a leader, and most partitions also have one or more replicas that copy what is written to the leader. When the leaders are not balanced properly, one broker might be overworked compared to others. For more information on load balancing, see Partitions and Memory Usage.

Depending on your system and how critical your data is, you want to be sure that you have sufficient replication sets to preserve your data. Cloudera recommends starting with one partition per physical storage disk and one consumer per partition.

Tuning Kafka Consumers

Consumers can create throughput issues on the other side of the pipeline. The maximum number of active consumers in a consumer group for a topic is equal to the number of partitions. You need enough partitions to handle all the consumers needed to keep up with the producers.

Consumers in the same consumer group split the partitions among them. Adding more consumers to a group can enhance performance. Adding more consumer groups does not affect performance.
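
The effect of the partition count on a consumer group can be illustrated with a simple assignment function. This is a minimal sketch that uses round-robin distribution for illustration only; Kafka's own assignment strategy may distribute partitions differently, and the function name assign_partitions and the consumer names are hypothetical.

```python
from typing import Dict, List


def assign_partitions(partitions: int, consumers: List[str]) -> Dict[str, List[int]]:
    """Spread partition numbers over the consumers of one group, round-robin."""
    assignment: Dict[str, List[int]] = {name: [] for name in consumers}
    for partition in range(partitions):
        assignment[consumers[partition % len(consumers)]].append(partition)
    return assignment


print(assign_partitions(4, ["c1", "c2"]))
# {'c1': [0, 2], 'c2': [1, 3]}

print(assign_partitions(2, ["c1", "c2", "c3"]))
# {'c1': [0], 'c2': [1], 'c3': []}
```

In the second call the third consumer receives no partitions, which is why adding consumers beyond the partition count does not improve throughput.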

How you use the replica.high.watermark.checkpoint.interval.ms property can affect throughput. When reading from a partition, you can mark the last point where you read information. That way, if you have to go back and locate missing data, you have a checkpoint from which to move forward without having to reread prior data. If you set the checkpoint watermark for every event, you will never lose a message, but it significantly impacts performance. If, instead, you set it to check the offset every hundred messages, you have a margin of safety with much less impact on throughput.
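
The trade-off between checkpoint frequency and recovery work can be put in numbers. This is a simplified model, not a description of Kafka internals: the function name checkpoint_cost is hypothetical, and it assumes a checkpoint is recorded together with every Nth message and that a failure happens between messages.

```python
import math
from typing import Tuple


def checkpoint_cost(total_messages: int, interval: int) -> Tuple[int, int]:
    """Return (checkpoints written, worst-case messages reread after a failure)."""
    checkpoints = math.ceil(total_messages / interval)
    worst_case_reread = interval - 1
    return checkpoints, worst_case_reread


print(checkpoint_cost(10_000, 1))    # (10000, 0)
print(checkpoint_cost(10_000, 100))  # (100, 99)
```

Under this model, checkpointing every hundred messages reduces checkpoint writes by a factor of 100 while bounding the reread after a failure to 99 messages.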