XenonStack - A Stack Innovator

Data Ingestion using Apache NiFi for Building Data Lakes - Twitter Data - Part 1

by Navdeep | July 07, 2017 |  Categories -  Apache Nifi, Apache Kafka, Apache Flink, Data Lake, Data Ingestion

Blog Image


What is Data Collection and Ingestion?


Data Collection and Data Ingestion are the processes of fetching data from any data source which we can perform in two ways -



In Today’s World, Enterprises are generating data from different Sources and building Real Time Data lake; we need to Integrate various sources of Data into One Stream.


In this Blog We are sharing how to Ingest, Store and Process Twitter Data using Apache Nifi and in Coming Blogs, we will be Sharing Data Collection and Ingestion from Below Sources


  • Data ingestion From Logs

  • Data Ingestion from IoT Devices

  • Data Collection and Ingestion from RDBMS (e.g., MySQL)

  • Data Collection and Ingestion from ZiP Files

  • Data Collection and Ingestion from Text/CSV Files


Objectives for the Data Lake


  • A Central Repository for Big Data Management

  • Reduce costs by offloading analytical systems and archiving cold data

  • Testing Setup for experimenting with new technologies and data

  • Automation of Data pipelines

  • MetaData Management and Catalog

  • Tracking measurements with alerts on failure or violations

  • Data Governance with clear distinction of roles and responsibilities

  • Data Discovery, Prototyping, and experimentation


Goals of Data Ingestion


Different Objectives and Goals of Data Ingestion.

Click to Tweet



Introduction to Apache NiFi


Apache NiFi provides an easy to use, the powerful, and reliable system to process and distribute the data over several resources.


Apache NiFi is used for routing and processing data from any source to any destination. The process can also do some data transformation.


It is a UI based platform where we need to define our source from where we want to collect data, processors for the conversion of the data, a destination where we want to store the data.


Each processor in NiFi have some relationships like success, retry, failed, invalid data, etc. which we can use while connecting one processor to another. These links help in transferring the data to any storage or processor even after the failure by the processor.


Benefits of Apache NiFi


  • Real-time/Batch Streaming

  • Support both Standalone and Cluster mode

  • Extremely Scalable, extensible platform

  • Visual Command and Control

  • Better Error handling


Core Features of Apache NiFi


  • Guaranteed Delivery - A core philosophy of NiFi has been that even at very high scale, guaranteed delivery is a must. It is achievable through efficient use of a purpose-built persistent write-ahead log and content repository.


  • Data Buffering / Back Pressure AND Pressure Release - NiFi supports buffering of all queued data as well as the ability to provide back pressure as those lines reach specified limits or to an age of data as it reaches a specified age (its value has perished).


  • Prioritized Queuing - NiFi allows the setting of one or more prioritization schemes for how data from a queue is retrieved. The default is oldest first, but it can be configured to pull newest first, largest first, or some other custom scheme.


  • Flow Specific QoS - There are points of a data flow where the data is critical, and it is less intolerant. There are also times when it must be processed and delivered within seconds to be of any value. NiFi enables the fine-grained flow particular configuration of these concerns.


  • Data Provenance - NiFi automatically records, indexes, and makes available provenance data as objects flow through the system even across fan-in, fan-out, transformations, and more. This information becomes extremely critical in supporting compliance, troubleshooting, optimization, and other scenarios.


  • Recovery / Recording a rolling buffer of fine-grained history - NiFi’s content repository is designed to act as a rolling buffer of history. As Data ages off, it is removed from the content repository or as space is needed.


  • Visual Command and Control - NiFi enables the visual establishment of data flows in real-time. And provide UI based approach to build different data flow.


  • Flow Templates - It also allows us to create templates of frequently used data streams. It can also help in migrating the data flows from one machine to another.


  • Security - NiFi supports Multi-tenant Authorization. The authority level of a given data flow applies to each component, allowing the admin user to have a fine grained level of access control. It means each NiFi cluster is capable of handling the requirements of one or more organizations.


  • Parallel Stream to Multiple Destination - With NiFi we can move data to multiple destinations at one time. After processing the data stream, we can route the flow to the various destinations using NiFi’s processor. It can be helpful when we need to back our data on multiple destinations.


What is Apache Nifi - A Complete Introduction to Benefits and Core Features of Apache Nifi

Click to Tweet


NiFi Clustering


When we require moving a large amount of data, then the only single instance of NiFi is not enough to handle that amount of data. So to handle this we can do clustering of the NiFi Servers, this will help us in scaling.


We just need to create the data flow on one node, and this will make a copy of this data flow on each node in the cluster.


NiFi introduces Zero-Master Clustering paradigm in Apache NiFi 1.0.0. A previous version of Apache NiFi based upon a single “Master Node” (more formally known as the NiFi Cluster Manager).


If the master node gets lost, data continued to flow, but the application was unable to show the topology of the flow, or show any stats. But in Zero-Master we can make changes from any node of the cluster.


And if master node disconnects, then automatically any active node is elected as Master Node.


