Near Real Time (NRT) Indexing Tweets Using Flume

The following section describes how to use Flume for near real time (NRT) indexing using tweets from the Twitter public stream as an example. Near real time indexing is generally used when new data needs to be returned in query results in time frames measured in seconds. Before continuing, make sure that you have completed the procedures in Preparing to Index Sample Tweets with Cloudera Search.

Install Flume

If you have not already done so, install Flume. For Cloudera Manager installations, Flume is included in CDH, and no additional action is required for installation. Add the Flume service to the cluster following the instructions in Adding a Service.

For instructions on installing Flume in an unmanaged environment, see Setting Up Apache Flume Using the Command Line.

Sentry Configuration for NRT Indexing Using Flume

If your cluster has security enabled and is using Apache Sentry for authorization, make sure that the Flume system user (flume by default) has permission to update the collection (cloudera_tutorial_tweets in this example):

  1. Switch to the Sentry admin user (solr in this example) using kinit:
    kinit solr@EXAMPLE.COM
  2. Create a Sentry role:
    solrctl sentry --create-role cloudera_tutorial_flume
  3. Map the flume group to this role:
    solrctl sentry --add-role-group cloudera_tutorial_flume flume
  4. Grant Update privileges to the cloudera_tutorial_flume role for the cloudera_tutorial_tweets collection:
    solrctl sentry --grant-privilege cloudera_tutorial_flume 'collection=cloudera_tutorial_tweets->action=update'
    For more information on the Sentry privilege model for Cloudera Search, see Authorization Privilege Model for Solr.
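If you repeat this setup for other collections or groups, the three solrctl commands above can be wrapped in one parameterized function. The following is a minimal bash sketch. The function name grant_flume_update and the SOLRCTL override variable are hypothetical conveniences, not part of solrctl. The sketch uses only the solrctl sentry options shown in the steps above and stops at the first command that fails.

```shell
#!/usr/bin/env bash
# Hypothetical override: set SOLRCTL=echo to print the commands instead of running them.
SOLRCTL="${SOLRCTL:-solrctl}"

# grant_flume_update ROLE GROUP COLLECTION
# Creates ROLE, maps the Linux group GROUP to it, and grants update on COLLECTION.
# Run kinit as the Sentry admin user first when Kerberos is enabled.
grant_flume_update() {
  local role="$1" group="$2" collection="$3"
  "$SOLRCTL" sentry --create-role "$role" &&
  "$SOLRCTL" sentry --add-role-group "$role" "$group" &&
  "$SOLRCTL" sentry --grant-privilege "$role" "collection=${collection}->action=update"
}
```

For this tutorial, the call would be grant_flume_update cloudera_tutorial_flume flume cloudera_tutorial_tweets.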

Copy Configuration Template Files

Copy the configuration files as follows:

  • Cloudera Manager: For Cloudera Manager environments, the Flume agent is configured in a later section. Skip to Configuring the Flume Solr Sink.
  • Unmanaged CDH:
    sudo cp -r $HOME/cloudera_tutorial_tweets_config /etc/flume-ng/conf/cloudera_tutorial_tweets
    sudo cp /usr/share/doc/search*/examples/solr-nrt/twitter-flume.conf /etc/flume-ng/conf/flume.conf
    sudo cp /usr/share/doc/search*/examples/solr-nrt/test-morphlines/tutorialReadAvroContainer.conf /etc/flume-ng/conf/morphline.conf
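Before moving on, you can confirm that the three copies above produced the expected paths. The following minimal bash sketch uses a hypothetical helper, check_files, that reports each missing path on standard error and returns a non-zero status if any path is absent.

```shell
#!/usr/bin/env bash
# check_files PATH...
# Prints "missing: PATH" for every PATH that does not exist and
# returns 1 if at least one is missing, 0 otherwise.
check_files() {
  local missing=0 f
  for f in "$@"; do
    if [ ! -e "$f" ]; then
      echo "missing: $f" >&2
      missing=1
    fi
  done
  return "$missing"
}
```

For the unmanaged layout above: check_files /etc/flume-ng/conf/cloudera_tutorial_tweets /etc/flume-ng/conf/flume.conf /etc/flume-ng/conf/morphline.conf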

Configuring the Flume Solr Sink

This topic describes how to configure the Flume Solr Sink for both Cloudera Manager and unmanaged CDH installations:
  • For Cloudera Manager deployments, use Cloudera Manager to edit the configuration files, following a process similar to the one described in Configuring the Flume Agents.
  • For unmanaged CDH installations, use command-line tools (such as vi) to edit files.
  1. Modify the Flume configuration for a single agent to specify the Flume source details and configure the flow. You must set the relative or absolute path to the morphline configuration file.
    • Cloudera Manager: In the Cloudera Manager Admin Console, go to Flume service > Instances > Agent (select one) > Configuration. When prompted to make configuration changes on the service configuration page, click Cancel. Set Agent Name to twitter_stream and modify Configuration File exactly as follows. You will replace the YOUR_TWITTER_* values in a later step:
      twitter_stream.sources = twitterSrc
      twitter_stream.channels = memoryChannel
      twitter_stream.sinks = solrSink
      
      twitter_stream.sources.twitterSrc.type = org.apache.flume.source.twitter.TwitterSource
      twitter_stream.sources.twitterSrc.consumerKey = YOUR_TWITTER_CONSUMER_KEY
      twitter_stream.sources.twitterSrc.consumerSecret = YOUR_TWITTER_CONSUMER_SECRET
      twitter_stream.sources.twitterSrc.accessToken = YOUR_TWITTER_ACCESS_TOKEN
      twitter_stream.sources.twitterSrc.accessTokenSecret = YOUR_TWITTER_ACCESS_TOKEN_SECRET
      twitter_stream.sources.twitterSrc.maxBatchDurationMillis = 200
      twitter_stream.sources.twitterSrc.channels = memoryChannel
      
      twitter_stream.channels.memoryChannel.type = memory
      twitter_stream.channels.memoryChannel.capacity = 10000
      twitter_stream.channels.memoryChannel.transactionCapacity = 1000
      
      twitter_stream.sinks.solrSink.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
      twitter_stream.sinks.solrSink.channel = memoryChannel
      twitter_stream.sinks.solrSink.morphlineFile = morphlines.conf

      Click Save Changes.

    • Unmanaged CDH: If you copied the configuration templates as described in Copy Configuration Template Files, no further action is required in this step.
  2. Edit the Morphline configuration to specify Solr environment details.
    • Cloudera Manager: In the Cloudera Manager Admin Console, go to Flume service > Instances > Agent (select one) > Configuration. When prompted to make configuration changes on the service configuration page, click Cancel. Make sure that you selected the same agent that you selected in step 1. In the Morphlines File, edit only the SOLR_LOCATOR directive as follows, and leave the rest of the configuration unchanged:
      SOLR_LOCATOR : {
        # Name of solr collection
        collection : cloudera_tutorial_tweets
      
        # ZooKeeper ensemble
        zkHost : "zk01.example.com:2181,zk02.example.com:2181,zk03.example.com:2181/solr"
      }

      Replace the example ZooKeeper hostnames with the hostnames of your ZooKeeper servers.

      Click Save Changes.

    • Unmanaged CDH: Edit the SOLR_LOCATOR section in /etc/flume-ng/conf/morphline.conf as follows:
      SOLR_LOCATOR : {
        # Name of solr collection
        collection : cloudera_tutorial_tweets
      
        # ZooKeeper ensemble
        zkHost : "zk01.example.com:2181,zk02.example.com:2181,zk03.example.com:2181/solr"
      }

      Replace the example ZooKeeper hostnames with the hostnames of your ZooKeeper servers.

  3. (Unmanaged CDH only) Copy flume-env.sh.template to flume-env.sh:
    sudo cp /etc/flume-ng/conf/flume-env.sh.template /etc/flume-ng/conf/flume-env.sh
  4. Update the Java heap size.
    • Cloudera Manager: In the Cloudera Manager Admin Console, go to Flume service > Instances > Agent (select one) > Configuration. When prompted to make configuration changes on the service configuration page, click Cancel. Make sure that you selected the same agent that you selected in step 1. Select Category > Resource Management, and then set Java Heap Size of Agent in Bytes to 500 and select MiB in the unit drop-down menu. Click Save Changes.
    • Unmanaged CDH: Edit /etc/flume-ng/conf/flume-env.sh, inserting or replacing JAVA_OPTS as follows:
      JAVA_OPTS="-Xmx500m"
  5. (Optional) Modify Flume logging settings to facilitate monitoring and debugging:
    • Cloudera Manager: In the Cloudera Manager Admin Console, go to Flume service > Instances > Agent (select one) > Configuration. When prompted to make configuration changes on the service configuration page, click Cancel. Make sure that you selected the same agent that you selected in step 1. Select Category > Advanced, and then modify Agent Logging Advanced Configuration Snippet (Safety Valve) to include:
      log4j.logger.org.apache.flume.sink.solr=DEBUG
      log4j.logger.org.kitesdk.morphline=TRACE
    • Unmanaged CDH: Run the following commands:
      sudo bash -c 'echo "log4j.logger.org.apache.flume.sink.solr=DEBUG" >> /etc/flume-ng/conf/log4j.properties'
      sudo bash -c 'echo "log4j.logger.org.kitesdk.morphline=TRACE" >> /etc/flume-ng/conf/log4j.properties'
  6. (Optional) In an unmanaged environment, you can use SEARCH_HOME to configure where Flume finds Cloudera Search dependencies for the Flume Solr Sink. To set SEARCH_HOME, use a command similar to the following:
    export SEARCH_HOME=/usr/lib/search

    Alternatively, you can add the same setting to /etc/flume-ng/conf/flume-env.sh.

    In a Cloudera Manager managed environment, Cloudera Manager automatically makes the required dependencies available to the Flume Solr Sink, so you do not need to set SEARCH_HOME.
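Two details in the steps above are easy to get wrong in an unmanaged installation. First, every Flume property name begins with the agent name (agent in the copied template, twitter_stream in the Cloudera Manager example), and that prefix must match the name the agent is started with. Second, the echo commands in step 5 append a duplicate line every time they are run. The following minimal bash sketch addresses both, assuming GNU grep and sed. The helper names flume_agent_names and append_once are hypothetical.

```shell
#!/usr/bin/env bash
# flume_agent_names FILE
# Prints each agent name that declares sources in FILE (for example, the
# "agent" in "agent.sources = httpSrc"). Commented-out lines are skipped
# because they start with "#".
flume_agent_names() {
  sed -n 's/^[[:space:]]*\([A-Za-z0-9_-]*\)\.sources[[:space:]]*=.*/\1/p' "$1" | sort -u
}

# append_once LINE FILE
# Appends LINE to FILE only if FILE does not already contain it as a whole
# line, so rerunning the logging step does not add duplicates.
append_once() {
  grep -qxF -- "$1" "$2" 2>/dev/null || printf '%s\n' "$1" >> "$2"
}
```

For the copied template, flume_agent_names /etc/flume-ng/conf/flume.conf should print agent. Because /etc/flume-ng/conf is written with sudo in this setup, define and call append_once in a root shell (for example, one opened with sudo -i), such as append_once 'log4j.logger.org.kitesdk.morphline=TRACE' /etc/flume-ng/conf/log4j.properties.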

Configuring Flume Solr Sink to Access the Twitter Public Stream

Use the Twitter developer site to generate credentials to access the Twitter public stream:

  1. Sign in to https://apps.twitter.com with a Twitter account.
  2. On the Application Management page, click Create New App.
  3. Fill in the form to represent the Search installation. A single application can represent multiple clusters, and the callback URL is not required. Because this is not a publicly distributed application, the values you enter for the required name, description, and website fields are not important.
  4. Read and accept the Developer Agreement, then click Create your Twitter application at the bottom of the page.
  5. Click the Keys and Access Tokens tab, and then click the Create my access token button at the bottom of the page.

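The Flume TwitterSource uses the Twitter 1.1 API, which requires authentication of both the consumer (application) and the user (you). Treat the generated keys and tokens as confidential, just like your regular Twitter credentials. Edit the Flume configuration and replace the following properties with the credentials you generated. The properties are shown with the agent prefix used in the unmanaged configuration template; in the Cloudera Manager configuration from Configuring the Flume Solr Sink, the same properties begin with twitter_stream instead: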

agent.sources.twitterSrc.consumerKey = YOUR_TWITTER_CONSUMER_KEY
agent.sources.twitterSrc.consumerSecret = YOUR_TWITTER_CONSUMER_SECRET
agent.sources.twitterSrc.accessToken = YOUR_TWITTER_ACCESS_TOKEN
agent.sources.twitterSrc.accessTokenSecret = YOUR_TWITTER_ACCESS_TOKEN_SECRET

To edit the Flume configuration:

  • Cloudera Manager: In the Cloudera Manager Admin Console, go to Flume service > Instances > Agent (select one) > Configuration. When prompted to make configuration changes on the service configuration page, click Cancel. Make sure that you selected the same agent that you configured earlier. Modify the Configuration File parameter.
  • Unmanaged CDH: Edit /etc/flume-ng/conf/flume.conf.
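In an unmanaged installation, you can substitute the placeholders from the command line instead of editing the file by hand. The following is a minimal bash sketch, assuming GNU sed and four exported environment variables whose names (TWITTER_CONSUMER_KEY, TWITTER_CONSUMER_SECRET, TWITTER_ACCESS_TOKEN, TWITTER_ACCESS_TOKEN_SECRET) are hypothetical, as is the helper fill_twitter_credentials. Each placeholder is matched only at the end of a line so that YOUR_TWITTER_ACCESS_TOKEN does not also match the beginning of YOUR_TWITTER_ACCESS_TOKEN_SECRET.

```shell
#!/usr/bin/env bash
# fill_twitter_credentials FILE
# Replaces each YOUR_TWITTER_<NAME> placeholder in FILE (in place) with the
# value of the exported environment variable TWITTER_<NAME>.
fill_twitter_credentials() {
  local file="$1" name value
  for name in CONSUMER_KEY CONSUMER_SECRET ACCESS_TOKEN ACCESS_TOKEN_SECRET; do
    value="$(printenv "TWITTER_${name}")" || {
      echo "TWITTER_${name} is not set" >&2
      return 1
    }
    # Escape characters that are special in a sed replacement: \ & and the | delimiter.
    value="$(printf '%s' "$value" | sed -e 's/[\\&|]/\\&/g')"
    # Anchor at end of line so ACCESS_TOKEN does not match ACCESS_TOKEN_SECRET.
    sed -i "s|YOUR_TWITTER_${name}[[:space:]]*\$|${value}|" "$file"
  done
}
```

To keep the secrets out of your shell history, you can read each value with read -rs (for example, read -rs TWITTER_CONSUMER_SECRET && export TWITTER_CONSUMER_SECRET), and then run fill_twitter_credentials /etc/flume-ng/conf/flume.conf from a root shell.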

For authentication to succeed, you must make sure the system clock is set correctly on the Flume agent host that connects to Twitter. You can install NTP and keep the host synchronized by running the ntpd service, or manually synchronize by using the command sudo ntpdate pool.ntp.org. To confirm that the time is set correctly, make sure that the output of the command date --utc matches the time shown at http://www.timeanddate.com/worldclock/timezone/utc. You can also set the time manually using the date command.
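To compare the agent host's clock with a remote server without opening a browser, you can read the Date header of an HTTP response. The following is a minimal bash sketch, assuming GNU date (for -d) and curl; clock_skew_seconds and remote_http_date are hypothetical helper names. The result is only as accurate as the remote server's clock plus network latency, so treat it as a rough check, not a replacement for NTP.

```shell
#!/usr/bin/env bash
# clock_skew_seconds HTTP_DATE
# Prints (local UTC time - HTTP_DATE) in seconds. A positive value means the
# local clock is ahead. Fails on an empty argument, because GNU date treats
# -d "" as midnight today and would report a misleading skew.
clock_skew_seconds() {
  local remote
  [ -n "$1" ] || return 1
  remote="$(date -u -d "$1" +%s)" || return 1
  echo $(( $(date -u +%s) - remote ))
}

# remote_http_date URL
# Prints the Date header from a HEAD request to URL (case-insensitive name).
remote_http_date() {
  curl -sI "$1" | tr -d '\r' | sed -n 's/^[Dd]ate: //p' | head -n 1
}
```

For example, clock_skew_seconds "$(remote_http_date http://www.timeanddate.com/worldclock/timezone/utc)" prints the approximate offset; a value close to 0 indicates that the host clock is in sync.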

Starting the Flume Agent

  1. If you are using Kerberos, kinit as the user that has privileges to update the collection:
    kinit jdoe@EXAMPLE.COM

    Replace EXAMPLE.COM with your Kerberos realm name.

  2. Delete all existing documents in the cloudera_tutorial_tweets collection. If your cluster does not have security enabled, run the following command as the solr user by adding sudo -u solr before the command:
    solrctl collection --deletedocs cloudera_tutorial_tweets
  3. Start or restart the Flume agent configured in Configuring the Flume Solr Sink:
    • Cloudera Manager: Flume service > Instances > Agent (select one) > Actions > Restart this Agent. Make sure that you selected the same agent that you configured earlier.
    • Unmanaged CDH:
      sudo /etc/init.d/flume-ng-agent restart
  4. Monitor progress in the Flume log file and watch for errors:
    tail -f /var/log/flume-ng/flume*.log
After restarting the Flume agent, use the Cloudera Search web UI to verify that new tweets have been ingested into Solr. For example, if you have a Solr server running on search01.example.com, go to one of the following URLs in a browser:
  • Security Enabled: https://search01.example.com:8985/solr/cloudera_tutorial_tweets/select?q=*%3A*&sort=created_at+desc&wt=json&indent=true
  • Security Disabled: http://search01.example.com:8983/solr/cloudera_tutorial_tweets/select?q=*%3A*&sort=created_at+desc&wt=json&indent=true
The query sorts the result set such that the most recently ingested tweets are at the top, based on the created_at timestamp. If you rerun the query, new tweets show up at the top of the result set.
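To check ingestion from the command line instead of a browser, you can query the same select handler with curl and extract numFound from the JSON response. The following is a minimal bash sketch; solr_select_url, num_found, and tweet_count are hypothetical helpers, and the ports are the ones used in the URLs above. It assumes that a security-enabled cluster uses Kerberos (SPNEGO) for HTTP authentication, which curl handles with --negotiate -u : after kinit. If the host does not trust your TLS certificate, curl also needs --cacert (not shown). The grep-based extraction is a shortcut, not a JSON parser.

```shell
#!/usr/bin/env bash
# solr_select_url HOST SECURE COLLECTION PARAMS
# SECURE is "true" for the TLS port (8985) and anything else for 8983.
solr_select_url() {
  local host="$1" secure="$2" collection="$3" params="$4"
  if [ "$secure" = true ]; then
    echo "https://${host}:8985/solr/${collection}/select?${params}"
  else
    echo "http://${host}:8983/solr/${collection}/select?${params}"
  fi
}

# num_found: reads a Solr JSON response on stdin and prints response.numFound.
num_found() {
  grep -o '"numFound": *[0-9]*' | head -n 1 | grep -o '[0-9]*$'
}

# tweet_count HOST SECURE: prints the number of documents in cloudera_tutorial_tweets.
tweet_count() {
  local url
  url="$(solr_select_url "$1" "$2" cloudera_tutorial_tweets 'q=*%3A*&rows=0&wt=json')"
  if [ "$2" = true ]; then
    curl -s --negotiate -u : "$url" | num_found
  else
    curl -s "$url" | num_found
  fi
}
```

Running tweet_count search01.example.com false twice, a few seconds apart, should show the count increasing while the Twitter source is running.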

To print diagnostic information, such as the content of records as they pass through the morphline commands, enable TRACE log level diagnostics by adding the following to your log4j.properties file:

log4j.logger.org.kitesdk.morphline=TRACE

In Cloudera Manager, you can use the safety valve to add the setting.

Go to Flume service > Configuration > View and Edit > Agent > Advanced > Agent Logging Advanced Configuration Snippet (Safety Valve). After setting this value, restart the service.

Indexing a File Containing Tweets with Flume HTTPSource

HTTPSource lets you ingest data into Solr by POSTing a file over HTTP. HTTPSource sends data using a channel to a sink, in this case a SolrSink. For more information, see Flume Solr BlobHandler Configuration Options.

  1. Stop the Flume agent that you configured in Configuring the Flume Solr Sink:
    • Cloudera Manager: Flume service > Instances > Agent (select one) > Actions > Stop this Agent. Make sure that you selected the same agent that you configured earlier.
    • Unmanaged CDH:
      sudo /etc/init.d/flume-ng-agent stop
  2. If you are using Kerberos, kinit as the user that has privileges to update the collection:
    kinit jdoe@EXAMPLE.COM

    Replace EXAMPLE.COM with your Kerberos realm name.

  3. Delete all existing documents in the cloudera_tutorial_tweets collection. If your cluster does not have security enabled, run the following command as the solr user by adding sudo -u solr before the command:
    solrctl collection --deletedocs cloudera_tutorial_tweets
  4. Modify the Flume configuration:
    • Cloudera Manager: In the Cloudera Manager Admin Console, go to Flume service > Instances > Agent (select one) > Configuration. When prompted to make configuration changes on the service configuration page, click Cancel. Replace the Configuration File configuration with the following:
      twitter_stream.sources = httpSrc
      twitter_stream.channels = memoryChannel
      twitter_stream.sinks = solrSink
      
      twitter_stream.sources.httpSrc.type = org.apache.flume.source.http.HTTPSource
      twitter_stream.sources.httpSrc.port = 5140
      twitter_stream.sources.httpSrc.handler = org.apache.flume.sink.solr.morphline.BlobHandler
      twitter_stream.sources.httpSrc.handler.maxBlobLength = 2000000000
      twitter_stream.sources.httpSrc.channels = memoryChannel
      
      
      twitter_stream.channels.memoryChannel.type = memory
      twitter_stream.channels.memoryChannel.capacity = 10000
      twitter_stream.channels.memoryChannel.transactionCapacity = 1000
      
      twitter_stream.sinks.solrSink.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
      twitter_stream.sinks.solrSink.channel = memoryChannel
      twitter_stream.sinks.solrSink.morphlineFile = morphlines.conf

      Click Save Changes.

    • Unmanaged CDH: If you copied the configuration templates as described in Copy Configuration Template Files, edit /etc/flume-ng/conf/flume.conf and comment out all sources except the HTTP source as follows:
      #agent.sources = twitterSrc
      agent.sources = httpSrc
      #agent.sources = spoolSrc
      #agent.sources = avroSrc
  5. Start the Flume agent:
    • Cloudera Manager: Flume service > Instances > Agent (select one) > Actions > Start this Agent. Make sure that you selected the same agent that you configured earlier.
    • Unmanaged CDH:
      sudo /etc/init.d/flume-ng-agent start
  6. Send a file containing tweets to the HTTP source. Run the following commands on a cluster host, replacing flume01.example.com with the hostname of the Flume agent you configured as the HTTP source:
    • Cloudera Manager:
      cd /opt/cloudera/parcels/CDH/share/doc/search-*/examples/test-documents
      curl --data-binary @sample-statuses-20120906-141433-medium.avro 'http://flume01.example.com:5140?resourceName=sample-statuses-20120906-141433-medium.avro' --header 'Content-Type:application/octet-stream' --verbose
    • Unmanaged CDH:
      cd /usr/share/doc/search-*/examples/test-documents
      curl --data-binary @sample-statuses-20120906-141433-medium.avro 'http://flume01.example.com:5140?resourceName=sample-statuses-20120906-141433-medium.avro' --header 'Content-Type:application/octet-stream' --verbose
  7. Check the log for status or errors:
    tail /var/log/flume-ng/flume*.log 
  8. Run a Solr query. For example, for a Solr server running on search01.example.com, go to one of the following URLs in a browser, depending on whether you have enabled TLS on your cluster:
    • Security Enabled: https://search01.example.com:8985/solr/cloudera_tutorial_tweets/select?q=*%3A*&wt=json&indent=true
    • Security Disabled: http://search01.example.com:8983/solr/cloudera_tutorial_tweets/select?q=*%3A*&wt=json&indent=true
    If indexing was successful, this page displays the first 10 query results.
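To post several files, or to check the HTTP status without reading the verbose output, you can wrap the curl command from step 6. The following is a minimal bash sketch; blob_post_url and post_blob are hypothetical helper names, and the sketch assumes file names that need no URL encoding. The printed status code is whatever the HTTPSource returns; 200 indicates that the agent accepted the request.

```shell
#!/usr/bin/env bash
# blob_post_url HOST PORT FILE
# Builds the HTTPSource URL, passing FILE's base name as resourceName.
blob_post_url() {
  echo "http://$1:$2?resourceName=$(basename "$3")"
}

# post_blob HOST PORT FILE
# POSTs FILE as raw bytes (as in step 6) and prints only the HTTP status code.
post_blob() {
  curl -s -o /dev/null -w '%{http_code}\n' \
    --data-binary "@$3" \
    --header 'Content-Type:application/octet-stream' \
    "$(blob_post_url "$1" "$2" "$3")"
}
```

For example, from the test-documents directory: for f in *.avro; do echo "$f: $(post_blob flume01.example.com 5140 "$f")"; done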

Indexing a File Containing Tweets with Flume SpoolDirectorySource

SpoolDirectorySource specifies a directory on a local disk that Flume monitors. Flume automatically transfers data from files in this directory to Solr. SpoolDirectorySource sends data over a channel to a sink, in this case a SolrSink.

  1. Stop the Flume agent configured in Configuring the Flume Solr Sink:
    • Cloudera Manager: Flume service > Instances > Agent (select one) > Actions > Stop this Agent. Make sure that you selected the same agent that you configured earlier.
    • Unmanaged CDH:
      sudo /etc/init.d/flume-ng-agent stop
  2. If you are using Kerberos, kinit as the user that has privileges to update the collection:
    kinit jdoe@EXAMPLE.COM

    Replace EXAMPLE.COM with your Kerberos realm name.

  3. Delete all existing documents in the cloudera_tutorial_tweets collection. If your cluster does not have security enabled, run the following command as the solr user by adding sudo -u solr before the command:
    solrctl collection --deletedocs cloudera_tutorial_tweets
  4. Modify the Flume configuration:
    • Cloudera Manager: In the Cloudera Manager Admin Console, go to Flume service > Instances > Agent (select one) > Configuration. When prompted to make configuration changes on the service configuration page, click Cancel. Replace the Configuration File configuration with the following:
      twitter_stream.sources = spoolSrc
      twitter_stream.channels = memoryChannel
      twitter_stream.sinks = solrSink
      
      twitter_stream.sources.spoolSrc.type = spooldir
      twitter_stream.sources.spoolSrc.spoolDir = /tmp/myspooldir
      twitter_stream.sources.spoolSrc.ignorePattern = \.
      twitter_stream.sources.spoolSrc.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
      twitter_stream.sources.spoolSrc.deserializer.maxBlobLength = 2000000000
      twitter_stream.sources.spoolSrc.batchSize = 1
      twitter_stream.sources.spoolSrc.fileHeader = true
      twitter_stream.sources.spoolSrc.fileHeaderKey = resourceName
      twitter_stream.sources.spoolSrc.channels = memoryChannel
      
      twitter_stream.channels.memoryChannel.type = memory
      twitter_stream.channels.memoryChannel.capacity = 10000
      twitter_stream.channels.memoryChannel.transactionCapacity = 1000
      
      twitter_stream.sinks.solrSink.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
      twitter_stream.sinks.solrSink.channel = memoryChannel
      twitter_stream.sinks.solrSink.morphlineFile = morphlines.conf

      Click Save Changes.

    • Unmanaged CDH: If you copied the configuration templates as described in Copy Configuration Template Files, edit /etc/flume-ng/conf/flume.conf and comment out all sources except the spool source as follows:
      #agent.sources = twitterSrc
      #agent.sources = httpSrc
      agent.sources = spoolSrc
      #agent.sources = avroSrc
  5. Delete the spool directory if it exists, and then create a new spool directory. Run the following commands on the host running the Flume agent that you configured:
    sudo rm -rf /tmp/myspooldir
    sudo -u flume mkdir /tmp/myspooldir
  6. Start the Flume agent:
    • Cloudera Manager: Flume service > Instances > Agent (select one) > Actions > Start this Agent. Make sure that you selected the same agent that you configured earlier.
    • Unmanaged CDH:
      sudo /etc/init.d/flume-ng-agent start
  7. Copy a file containing tweets to the /tmp/myspooldir directory. To ensure that Flume does not ingest a partially copied file, copy the file under a temporary name that begins with a dot, and then atomically rename it with mv. Run the following commands on the host running the Flume agent that you configured:
    • Cloudera Manager:
      sudo -u flume cp /opt/cloudera/parcels/CDH/share/doc/search*/examples/test-documents/sample-statuses-20120906-141433-medium.avro /tmp/myspooldir/.sample-statuses-20120906-141433-medium.avro
      sudo -u flume mv /tmp/myspooldir/.sample-statuses-20120906-141433-medium.avro /tmp/myspooldir/sample-statuses-20120906-141433-medium.avro
    • Unmanaged CDH:
      sudo -u flume cp /usr/share/doc/search*/examples/test-documents/sample-statuses-20120906-141433-medium.avro /tmp/myspooldir/.sample-statuses-20120906-141433-medium.avro
      sudo -u flume mv /tmp/myspooldir/.sample-statuses-20120906-141433-medium.avro /tmp/myspooldir/sample-statuses-20120906-141433-medium.avro
  8. Check the log for status or errors:
    tail /var/log/flume-ng/flume*.log 
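  9. Check the completion status. After Flume finishes processing a file, it renames the file by appending the .COMPLETED suffix (the spooling directory source default):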
    find /tmp/myspooldir
  10. Run a Solr query. For example, for a Solr server running on search01.example.com, go to one of the following URLs in a browser, depending on whether you have enabled security on your cluster:
    • Security Enabled: https://search01.example.com:8985/solr/cloudera_tutorial_tweets/select?q=*%3A*&wt=json&indent=true
    • Security Disabled: http://search01.example.com:8983/solr/cloudera_tutorial_tweets/select?q=*%3A*&wt=json&indent=true
    If indexing was successful, this page displays the first 10 query results.
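The copy-then-rename pattern in step 7 and the completion check in step 9 can be combined into two small helpers. The following is a minimal bash sketch; spool_drop and spool_pending are hypothetical names. The temporary file is created in the spool directory itself, so mv is a rename within one file system and is atomic. spool_pending assumes the default .COMPLETED suffix and lists files that Flume has not yet finished.

```shell
#!/usr/bin/env bash
# spool_drop FILE SPOOLDIR
# Copies FILE into SPOOLDIR under a dot-prefixed temporary name, then renames
# it to its real name so the agent never sees a partially written file.
spool_drop() {
  local name tmp
  name="$(basename "$1")"
  tmp="$2/.${name}"
  cp "$1" "$tmp" && mv "$tmp" "$2/${name}"
}

# spool_pending SPOOLDIR
# Lists regular files in SPOOLDIR that are neither hidden temporaries nor
# marked with the .COMPLETED suffix, that is, files still waiting to be ingested.
spool_pending() {
  find "$1" -maxdepth 1 -type f ! -name '.*' ! -name '*.COMPLETED'
}
```

To run the copy as the flume user, as in step 7, you can pass the function definition to a child shell: sudo -u flume bash -c "$(declare -f spool_drop); spool_drop /usr/share/doc/search*/examples/test-documents/sample-statuses-20120906-141433-medium.avro /tmp/myspooldir". When spool_pending /tmp/myspooldir prints nothing, every dropped file has been processed.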