Configuring Flume Security with Kafka

Cloudera Manager 5.8 and 5.9 do not provide configuration options for Flume to work with Kafka sources and channels over TLS. When Kafka is configured with TLS, additional manual configuration is required to communicate with Flume.

This topic describes how to configure Flume to communicate with Kafka over TLS.

Set Up Cloudera Manager to Generate flume.keytab

If you have already set up Kerberos because of a kerberized HDFS or Solr dependency on Flume, then this step is already done. Verify your Kerberos keytab files.

If you do not have a Kerberos dependency or the keytab files have not been generated, then extend your Flume configuration. If the agent requires Kerberos credentials, then the configuration file must have a line that has the following attributes:
  • Begins with the agent's name.
  • Contains the $KERBEROS_PRINCIPAL string.
  • Is syntactically correct.
For example:
tier1.sources.source1.generateKeytabFor = $KERBEROS_PRINCIPAL

The property name generateKeytabFor is arbitrary; Flume itself does not use it. There is no dependency on HDFS or any other service.
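The three attributes above can be checked mechanically before restarting the agent. The following is a minimal sketch, not part of Cloudera Manager: the function name has_keytab_trigger is hypothetical, and "syntactically correct" is approximated here as "has the form key = value".

```python
def has_keytab_trigger(config_text, agent_name):
    """Return True if some non-comment line begins with the agent's name,
    contains $KERBEROS_PRINCIPAL, and has the form key = value."""
    for raw in config_text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        key, sep, _value = line.partition("=")
        if not sep:
            continue  # assumption: a valid line is a key = value pair
        if key.strip().startswith(agent_name + ".") and "$KERBEROS_PRINCIPAL" in line:
            return True
    return False
```

For example, has_keytab_trigger(open("flume.conf").read(), "tier1") could be run against an exported copy of the agent configuration; the file name flume.conf is only a placeholder.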

Verify the flume.keytab

After you configure and restart the agent, the keytab file is generated at the following location:

/var/run/cloudera-scm-agent/process/<latest_id>-flume-AGENT/flume.keytab

The file must not be empty on any host that runs a kerberized Flume agent.
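A quick way to confirm this on a host is to locate the newest Flume process directory and check the file size. The sketch below assumes that <latest_id> is a numeric prefix and that the highest number is the most recent process directory; both function names are hypothetical.

```python
import os
import re


def latest_flume_keytab(process_root="/var/run/cloudera-scm-agent/process"):
    """Return the flume.keytab path inside the newest <id>-flume-AGENT
    directory, or None if no such directory exists."""
    pattern = re.compile(r"^(\d+)-flume-AGENT$")
    candidates = []
    for name in os.listdir(process_root):
        match = pattern.match(name)
        if match:
            # Assumption: the numeric prefix grows with each restart.
            candidates.append((int(match.group(1)), name))
    if not candidates:
        return None
    _, newest = max(candidates)
    return os.path.join(process_root, newest, "flume.keytab")


def keytab_is_usable(path):
    """A keytab counts as usable here if it exists and is not empty."""
    return path is not None and os.path.isfile(path) and os.path.getsize(path) > 0
```

Reading the process directory typically requires root privileges, so keytab_is_usable(latest_flume_keytab()) would normally be run as root on each Flume host.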

Principal management is handled by Cloudera Manager for Flume, just as with other services. For example, principals are listed on the Administration > Security > Kerberos Credentials page in Cloudera Manager.

Create jaas.conf

Create a flafka_jaas.conf file on each host that runs a Flume agent. The configuration information is used to communicate with Kafka and also to provide normal Flume Kerberos support. The flafka_jaas.conf file contains two entries for the Flume principal: Client and KafkaClient. Note that the principal property is host specific. The Unix user flume must have read permission for this file.

/opt/cloudera/security/flafka_jaas.conf:
    Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="flume.keytab"
    principal="flume/cornhost-1.gce.acmecorn.com@GCE.ACMECORN.COM";
    };
    
    KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="flume.keytab"
    principal="flume/cornhost-1.gce.acmecorn.com@GCE.ACMECORN.COM";
    };
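Because the principal is host specific, the file differs on every host, so it can be convenient to render it from a template. The following is a rough sketch under that assumption; render_flafka_jaas is a hypothetical helper, and the realm must be supplied by the caller.

```python
def render_flafka_jaas(hostname, realm, keytab="flume.keytab"):
    """Render the Client and KafkaClient entries for one host.
    The principal follows the flume/<host>@<REALM> form shown above."""
    principal = f"flume/{hostname}@{realm}"
    entry = (
        "{name} {{\n"
        "com.sun.security.auth.module.Krb5LoginModule required\n"
        "useKeyTab=true\n"
        "storeKey=true\n"
        'keyTab="{keytab}"\n'
        'principal="{principal}";\n'
        "}};\n"
    )
    # Both entries use the same Flume principal, as in the listing above.
    return "\n".join(
        entry.format(name=name, keytab=keytab, principal=principal)
        for name in ("Client", "KafkaClient")
    )
```

On a given host, the hostname argument could come from socket.getfqdn(), provided that it returns the same fully qualified name used in the Kerberos principal.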

Set Up jaas for Flume in Cloudera Manager

In the Flume service configuration, add the following to Java Configuration Options for FlumeAgent:

-Djava.security.auth.login.config=/opt/cloudera/security/flafka_jaas.conf

Do not use the Flume Service Environment Advanced Configuration Snippet (Safety Valve) to set this property through FLUME_AGENT_JAVA_OPTS, because doing so overrides the existing Java command-line options.

Update Flume Service Configuration: Kerberos Authentication and TLS Encryption

Add the relevant properties to the Flume source, channel, or sink, depending on the broker.protocol type (in this case, SASL_SSL).

The default secure port of Kafka brokers is 9093 rather than 9092, so update kafka.bootstrap.servers as well.

Kafka Source

Update the Flume Kafka source entries to include the following security configuration. Replace the truststore location and password with the correct values for the cluster.

tier1.sources.source1.kafka.consumer.security.protocol = SASL_SSL
tier1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
tier1.sources.source1.kafka.consumer.ssl.truststore.location=/opt/cloudera/security/jks/truststore.jks
tier1.sources.source1.kafka.consumer.ssl.truststore.password=cloudera
tier1.sources.source1.generateKeytabFor = $KERBEROS_PRINCIPAL
    

Kafka Channel

Update the Flume Kafka channel entries to include the following security configuration. Replace the truststore location and password with the correct values for the cluster. Both producer and consumer configurations are required.

tier1.channels.channel1.kafka.consumer.security.protocol = SASL_SSL
tier1.channels.channel1.kafka.consumer.sasl.kerberos.service.name = kafka
tier1.channels.channel1.kafka.consumer.ssl.truststore.location=/opt/cloudera/security/jks/truststore.jks
tier1.channels.channel1.kafka.consumer.ssl.truststore.password=cloudera

tier1.channels.channel1.kafka.producer.security.protocol = SASL_SSL
tier1.channels.channel1.kafka.producer.sasl.kerberos.service.name = kafka
tier1.channels.channel1.kafka.producer.ssl.truststore.location=/opt/cloudera/security/jks/truststore.jks
tier1.channels.channel1.kafka.producer.ssl.truststore.password=cloudera
tier1.channels.channel1.generateKeytabFor = $KERBEROS_PRINCIPAL

Kafka Sink

Update the Flume Kafka sink entries to include the following security configuration. Replace the truststore location and password with the correct values for the cluster.

tier1.sinks.sink1.kafka.producer.security.protocol = SASL_SSL
tier1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka
tier1.sinks.sink1.kafka.producer.ssl.truststore.location=/opt/cloudera/security/jks/truststore.jks
tier1.sinks.sink1.kafka.producer.ssl.truststore.password=cloudera
tier1.sinks.sink1.generateKeytabFor = $KERBEROS_PRINCIPAL

Update Flume Service Configuration: Kerberos Authentication with No Encryption

Add the relevant properties to the Flume source, channel, or sink, depending on the broker.protocol type (in this case, SASL_PLAINTEXT).

The default secure port of Kafka brokers is 9093 rather than 9092: update kafka.bootstrap.servers as well.

Kafka Source

Update the Flume Kafka source entries to include the following security configuration. No truststore settings are needed because the connection is not encrypted.

tier1.sources.source1.kafka.consumer.security.protocol = SASL_PLAINTEXT
tier1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
tier1.sources.source1.generateKeytabFor = $KERBEROS_PRINCIPAL

Kafka Channel

Update the Flume Kafka channel entries to include the following security configuration. No truststore settings are needed because the connection is not encrypted. Both producer and consumer configurations are required.

tier1.channels.channel1.kafka.consumer.security.protocol = SASL_PLAINTEXT
tier1.channels.channel1.kafka.consumer.sasl.kerberos.service.name = kafka

tier1.channels.channel1.kafka.producer.security.protocol = SASL_PLAINTEXT
tier1.channels.channel1.kafka.producer.sasl.kerberos.service.name = kafka
tier1.channels.channel1.generateKeytabFor = $KERBEROS_PRINCIPAL

Kafka Sink

Update the Flume Kafka sink entries to include the following security configuration. No truststore settings are needed because the connection is not encrypted.

tier1.sinks.sink1.kafka.producer.security.protocol = SASL_PLAINTEXT
tier1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka
tier1.sinks.sink1.generateKeytabFor = $KERBEROS_PRINCIPAL
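The property sets for sources, channels, and sinks follow one pattern: sources use a consumer, sinks use a producer, channels use both, and the truststore settings appear only with SASL_SSL. The sketch below generates the lines from that pattern; kafka_security_properties is a hypothetical helper, not a Flume or Cloudera Manager API, and the service name kafka is taken from the listings in this topic.

```python
def kafka_security_properties(agent, kind, name, protocol,
                              truststore=None, password=None):
    """Build the security lines for one Flume component.
    kind is "sources", "channels", or "sinks"."""
    clients = {
        "sources": ["consumer"],
        "channels": ["consumer", "producer"],
        "sinks": ["producer"],
    }[kind]
    if protocol == "SASL_SSL" and (truststore is None or password is None):
        raise ValueError("SASL_SSL requires a truststore location and password")
    prefix = f"{agent}.{kind}.{name}"
    lines = []
    for client in clients:
        base = f"{prefix}.kafka.{client}"
        lines.append(f"{base}.security.protocol = {protocol}")
        lines.append(f"{base}.sasl.kerberos.service.name = kafka")
        if protocol == "SASL_SSL":
            # Truststore settings apply only when TLS is in use.
            lines.append(f"{base}.ssl.truststore.location = {truststore}")
            lines.append(f"{base}.ssl.truststore.password = {password}")
    lines.append(f"{prefix}.generateKeytabFor = $KERBEROS_PRINCIPAL")
    return lines
```

For example, print("\n".join(kafka_security_properties("tier1", "sinks", "sink1", "SASL_PLAINTEXT"))) emits the three sink lines used for Kerberos authentication without encryption.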

Example

The following sample is a complete working Flume configuration with two tiers. Tier1 tails an input log and publishes new events to the sectest topic through a Kafka sink; the tailed file must exist before the agent starts. Tier2 consumes the sectest topic through a Kafka source and logs every event.

########################################################
# Tier1
########################################################
tier1.sources  = source1
tier1.channels = channel1
tier1.sinks    = sink1

tier1.channels.channel1.type   = memory
tier1.channels.channel1.capacity = 1000
tier1.channels.channel1.transactionCapacity = 100

tier1.sinks.sink1.channel      = channel1
tier1.sources.source1.channels = channel1

tier1.sources.source1.type  = exec
tier1.sources.source1.command  = tail -F /tmp/input/input.log
tier1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.sink1.kafka.topic = sectest

tier1.sinks.sink1.kafka.bootstrap.servers = acmecp-ssl-1.gce.cloudera.com:9093,acmecp-ssl-2.gce.cloudera.com:9093

###
# Security related setup
###
tier1.sinks.sink1.kafka.producer.security.protocol = SASL_SSL
tier1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka
tier1.sinks.sink1.kafka.producer.ssl.truststore.location=/etc/cdep-ssl-conf/CA_STANDARD/truststore.jks
tier1.sinks.sink1.kafka.producer.ssl.truststore.password=cloudera
tier1.sinks.sink1.generateKeytabFor = $KERBEROS_PRINCIPAL

########################################################
# Tier2
########################################################
tier2.sources  = source1
tier2.channels = channel1
tier2.sinks    = sink1

tier2.channels.channel1.type   = memory
tier2.channels.channel1.capacity = 1000
tier2.channels.channel1.transactionCapacity = 100

tier2.sinks.sink1.channel      = channel1
tier2.sources.source1.channels = channel1

tier2.sources.source1.type  = org.apache.flume.source.kafka.KafkaSource
tier2.sources.source1.kafka.bootstrap.servers = acmecp-ssl-1.gce.cloudera.com:9093,acmecp-ssl-2.gce.cloudera.com:9093
tier2.sources.source1.kafka.topics = sectest
tier2.sources.source1.kafka.offsets.storage = kafka
tier2.sources.source1.kafka.consumer.group.id = flume
tier2.sources.source1.kafka.consumer.auto.offset.reset = earliest
tier2.sinks.sink1.type = logger

###
# Security related setup
###
tier2.sources.source1.kafka.consumer.security.protocol = SASL_SSL
tier2.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
tier2.sources.source1.kafka.consumer.ssl.truststore.location=/etc/cdep-ssl-conf/CA_STANDARD/truststore.jks
tier2.sources.source1.kafka.consumer.ssl.truststore.password=cloudera
tier2.sources.source1.generateKeytabFor = $KERBEROS_PRINCIPAL
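A configuration like this one can be sanity-checked before deployment. The following is a minimal sketch that parses the properties and reports SASL_SSL clients that lack a companion setting or whose bootstrap servers are not on the secure port. Both function names are hypothetical, the parser handles only simple key = value lines, and the port check assumes the default secure port of 9093, which a cluster may override.

```python
def parse_properties(text):
    """Parse simple key = value lines, ignoring blanks and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def check_sasl_ssl(props, secure_port=9093):
    """Return a list of problems found for clients that use SASL_SSL."""
    problems = []
    required = ("sasl.kerberos.service.name",
                "ssl.truststore.location",
                "ssl.truststore.password")
    for key, value in props.items():
        if not key.endswith(".security.protocol") or value != "SASL_SSL":
            continue
        client = key[: -len(".security.protocol")]   # e.g. tier1.sinks.sink1.kafka.producer
        component = client.rsplit(".kafka.", 1)[0]   # e.g. tier1.sinks.sink1
        for suffix in required:
            if f"{client}.{suffix}" not in props:
                problems.append(f"missing {client}.{suffix}")
        servers = props.get(f"{component}.kafka.bootstrap.servers", "")
        for server in (s.strip() for s in servers.split(",")):
            # Assumption: secure listeners use the default secure port.
            if server and not server.endswith(f":{secure_port}"):
                problems.append(f"{server} is not on port {secure_port}")
    return problems
```

An empty list from check_sasl_ssl(parse_properties(config_text)) means only that these basic checks passed; it does not prove that the brokers accept the connection.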