Each node has the same the data flow, so they work on the same task as the other nodes are working, but each operates on the different datasets.


In NiFi cluster, one node is elected as the Master(Cluster Coordinator), and another node sends heartbeats/status information to the master node. This node is responsible for the disconnection of the other nodes that do not send any pulse/status information.


This election of the master node is done via Apache Zookeeper. And In the case when the master nodes get disconnected, Apache Zookeeper elects any active node as the master node.


Data Collection and Ingestion from Twitter using Apache NiFi to Build Data Lake



Fetching Tweets with NiFi’s Processor


NiFi’s ‘GetTwitter’ processor is used to fetch tweets. It uses Twitter Streaming API for retrieving tweets. In this processor, we need to define the endpoint which we need to use. We can also apply filters by location, hashtags, particular IDs.


  • Twitter Endpoint - Here we can set the endpoint from which data should get pulled. Available parameters - 


- Sample Endpoint - Fetch public tweets from all over the world.


- Firehose Endpoint - This is same as streaming API, but it ensures 100% guarantee delivery of tweets with filters.


- Filter Endpoint - If we want to filter by any hashtags or keywords


  • Consumer Key - Consumer key provided by Twitter.


  • Consumer Secret - Consumer Secret provided by Twitter.


  • Access Token - Access Token provided by Twitter.


  • Access Token Secret - Access Token Secret provided by Twitter.


  • Languages - Languages for which tweets should fetch out.


  • Terms to Filter - Hashtags for which tweets should fetch out.


  • IDs to follow - Twitter user IDs that should be followed.


Fetching Tweets With Nifi Processor


Now processor GetTwitter is ready for transmission of the data(tweets). From here we can move our data stream to anywhere like Amazon S3, Apache Kafka, ElasticSearch, Amazon Redshift, HDFS, Hive, Cassandra, etc. NiFi can move data multiple destinations parallelly.


Data Integration Using Apache NiFi and Apache Kafka


For this, we are using NiFi processor ‘PublishKafka_0_10’.


In the Scheduling Tab, we can configure how many concurrent tasks to be executed and schedule the processor.


In Properties Tab, we can set up our Kafka broker URLs, topic name, request size, etc. It will write data to the given topic. For the best results, we can create a Kafka topic manually of a defined partitions.


Apache Kafka can be used to process data with Apache Beam, Apache Flink, Apache Spark.


Data Integration Using Apache Nifi and Apache Kafta


Data Integration Using Apache Nifi and Apache Kafka


Data Integration Using Apache Nifi and Apache Kafka


Data Integration Using Apache NiFi to Amazon RedShift with Amazon Kinesis Firehose Stream




Now we integrate Apache NiFi to Amazon Redshift. NiFi uses Amazon Kinesis Firehose Delivery Stream to store data to Amazon Redshift.


This delivery Stream should get utilized for moving data to Amazon Redshift, Amazon S3, Amazon ElasticSearch Service. We need to specify this while creating Amazon Kinesis Firehose Delivery Stream.


Now we have to move data to Amazon Redshift, so firstly we need to configure Amazon Kinesis Firehose Delivery Stream. While delivering data to Amazon Redshift, firstly the data is provided to Amazon S3 bucket, and then Amazon Redshift Copy command is used to move data to Amazon Redshift Cluster.


We can also enable data transformation while creating Kinesis Firehose Delivery Stream. In this, we can also backup the data to another Amazon S3 bucket other than an intermediate bucket.


So for this, we will use processor PutKinesisFirehose. This processor will use that Kinesis Firehose stream for delivering data to Amazon Redshift. Here we will configure AWS credentials and Kinesis Firehose Delivery Stream.




Data Integration Using Apache NiFi to Amazon S3


Data Integration Using Apache Nifi to Amazon S3


PutKinesisFirehose sends data to both Amazon Redshift and uses Amazon S3 as the intermediator. Now if someone only wants to use Amazon S3 as the storage so NiFi can also use for sending data to Amazon S3 only.

For this, we need to use NiFi processor PutS3Object. In it, we have to configure our AWS credentials, bucket name, and path, etc.


Data Integration using Apache Nifi to Amazon S3


Partitioning in Amazon S3 Bucket


Most important aspect while storing data in S3 is partitioning. Here we can partition our data using expression language in the object key field. So Right now we have used day wise partitioning.


So tweets should be stored days folder. And this partitioning approach can be beneficial while doing twitter analysis. Suppose we want to analyze tweets for this day or this week.


So using partitioning, we don’t need to scan all tweets we stored in S3. We will just define our filters using partitions.


Expression Used: ${now():format('yyyy/MMM/dd')}/${filename}


It will create a path in our S3 Bucket like this: Year/Month/Date/filenames.


Data Integration Using Apache NiFi to Hive


For transferring data to Hive, NiFi has processors - PutHiveStreaming for which incoming flow file is expected to be in Avro format and PutHiveQL for which incoming FlowFile is projected to be the HiveQL command to execute.


