Performance and Resource Considerations

Kafka is optimized for small messages. According to benchmarks, the best performance occurs with 1 KB messages. Larger messages (for example, 10 MB to 100 MB) can decrease throughput and significantly impact operations.

Partitions and Memory Usage

Brokers allocate a buffer the size of replica.fetch.max.bytes for each partition they replicate. If replica.fetch.max.bytes is set to 1 MiB and you have 1000 partitions, about 1 GiB of RAM is required. Ensure that the number of partitions multiplied by the size of the largest message does not exceed available memory.

The same consideration applies to the consumer fetch.message.max.bytes setting: ensure that you have enough memory for the largest message in each partition the consumer reads. When using larger messages, you may need to use fewer partitions or provide more RAM.
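The arithmetic behind this sizing rule can be sketched as follows. This is illustrative Python, not part of Kafka; the function name and the partition count are assumptions chosen to match the example above.

```python
# Rough worst-case fetch-buffer memory estimate: one full-size buffer
# per partition a broker replicates (or a consumer reads).

def fetch_buffer_memory(max_fetch_bytes, partition_count):
    """Worst case: one buffer of max_fetch_bytes for every partition."""
    return max_fetch_bytes * partition_count

replica_fetch_max_bytes = 1_048_576   # 1 MiB, the default
partitions = 1000                      # illustrative partition count

required = fetch_buffer_memory(replica_fetch_max_bytes, partitions)
print(f"{required / 2**30:.1f} GiB required")  # about 1 GiB for 1000 partitions
```

The same calculation applies on the consumer side with fetch.message.max.bytes in place of replica.fetch.max.bytes.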

Partition Reassignment

If your system is healthy and thriving, you can anticipate that at some point you will exceed your configured resources. If you add a Kafka broker to your cluster to handle increased demand, it is allocated new partitions (the same as any other broker), but it does not automatically share the load of existing partitions on other brokers. To redistribute the existing load between brokers, you must manually reassign partitions. The bin/kafka-reassign-partitions.sh script provides utilities to assist you.

To reassign partitions:
  1. Create a list of topics you want to move.
    topics-to-move.json
    {"topics": [{"topic": "foo1"},
                {"topic": "foo2"}],
     "version":1
    }
  2. Use the --generate option in kafka-reassign-partitions.sh to list the distribution of partitions and replicas on your current brokers, followed by a list of suggested locations for partitions on your new broker.
    bin/kafka-reassign-partitions.sh \
        --zookeeper localhost:2181 \
        --topics-to-move-json-file topics-to-move.json \
        --broker-list "4" \
        --generate
    
    Current partition replica assignment
    
    {"version":1,
     "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
                   {"topic":"foo1","partition":0,"replicas":[3,1]},
                   {"topic":"foo2","partition":2,"replicas":[1,2]},
                   {"topic":"foo2","partition":0,"replicas":[3,2]},
                   {"topic":"foo1","partition":1,"replicas":[2,3]},
                   {"topic":"foo2","partition":1,"replicas":[2,3]}]
    }
    
    {"version":1,
     "partitions":[{"topic":"foo1","partition":3,"replicas":4},
                   {"topic":"foo1","partition":1,"replicas":4},
                   {"topic":"foo2","partition":2,"replicas":4}]
    }
    
  3. Make any desired revisions to the suggested list, then save it as a JSON file.
  4. Use the --execute option in kafka-reassign-partitions.sh to start the redistribution process. In some cases, this process can take several hours.
    > bin/kafka-reassign-partitions.sh \
        --zookeeper localhost:2181 \
        --reassignment-json-file expand-cluster-reassignment.json \
        --execute
  5. Use the --verify option in kafka-reassign-partitions.sh to check the status of your partitions.
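     For example, an invocation might look like the following (using the same hypothetical ZooKeeper connect string and reassignment file as in step 4):

     ```
     > bin/kafka-reassign-partitions.sh \
         --zookeeper localhost:2181 \
         --reassignment-json-file expand-cluster-reassignment.json \
         --verify
     ```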

Reassigning partitions is a painful, manual process, so anticipate your system's growth and redistribute the load while your system is at no more than about 70% capacity. If you wait until you are forced to redistribute, it might be too late: the redistribution process will be excruciatingly slow if you run these utilities only after you have reached the limits of your resources.

Garbage Collection

Large messages can cause longer garbage collection (GC) pauses as brokers allocate large chunks. Monitor the GC log and the server log. If long GC pauses cause Kafka to abandon the ZooKeeper session, you may need to configure longer timeout values for zookeeper.session.timeout.ms.
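For example, the session timeout could be raised in the broker configuration; the value below is illustrative, and you should tune it against your observed GC pause times:

```
# Allow longer GC pauses before the ZooKeeper session expires (illustrative value)
zookeeper.session.timeout.ms=30000
```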

Handling Large Messages

If you need to accommodate large messages, first consider the following options to reduce message size:

  • The Kafka producer can compress messages. For example, if the original message is a text-based format (such as XML), in most cases the compressed message will be sufficiently small.

    Use the compression.codec and compressed.topics producer configuration parameters to enable compression. Both Gzip and Snappy are supported.

  • If shared storage (such as NAS, HDFS, or S3) is available, consider placing large files on the shared storage and using Kafka to send a message with the file location. In many cases, this can be much faster than using Kafka to send the large file itself.
  • Split large messages into 1 KB segments with the producing client, using partition keys to ensure that all segments are sent to the same Kafka partition in the correct order. The consuming client can then reconstruct the original large message.
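The segmentation approach in the last bullet can be sketched as follows. This is illustrative Python, not a Kafka client: the `split_message` and `reassemble` helpers are hypothetical, and a real implementation would carry the segment metadata in the message payload or an envelope, using the key as the Kafka partition key.

```python
# Sketch: split a large payload into fixed-size segments keyed so that all
# segments of one message go to the same partition, then reassemble them.

SEGMENT_SIZE = 1024  # 1 KB, as suggested above

def split_message(key, payload, segment_size=SEGMENT_SIZE):
    """Yield (key, index, total, chunk) tuples, one per segment."""
    chunks = [payload[i:i + segment_size]
              for i in range(0, len(payload), segment_size)]
    total = len(chunks)
    for index, chunk in enumerate(chunks):
        yield (key, index, total, chunk)

def reassemble(segments):
    """Rebuild the original payload from all segments of a single key."""
    ordered = sorted(segments, key=lambda s: s[1])
    assert ordered and ordered[0][2] == len(ordered), "missing segments"
    return b"".join(chunk for _, _, _, chunk in ordered)

payload = b"x" * 5000
segments = list(split_message("order-42", payload))
assert reassemble(segments) == payload
```

Because every segment shares the same partition key, Kafka preserves their relative order within the partition, which is what makes reassembly on the consumer side straightforward.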

If you still need to send large messages with Kafka, modify the following configuration parameters to match your requirements:

Broker Configuration

  • message.max.bytes

    Maximum message size the broker will accept. Must be smaller than the consumer fetch.message.max.bytes, or the consumer cannot consume the message.

    Default value: 1000000 (1 MB)

  • log.segment.bytes

    Size of a Kafka data file. Must be larger than any single message.

    Default value: 1073741824 (1 GiB)

  • replica.fetch.max.bytes

    Maximum message size a broker can replicate. Must be larger than message.max.bytes, or a broker can accept messages it cannot replicate, potentially resulting in data loss.

    Default value: 1048576 (1 MiB)

Consumer Configuration

  • fetch.message.max.bytes

    Maximum message size a consumer can read. Must be at least as large as message.max.bytes.

    Default value: 1048576 (1 MiB)
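Putting these settings together: to accept messages up to roughly 10 MB, all three limits must be raised consistently, with the fetch sizes at least as large as the broker's accepted maximum. The fragment below is a sketch with illustrative values, not recommended settings:

```
# Broker (server.properties) -- illustrative values for ~10 MB messages
message.max.bytes=10000000
replica.fetch.max.bytes=10485760
log.segment.bytes=1073741824   # default; already far larger than 10 MB

# Consumer -- must be able to fetch the largest message the broker accepts
fetch.message.max.bytes=10485760
```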