Configuring Flume Security with Kafka

In 5.15.0 and higher releases of CDH 5, and in CDH 6.1, you can use Cloudera Manager to configure Flume to communicate with Kafka sources, sinks, and channels over TLS.

When you add a Kafka service as a dependency of the Flume service, Cloudera Manager creates the jaas.conf and flume.keytab files. Cloudera Manager also scans the Flume configuration and adds Kafka security properties to it. The properties are added at the beginning of the file so that you can override them if needed. The following Kafka security properties are added to the Flume configuration:
kafka.consumer.security.protocol
kafka.consumer.sasl.kerberos.service.name
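
As a minimal sketch of what this looks like for a Kafka source, assume an agent named tier2 with a source named source1 (the names used in the example later in this topic). The first two lines mimic the generated properties. The final line is a hypothetical manual override; it assumes that a later definition of the same key takes precedence, as the placement at the beginning of the file suggests. SASL_PLAINTEXT is only an illustrative value.

```properties
# Added by Cloudera Manager at the beginning of the configuration (illustrative values)
tier2.sources.source1.kafka.consumer.security.protocol = SASL_SSL
tier2.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka

# ... rest of the Flume configuration ...

# Hypothetical manual override placed later in the file (assumes the later value wins)
tier2.sources.source1.kafka.consumer.security.protocol = SASL_PLAINTEXT
```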

You must also configure the global truststore settings. To do so, edit the Flume TLS/SSL Client Trust Store File and Flume TLS/SSL Client Trust Store Password properties in Cloudera Manager.

For detailed instructions, see Setting Keystore and Truststore for Flume Components.
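
For troubleshooting, it can help to know how a truststore is expressed at the Kafka client level. The sketch below assumes the standard Kafka client properties ssl.truststore.location and ssl.truststore.password, passed through the kafka.producer. prefix of a Kafka sink. The path and password are placeholders, and whether Cloudera Manager emits these exact lines is an assumption. The Cloudera Manager properties described above remain the supported way to set the truststore.

```properties
# Hypothetical per-sink truststore settings (placeholders, not generated output)
tier1.sinks.sink1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
tier1.sinks.sink1.kafka.producer.ssl.truststore.password = <truststore_password>
```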

To configure Flume to connect to secure Kafka, perform the following steps:

  1. In Cloudera Manager, go to the Flume service.
  2. Open the Configuration tab.
  3. Use the Search field to search for Kafka. The Kafka Service property is displayed. Select the Kafka service that you want the Flume service to connect to.

    [Image: the Kafka Service property with the KAFKA-1 service selected]


flume.keytab

Cloudera Manager automatically creates the flume.keytab file. However, if you need to edit the file, you can find it in the following location:

/var/run/cloudera-scm-agent/process/<latest_id>-flume-AGENT/flume.keytab

The file must not be empty on any host that runs a kerberized Flume agent.

Principal management is handled by Cloudera Manager for Flume, just as with other services. For example, principals are listed on the Administration > Security > Kerberos Credentials page in Cloudera Manager.

jaas.conf

Cloudera Manager also creates a flafka_jaas.conf file on each host that runs a Flume agent. You do not need to create or edit the file manually. The following information is provided for troubleshooting.

The configuration in this file is used to communicate with Kafka and to provide standard Flume Kerberos support. The flafka_jaas.conf file contains two entries for the Flume principal: Client and KafkaClient. The principal property is host-specific. The Unix user flume must have read permission for this file.

/opt/cloudera/security/flafka_jaas.conf:
    Client {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      storeKey=true
      keyTab="flume.keytab"
      principal="flume/cornhost-1.gce.acmecorn.com@GCE.ACMECORN.COM";
    };

    KafkaClient {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      storeKey=true
      serviceName="kafka"
      keyTab="flume.keytab"
      principal="flume/cornhost-1.gce.acmecorn.com@GCE.ACMECORN.COM";
    };

Example

The following sample is a complete, working Flume configuration with two tiers. Tier1 tails an input log and writes new events to the sectest topic through a Kafka sink (the tailed file must exist before the agent starts). Tier2 reads the sectest topic through a Kafka source and logs every event.

########################################################
# Tier1
########################################################
tier1.sources  = source1
tier1.channels = channel1
tier1.sinks    = sink1

tier1.channels.channel1.type   = memory
tier1.channels.channel1.capacity = 1000
tier1.channels.channel1.transactionCapacity = 100

tier1.sinks.sink1.channel      = channel1
tier1.sources.source1.channels = channel1

tier1.sources.source1.type  = exec
tier1.sources.source1.command  = tail -F /tmp/input/input.log
tier1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.sink1.kafka.topic = sectest

tier1.sinks.sink1.kafka.bootstrap.servers = acmecp-ssl-1.gce.cloudera.com:9093,acmecp-ssl-2.gce.cloudera.com:9093

###
# Generated security-related setup (added by Cloudera Manager; shown commented out for reference)
###
# tier1.sinks.sink1.kafka.producer.security.protocol = SASL_SSL
# tier1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka
###

########################################################
# Tier2
########################################################
tier2.sources  = source1
tier2.channels = channel1
tier2.sinks    = sink1

tier2.channels.channel1.type   = memory
tier2.channels.channel1.capacity = 1000
tier2.channels.channel1.transactionCapacity = 100

tier2.sinks.sink1.channel      = channel1
tier2.sources.source1.channels = channel1

tier2.sources.source1.type  = org.apache.flume.source.kafka.KafkaSource
tier2.sources.source1.kafka.bootstrap.servers = acmecp-ssl-1.gce.cloudera.com:9093,acmecp-ssl-2.gce.cloudera.com:9093
tier2.sources.source1.kafka.topics = sectest
tier2.sources.source1.kafka.offsets.storage = kafka
tier2.sources.source1.kafka.consumer.group.id = flume
tier2.sinks.sink1.type = logger

###
# Generated security-related setup (added by Cloudera Manager; shown commented out for reference)
###
# tier2.sources.source1.kafka.consumer.security.protocol = SASL_SSL
# tier2.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
###
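
The introduction also mentions Kafka channels, which the example above does not cover. As a rough sketch, a Kafka channel uses both a producer and a consumer, so both prefixes would carry the security properties. The channel topic name flume-channel is hypothetical, and it is an assumption that Cloudera Manager generates the commented lines in this form for channels.

```properties
tier1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
tier1.channels.channel1.kafka.bootstrap.servers = acmecp-ssl-1.gce.cloudera.com:9093,acmecp-ssl-2.gce.cloudera.com:9093
# Hypothetical topic used to back the channel
tier1.channels.channel1.kafka.topic = flume-channel
tier1.channels.channel1.kafka.consumer.group.id = flume

###
# Assumed security settings for a Kafka channel (producer and consumer sides)
###
# tier1.channels.channel1.kafka.producer.security.protocol = SASL_SSL
# tier1.channels.channel1.kafka.producer.sasl.kerberos.service.name = kafka
# tier1.channels.channel1.kafka.consumer.security.protocol = SASL_SSL
# tier1.channels.channel1.kafka.consumer.sasl.kerberos.service.name = kafka
```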