Analyze Transit Patterns with Apache NiFi
- NiFi DataFlow Automation Concepts
- Launch NiFi HTML UI
- Build a NiFi Process Group to Simulate NextBus API
- Build a NiFi Process Group to Parse Transit Events
- Build a NiFi Process Group to Validate the GeoEnriched Data
- Build a NiFi Process Group to Store Data As JSON
- Integrate NextBus API To Pull In Transit Live Feed
You will build a NiFi DataFlow and package it into a process group to simulate the NextBus API transit feed and check the data generating from the simulator.
Figure 1: Here is a visualization of the transit data you will be ingesting into NiFi.
- Completed the prior tutorials within this tutorial series
- Approach 1: Build SimulateXmlTransitEvents Process Group
- Step 1: Create a Process Group
- Step 2: Add GetFile to Ingest NextBus Data Seed
- Step 3: Add UnpackContent to Decompress the Zipped Data
- Step 4: Add ControlRate to Regulate Data Flow Speed
- Step 5: Add UpdateAttribute to Make Each FlowFile Name Unique
- Approach 2: Import NiFi SimulateXmlTransitEvents Process Group
- Further Reading
If you prefer to build the dataflow manually step-by-step, continue on to Approach 1. Else if you want to see the NiFi flow in action within minutes, refer to Approach 2.
1. Go to the components toolbar, drag and drop the Label icon onto the NiFi canvas.
2. Click on the right bottom corner and stretch the label over approximately 24 squares.
3. Right click, select configure and name it
Generate Transit Location via Data Seed based on NextBus API. Change the font size to
1. Go to the components toolbar, drag and drop the Process Group icon onto the NiFi canvas. Name it
SimulateXmlTransitEvents. Press ADD.
2. Double click on new process group. At the bottom left corner, breadcrumbs will show you entered SimulateXmlTransitEvents Process Group.
1. Add the processor icon onto the graph.
2. Select the GetFile processor and press ADD.
- Creates FlowFiles from files in a directory. NiFi will ignore files it doesn’t have read permissions for.
3. Right click on the GetFile processor, click configure from dropdown menu
4. Enter the Properties tab. Add the properties listed in Table 1 to update the processor's appropriate properties. Press OK after changing a property.
Table 1: Update GetFile Properties Tab
Input Directory location at which data is ingested into the dataflow
Keep Source File source files in directory remain after all data is ingested
5. Once each property is updated, go to the Scheduling tab, add the configuration information listed in Table 2.
Table 2: Update GetFile Scheduling Tab
- Run Schedule the processor executes a task every 6 seconds
6. Click Apply.
1. Add the UnpackContent processor onto the NiFi canvas.
2. Connect GetFile to UnpackContent processor by dragging the arrow icon from the first processor to the next. When the Create Connection window appears, verify success checkbox is checked, else check it. Click Add.
Figure 2: Arrow Icon Appears When Hovering Over GetFile Processor
Figure 3: Drag Arrow Icon to Connect two Processors (GetFile -> UnpackContent)
Figure 4: "Create Connection" Window for GetFile -> "UnpackContent" -> success checkbox checked.
3. Open the UnpackContent processor configuration properties tab. Add the properties listed in Table 3 to update the processor's appropriate properties.
Table 3: Update UnpackContent Properties Tab
- Packaging Format tells the processor of packaging format used to decompress the file
4. Once each property is updated, enter the Scheduling tab, add the configuration information listed in Table 4.
Table 4: Update UnpackContent Scheduling Tab
- Run Schedule the processor executes a task every 1 second and avoids back pressure downstream to the next processor
5. Open the processor config Settings tab, under Auto terminate relationships, check the failure and original checkboxes. Click Apply.
Figure 5: UnpackContent Settings Tab Window
1. Add the ControlRate processor onto the NiFi canvas.
2. Connect UnpackContent to ControlRate processor. When the Create Connection window appears, verify success checkbox is checked, else check it. Click Add.
Figure 6: Connect UnpackContent to ControlRate
3. Open the processor configuration properties tab. Add the properties listed in Table 5 to update the processor's appropriate properties.
Table 5: Update ControlRate Properties Tab
Rate Control Criteria instructs the processor to count the number of FlowFiles before a transfer takes place
Maximum Rate instructs the processor to transfer 20 FlowFiles at a time
Time Duration makes it so only 20 FlowFiles will transfer through this processor every 6 seconds.
4. Open the processor config Settings tab, under Auto terminate relationships, check the failure checkbox. Click Apply.
1. Add the UpdateAttribute processor onto the NiFi canvas.
2. Connect ControlRate to UpdateAttribute. When the Create Connection window appears, verify success checkbox is checked, else check it. Click Add.
Figure 7: Connect ControlRate to UpdateAttribute
3. Add a new dynamic property for NiFi expression, click on the plus button "+" in the top right corner. Insert the following property name and value into your properties tab as shown in the table below:
Table 6: Add UpdateAttribute Properties Tab
Figure 8: UpdateAttribute Property Tab Window
- filename updates each FlowFile with a unique identifier
4. Click Apply.
1. Add the PutFile processor onto the NiFi canvas.
2. Connect UpdateAttribute to PutFile. When the Create Connection window appears, verify success checkbox is checked, else check it. Click Add.
Figure 9: Connect UpdateAttribute to PutFile
3. Open the processor configuration properties tab. Add the property listed in Table 7 to update the processor's appropriate properties.
Table 7: Update PutFile Properties Tab
4. Open the processor config Settings tab, under Auto terminate relationships, check the failure and success checkboxes. Click Apply.
1. Add the Output Port component onto the NiFi canvas. Name it
2. Connect UpdateAttribute to RawTransitEvents output port. When the Create Connection window appears, verify success checkbox is checked, else check it. Click Add.
Figure 10: Connect UpdateAttribute to Output Port (RawTransitEvents)
1. With the NiFi DataFlow unselected, hit the start button located in the Operate Palette to activate the SimulateXmlTransitEvents process group dataflow.
2. Let the flow run for about 1 minute, then stop the flow by hitting the stop button.
3. Launch Sandbox Web Shell Client via Cloudera DataFlow (CDF) Splash Screen from Advanced HDF SB Quick Links Link.
Figure 11: SB Quick Link for Web Shell Client
Note: Username is "root", initial password is "hadoop".
You have now SSH'd into the CDF Sandbox Server.
4. Navigate to the output directory in which the transit data is being written to:
5. Run the
ls command to list files in the current directory:
6. Run the
cat command to output the content of the data file to the console:
Figure 12: Output of Transit Data in Web Shell Client
1. Download the tutorial-3-nifi-flow-SimulateXmlTransitEvents.xml template file.
2. Use the template icon located in the Operate Palette.
3. Browse, find the template file, click Open and hit Upload.
4. From the Components Toolbar, drag the template icon onto the graph and select the tutorial-3-nifi-flow-SimulateXmlTransitEvents.xml template file.
5. Right click on the process group, hit the start button to activate the dataflow.
Figure 13: NiFi Flow that pulls in San Francisco Muni Transit Events from the XML Simulator, stores the raw transit events to the local file system and also sends the data out to the rest of the flow through the output port.
The building blocks of every dataflow consist of processors. When sections of the flow become complex, you can start grouping processors inside process groups. These tools perform actions on data to ingest, route, extract, split, aggregate or store it. Our dataflow process group above contains processors, each processor includes a high level description of their role in the tutorial:
- SimulateXmlTransitEvents (Process Group)
- GetFile fetches the vehicle location simulator data for files in a directory.
- UnpackContent decompresses the contents of FlowFiles from the traffic simulator zip file.
- ControlRate controls the rate at which FlowFiles are transferred to follow-on processors enabling traffic simulation.
- UpdateAttribute renames every FlowFile to give them unique names
- PutFile stores data to local file system
- Output Port makes the connection for the process group to connect to other components (process groups, processors, etc)
Refer to NiFi's Documentation to learn more about each processor described above.
Congratulations! You just built a NiFi SimulateXmlTransitEvents process group to replicate the NextBus API, which generates transit data for passengers. You learned to use GetFile processor to ingest the transit data seed. The UnpackContent processor decompressed the transit data seed zip file and routed the data onto the rest of the flow. ControlRate processor controlled the rate at which each FlowFile was distributed to the remaining components of the flow. The Output Port allows for FlowFiles to be dispersed to the next process group you will learn to build in the next tutorial called ParseTransitEvents.
- Process Group
- Under "Adding Components to the Canvas," head to Output Port