Now we will use PutHiveStreaming for sending data to Hive. For twitter we have output data as JSON, so we need to convert it first to the Avro format and then we will send it to the PutHiveStreaming.


In PutHiveStreaming, we will configure our Hive Metastore URI, Database Name, and table name. For this, the table which we are using must exist in Hive.


Data Integration Using Apache Nifi to Hive


Data Integration Using Apache NiFi to ElasticSearch


Data Integration Using Apache Nifi to Elastic Search


Now we will visualize the incoming data in Kibana, for that we have routed the data to ElasticSearch.


Defining ElasticSearchHttp


For routing data ElasticSearch, we will use NiFi processor PutElasticSearchHttp. It will move the data to the defined ElasticSearch index. Here we have set our ElasticSearch’s URL, index, type, etc.


Data Integration Using Apache Nifi to Elastic Search


Now, this processor will write data twitter data to the ElasticSearch index. And firstly we need to create the index into ElasticSearch and need to do mapping manually for some fields like ‘created_at’ because we need this to type ‘Date.'


Visualization In Kibana


Setting Up Dashboard in Kibana


Firstly we need to add the created index into Kibana.


Visualization in Kibana


Visualization in Kibana


Integrating Apache Spark and NiFi for Data Lakes


Integrating Apache Spark and Nifi for Data Lake


Apache Spark is used widely for large data processing. Spark can process the data in both i.e. Batch processing Mode and Streaming Mode.


Apache NiFi to Apache Spark data transmission use site to site communication. And output port is used for publishing data from the source.


Integrating Apache Spark and Nifi For Data Lake


In the above data flow, we have used processor TailFile in which we have configured ‘nifi-app.log’ file to tail. It will send all the information to the output port ‘spark. Now, we can use this outport port while writing spark job.


In the same way, we can send out twitter records to any output port. And this output port can be further used for the spark streaming.


Integrating Apache Flink With Apache NiFi for Data Lake


Integrating Apache Flink With Apache Nifi For Data Lake


Apache Flink is an open source stream processing framework developed by the Apache Software Foundation. We can use this for stream processing, network/sensor monitoring, error detection, etc.


Apache NiFi to Apache Flink data transmission also uses the site to site communication. And output port is used for publishing data from the source.


Integrating Apache Flink with Apache Nifi For Data Lake


  • NiFi Source to pull data from the NiFi output port

  • NiFiSink to push data to NiFi input port

  • NiFiDataPacket to represent data to/from NiFi


Performance & Scaling Results For Apache NiFi


We have tested the data flows on four node Apache NiFi Cluster. We used NiFi processor GenerateFlowFile for load testing. This processor creates FlowFiles with random data or custom content. We have tested the data transmission to Amazon S3 and Apache Kafka.


The results shown in the table is the data processed by the NiFi(per five minutes)


Scaling Results For Apache Nifi


Note: These tests are performed using Amazon EC2 instances(m4.large). For Kafka, we have used three node Apache Kafka Cluster.


Data Ingestion Using Apache Nifi For Building Data Lakes Using Twitter Data

Click to Tweet


How Can XenonStack Help You?


XenonStack provides Secure Hadoop, Hive, HBase with Apache Knox, Apache Ranger Integration for Enterprise Data Lake.


Our Real Time Analytics Solutions help you integrate data from any source - API's, Databases, IoT Devices, CRM/ERP Files.


Our Big Data Analytics Solutions enables enterprises to Host On-Premises/Public Self Service Presto/Hive Cluster and can enable queries to direct on S3/HDFS/GlusterFS.


Now Build, Deploy Data lake on On-Premises, Hybrid Cloud, Apache Mesos, Amazon RedShift, Google BigQuery and Public Cloud. XenonStack Data Warehousing and Data Lake Solution enables the Monitoring as well as Visibility of the Data Flow.


Contact for initial assessment and Consulting.


XenonStack Offerings


XenonStack is a leading Software Company in Product Development and Solution Provider for DevOps, Big Data Integration, Real Time Analytics & Data Science.


Product NexaStack - Unified DevOps Platform Provides monitoring of Kubernetes, Docker, OpenStack infrastructure, Big Data Infrastructure and uses advanced machine learning techniques for Log Mining and Log Analytics.


Product ElixirData - Modern Data Integration Platform Enables enterprises and Different agencies for Log Analytics and Log Mining. 


Product Akira.AI is an Automated & Knowledge Drive Artificial Intelligence Platform that allows you to automate the Infrastructure to train and deploy Deep Learning Models on Public Cloud as well as On-Premises. 


Get 1 Hour Free Assessment for DevOps, Big Data Strategy, and Data Science. CONTACT US NOW

Share Post On Social Media

Related Posts

Build, Deploy, Manage & Secure Continuous Delivery Pipeline & Analytics Stack.

NexaStack - DevOps & Serverless Computing Platform

Elixir Data - Modern Data Integration Platform

Contact For Free Assessment

Chat With Our Experts

ChatContact Us

Your Information Submitted Successfully!