
Batch and Real Time Data Ingestion with Apache NiFi for Data Lake

Navdeep Singh Gill | 30 April 2023

Introduction to Data Lake

If you are looking for an enterprise-level solution to reduce the cost and complexity of big data, Data Lake services are the most cost-efficient choice. They also offer easy, self-service management that requires little to no coding. Data Ingestion with Apache NiFi for a Data Lake delivers an easy-to-use, powerful, and reliable solution to process and distribute data across several resources. NiFi works in both standalone mode and cluster mode. Apache NiFi routes and processes data from any source to any destination and can also perform some data transformation along the way. It is a UI-based platform where we define the source from which we want to collect data, the processors that convert the data, and the destination where we want to store it.

What is Big Data Analytics and Ingestion?

Data Collection and Data Ingestion are the processes of fetching data from a data source, which we can perform in two ways:
  • Real-time Streaming
  • Batch Streaming
In today's world, enterprises generate data from many different sources, and to build a real-time data lake we need to integrate those sources into one stream. The problems and solution patterns that emerged have been discussed and articulated extensively in this blog. Here we share how to ingest, store, and process Twitter data using Apache NiFi; in coming blogs, we will cover data collection and ingestion from the sources below.
  • Data Ingestion from Logs
  • Data Ingestion from IoT Devices
  • Data Collection and Ingestion from RDBMS (e.g., MySQL)
  • Data Collection and Ingestion from Zip Files
  • Data Collection and Ingestion from Text/CSV Files

What are the Data Lake Objectives for Enterprise?

Below are the objectives of an enterprise data lake:
  • A Central Repository for Big Data Management
  • Reduce costs by offloading analytical systems and archiving cold data
  • Testing Setup for experimenting with new technologies and data
  • Automation of Data pipelines
  • Metadata Management and Catalog
  • Tracking measurements with alerts on failure or violations
  • Data Governance with a clear distinction of roles and responsibilities
  • Data Discovery, Prototyping, and experimentation

What is Apache NiFi and its Architecture?

Apache NiFi provides an easy-to-use, powerful, and reliable system to process and distribute data across several resources. NiFi helps route and process data from any source to any destination and can also perform some data transformation. It is a UI-based platform where we define the source from which we want to collect data, the processors that convert the data, and the destination where we want to store it. Each processor in NiFi has relationships such as success, retry, failure, and invalid data, which we use when connecting one processor to another. These relationships let us route data to storage or to another processor even after a processor fails.
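To make the relationship model concrete, here is a minimal conceptual sketch in Python. It is not NiFi's API; the FlowFile and Processor classes are hypothetical stand-ins that only illustrate how a FlowFile is routed down a named relationship such as success or failure.

```python
from dataclasses import dataclass, field

@dataclass
class FlowFile:
    # A FlowFile pairs content with attributes (metadata), as in NiFi.
    content: bytes
    attributes: dict = field(default_factory=dict)

class Processor:
    """Hypothetical stand-in for a NiFi processor with named relationships."""
    def __init__(self, name):
        self.name = name
        self.relationships = {}  # relationship name -> downstream processor

    def connect(self, relationship, downstream):
        self.relationships[relationship] = downstream

    def transfer(self, flowfile, relationship):
        # Route the FlowFile down the chosen relationship, if connected.
        downstream = self.relationships.get(relationship)
        if downstream is not None:
            downstream.receive(flowfile)

    def receive(self, flowfile):
        print(f"{self.name} received {flowfile.attributes}")

# Wire success to one processor and failure to another, as on the NiFi canvas.
get_twitter = Processor("GetTwitter")
publish_kafka = Processor("PublishKafka")
log_failure = Processor("LogAttribute")
get_twitter.connect("success", publish_kafka)
get_twitter.connect("failure", log_failure)
get_twitter.transfer(FlowFile(b"{...tweet json...}", {"source": "twitter"}), "success")
```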

What are the benefits of Apache NiFi?

  • Real-Time/Batch Streaming
  • Support for both Standalone and Cluster Modes
  • Extremely Scalable, Extensible Platform
  • Visual Command and Control
  • Better Error Handling

What are the Features of Apache NiFi?

  • Guaranteed Delivery - A core philosophy of NiFi has been that guaranteed delivery is a must, even at a very high scale. It is achievable through the efficient use of a purpose-built persistent write-ahead log and content repository.
  • Data Buffering / Back Pressure and Pressure Release - Apache NiFi supports buffering of all queued data, as well as the ability to apply back pressure as queues reach specified limits, or to age off data as it reaches a specified age (when its value has perished).
  • Prioritized Queuing - Apache NiFi allows setting one or more prioritization schemes for how to retrieve data from a queue. The default is oldest first, but it can be configured to pull the newest first, largest first, or another custom scheme.
  • Flow Specific QoS - There are points in a data flow where the data is absolutely critical and loss-intolerant. There are also times when it must be processed and delivered within seconds to be of any value. Apache NiFi enables fine-grained, flow-specific configuration of these concerns.
  • Data Provenance - Apache NiFi automatically records, indexes, and makes available provenance data as objects flow through the system, even across fan-in, fan-out, transformations, and more. This information becomes extremely critical in supporting compliance, troubleshooting, optimization, and other scenarios.
  • Recovery / Recording a Rolling Buffer of Fine-Grained History - Apache NiFi’s content repository is designed to act as a rolling history buffer. Data is removed from the content repository only as it ages off or as space is needed.
  • Visual Command and Control - Apache NiFi enables the visual establishment of data flows in real time and provides a UI-based approach to building different data flows.
  • Flow Templates - It also allows us to create templates of frequently used data flows, which can help in migrating data flows from one machine to another.
  • Security - Apache NiFi supports Multi-tenant Authorization. The authority level of a given data flow applies to each component, allowing the admin user to have a fine-grained level of access control. It means each NiFi cluster is capable of handling the requirements of one or more organizations.
  • Parallel Streams to Multiple Destinations - With Apache NiFi, we can move data to multiple destinations at one time. After processing the data stream, we can route the flow to various destinations using NiFi’s processors. This can be helpful when we need to back up our data to multiple destinations.
Source: Apache NiFi documentation.

What is Apache NiFi Clustering?

When we need to move a large amount of data, a single instance of Apache NiFi is not sufficient to handle it. To handle this, we can cluster the NiFi servers, which helps with scaling. We create the data flow on one node, and a copy of that data flow is made on each node in the cluster. Apache NiFi 1.0.0 introduced the Zero-Master Clustering paradigm. Previous versions of Apache NiFi relied on a single “Master Node” (more formally known as the NiFi Cluster Manager): if the master node was lost, data continued to flow, but the application could not show the topology of the flow or any stats. With Zero-Master clustering, we can make changes from any node of the cluster.

Each node has the same data flow, so all nodes work on the same tasks, but each operates on a different set of data. In an Apache NiFi cluster, one node is elected as the Cluster Coordinator, and the other nodes send heartbeats/status information to it. The Cluster Coordinator is responsible for disconnecting nodes that do not report any heartbeat/status information. The election of the Cluster Coordinator is done via Apache ZooKeeper, and if the Cluster Coordinator disconnects, Apache ZooKeeper elects any active node as the new coordinator.

Building Data Lake using Apache NiFi

Building a Data Lake using Apache NiFi requires the following:
  • Amazon S3
  • Amazon Redshift
  • Apache Kafka
  • Apache Spark
  • ElasticSearch
  • Apache Flink
  • Apache Hive

Fetching Tweets with NiFi’s Processor

NiFi’s ‘GetTwitter’ processor is used to fetch tweets. It uses the Twitter Streaming API to retrieve tweets. In this processor, we need to define the endpoint we want to use, and we can also apply filters by location, hashtags, and particular user IDs.
  • Twitter Endpoint - Sets the endpoint from which data should be pulled. Available options:
      • Sample Endpoint - Fetches public tweets from all over the world.
      • Firehose Endpoint - Same as the streaming API, but ensures 100% guaranteed delivery of tweets.
      • Filter Endpoint - Used when we want to filter by hashtags or keywords.
  • Consumer Key - Consumer key provided by Twitter.
  • Consumer Secret - Consumer secret provided by Twitter.
  • Access Token - Access token provided by Twitter.
  • Access Token Secret - Access token secret provided by Twitter.
  • Languages - Languages for which tweets should be fetched.
  • Terms to Filter - Hashtags or keywords for which tweets should be fetched.
  • IDs to Follow - Twitter user IDs that should be followed.
Now the GetTwitter processor is ready to transmit the data (tweets). From here, we can move our data stream anywhere: Amazon S3, Apache Kafka, ElasticSearch, Amazon Redshift, HDFS, Hive, Cassandra, etc. NiFi can move data to multiple destinations in parallel.
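For comparison, here is a minimal Python sketch of what GetTwitter’s Filter Endpoint does under the hood, using the (since retired) Twitter Streaming API v1.1. The credentials, track terms, and language values are placeholders.

```python
import json
import requests
from requests_oauthlib import OAuth1

# Placeholder credentials -- the same four values GetTwitter asks for.
auth = OAuth1("CONSUMER_KEY", "CONSUMER_SECRET",
              "ACCESS_TOKEN", "ACCESS_TOKEN_SECRET")

# Filter Endpoint with 'Terms to Filter' and 'Languages', as in the processor.
resp = requests.post(
    "https://stream.twitter.com/1.1/statuses/filter.json",
    auth=auth,
    data={"track": "#bigdata,#apachenifi", "language": "en"},
    stream=True,
)

for line in resp.iter_lines():
    if line:  # the stream sends keep-alive blank lines
        tweet = json.loads(line)
        print(tweet.get("created_at"), tweet.get("text"))
```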

Data Lake using Apache NiFi and Apache Kafka

For this, we use the NiFi processor ‘PublishKafka_0_10’. In the Scheduling tab, we can configure how many concurrent tasks to execute and how to schedule the processor. In the Properties tab, we can set our Kafka broker URLs, topic name, request size, etc. The processor writes data to the given topic. For best results, we can manually create a Kafka topic with a defined number of partitions. Apache Kafka is commonly used to feed data processing with Apache Beam, Flink, and Spark. Data Lake services are designed to preserve all attributes, especially when the scope of the data is unknown.
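As a rough Python equivalent of what this flow does, here is a sketch using the kafka-python library; the broker address, topic name, and partition count are assumptions.

```python
import json
from kafka import KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic

# Create the topic manually with a defined number of partitions (assumed values).
admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
admin.create_topics([NewTopic(name="tweets", num_partitions=3, replication_factor=1)])

# Publish JSON tweets to the topic, as PublishKafka_0_10 would.
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
producer.send("tweets", {"id_str": "1", "text": "hello data lake"})
producer.flush()
```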


Integrating Apache NiFi to Amazon Redshift using Amazon Kinesis

Now we integrate Apache NiFi with Amazon Redshift. NiFi uses an Amazon Kinesis Firehose Delivery Stream to store data in Amazon Redshift. A delivery stream can be used to move data to Amazon Redshift, Amazon S3, or the Amazon ElasticSearch Service; we specify the destination while creating the Kinesis Firehose Delivery Stream. Since we want to move data to Amazon Redshift, we configure the delivery stream accordingly. While delivering data to Amazon Redshift, the data is first written to an Amazon S3 bucket.

Then the Amazon Redshift COPY command moves the data into the Amazon Redshift cluster. We can also enable data transformation while creating the Kinesis Firehose Delivery Stream, and we can back up the data to another Amazon S3 bucket besides the intermediate bucket. For this flow, we use the processor PutKinesisFirehose, which uses the Kinesis Firehose stream to deliver data to Amazon Redshift. Here we configure our AWS credentials and the Kinesis Firehose Delivery Stream name.
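For reference, here is a minimal boto3 sketch of the same put operation that PutKinesisFirehose performs. The stream name and region are assumptions, and the delivery stream (with its Redshift destination and COPY configuration) is assumed to already exist.

```python
import json
import boto3

firehose = boto3.client("firehose", region_name="us-east-1")  # assumed region

tweet = {"id_str": "1", "text": "hello redshift"}

# Firehose buffers records into the intermediate S3 bucket, then Redshift
# loads them with the COPY command configured on the delivery stream.
firehose.put_record(
    DeliveryStreamName="tweets-to-redshift",  # assumed stream name
    Record={"Data": (json.dumps(tweet) + "\n").encode("utf-8")},
)
```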

Big Data Integration Using Apache NiFi to Amazon S3

PutKinesisFirehose sends data to Amazon Redshift using Amazon S3 as an intermediary. If we only want to use Amazon S3 as storage, NiFi can send data to Amazon S3 alone. For this, we use the NiFi processor PutS3Object, in which we configure our AWS credentials, bucket name, object key path, etc.

Partitioning in Amazon S3 Bucket

The most important aspect of storing data in S3 is partitioning. We can partition our data using the Expression Language in the Object Key field. Here we have used day-wise partitioning, so tweets are stored in per-day folders. This partitioning approach is beneficial when doing Twitter analysis: if we want to analyze tweets for a given day or week, we don’t need to scan all the tweets stored in S3; we simply define our filters using the partitions. Expression used: ${now():format('yyyy/MMM/dd')}/${filename}. It creates a path in our S3 bucket of the form Year/Month/Date/filename.
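Here is a small boto3 sketch of the same day-wise key layout that PutS3Object produces with the expression above; the bucket name and filename are placeholders.

```python
from datetime import datetime, timezone
import boto3

s3 = boto3.client("s3")

filename = "tweets-000001.json"  # placeholder filename
# Equivalent of ${now():format('yyyy/MMM/dd')}/${filename}, e.g. 2023/Apr/30/...
key = datetime.now(timezone.utc).strftime("%Y/%b/%d") + "/" + filename

s3.put_object(
    Bucket="my-tweet-lake",  # placeholder bucket
    Key=key,
    Body=b'{"id_str": "1", "text": "hello s3"}',
)
```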

Data Lake Services using Apache NiFi to Hive

For transferring data to Apache Hive, NiFi has two processors: PutHiveStreaming, for which the incoming FlowFile is expected to be in Avro format, and PutHiveQL, for which the incoming FlowFile is expected to be a HiveQL command to execute. We will use PutHiveStreaming to send data to Hive. Our Twitter output data is JSON, so we first need to convert it to Avro format and then send it to PutHiveStreaming. In PutHiveStreaming, we configure our Hive Metastore URI, database name, and table name. The table we are using must already exist in Hive.
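The JSON-to-Avro step can be pictured with this small Python sketch using the fastavro library; the record schema is an assumption covering only a few tweet fields.

```python
import json
from fastavro import parse_schema, writer

# Assumed minimal schema -- a real tweet record has many more fields.
schema = parse_schema({
    "type": "record",
    "name": "Tweet",
    "fields": [
        {"name": "id_str", "type": "string"},
        {"name": "text", "type": "string"},
        {"name": "created_at", "type": "string"},
    ],
})

raw = ['{"id_str": "1", "text": "hi", "created_at": "Sun Apr 30 10:00:00 +0000 2023"}']
records = [json.loads(line) for line in raw]

# Write an Avro file suitable for an Avro-expecting consumer like PutHiveStreaming.
with open("tweets.avro", "wb") as out:
    writer(out, schema, records)
```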

Data Lake using Apache NiFi to ElasticSearch

Now we will visualize the incoming data in Kibana. For that, we route the data to ElasticSearch.

Defining the ElasticSearch HTTP Processor

To route data to ElasticSearch, we use the NiFi processor PutElasticSearchHttp. It moves the data to the defined ElasticSearch index. Here we set our ElasticSearch URL, index, type, etc. This processor then writes the Twitter data to the ElasticSearch index. First, though, we need to create the index in ElasticSearch and manually define the mapping for some fields, such as ‘created_at’, because we need it to be of type ‘date’.
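Here is a sketch of that manual index creation using Python’s requests library; the index name, ElasticSearch address and version (pre-7.x mapping types), and the date format string are assumptions.

```python
import requests

es = "http://localhost:9200"  # assumed ElasticSearch address

# Create the index up front so 'created_at' is mapped as a date, not a string.
# Twitter's created_at looks like "Sun Apr 30 10:00:00 +0000 2023", hence the
# assumed custom date format below (pre-7.x ElasticSearch with a mapping type).
mapping = {
    "mappings": {
        "tweet": {
            "properties": {
                "created_at": {
                    "type": "date",
                    "format": "EEE MMM dd HH:mm:ss Z yyyy",
                },
            }
        }
    }
}

resp = requests.put(f"{es}/tweets", json=mapping)
print(resp.status_code, resp.json())
```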

Big Data Visualization in Kibana for Data Lake Services

Implementing Big Data Visualization in Kibana involves the following:

Setting Up Dashboard in Kibana

First, we need to add the created index into Kibana.

How to integrate Apache Spark and NiFi for Data Lake Services?

Apache Spark is widely used for large-scale data processing. Spark can process data in both batch mode and streaming mode. Apache NiFi to Apache Spark data transmission uses site-to-site communication, and an output port is used for publishing data from the source. In the data flow above, we used the TailFile processor, configured to tail the ‘nifi-app.log’ file. It sends all the information to an output port named ‘spark’. We can then use this output port while writing Spark jobs. In the same way, we can send our Twitter records to any output port, and that output port can then feed Spark Streaming.
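NiFi’s site-to-site Spark receiver is a JVM library, so it cannot be shown directly in Python. As an alternative sketch, assuming the flow is also published to the Kafka topic configured earlier (and that the spark-sql-kafka connector package is supplied at submit time), a PySpark Structured Streaming job could consume it like this; the broker address and topic name are assumptions.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("nifi-tweet-stream").getOrCreate()

# Read the tweet stream that NiFi published to Kafka (assumed broker/topic).
stream = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "tweets")
    .load()
)

# Kafka values arrive as bytes; cast to strings for downstream parsing.
tweets = stream.selectExpr("CAST(value AS STRING) AS json")

query = tweets.writeStream.format("console").start()
query.awaitTermination()
```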

How to Integrate Apache Flink With Apache NiFi for Data Lake?

Apache Flink is an open-source stream processing framework developed by the Apache Software Foundation. We can use it for stream processing, network/sensor monitoring, error detection, etc. We have tested the data transmission to Amazon S3 and Kafka. NiFi to Apache Flink data transmission also uses site-to-site communication, and an output port is used for publishing data from the source. The flink-connector-nifi module provides:
  • NiFiSource to pull data from a NiFi output port
  • NiFiSink to push data to a NiFi input port
  • NiFiDataPacket to represent data to/from NiFi

Performance & Scaling Results For Apache NiFi

We tested the data flows on a four-node Apache NiFi cluster, using the NiFi processor GenerateFlowFile for load testing. This processor creates FlowFiles with random data or custom content. The results table showed the amount of data processed by NiFi per five minutes. Note: these tests were performed using Amazon EC2 instances (m4.large), and for Kafka we used a three-node Apache Kafka cluster.

How Can XenonStack Help You?

XenonStack provides Data Lake services for building real-time and streaming Big Data Analytics applications for IoT and predictive analytics, using a Cloud Data Warehouse, for enterprises and startups.