
Real Time Streaming Application with Apache Spark

Navdeep Singh Gill | 09 May 2023


What is Apache Spark?

Apache Spark is a fast, in-memory data processing engine with expressive development APIs that let data workers execute streaming workloads conveniently. With Spark running on Apache Hadoop YARN, developers everywhere can create applications that exploit Spark's power, derive insights, and enrich their data science workloads within a single, shared dataset in Apache Hadoop.

What is Streaming?

Streaming data is unstructured data that is generated continuously by thousands of data sources. It includes a wide variety of data, such as log files generated by customers using your mobile or web applications, in-game player activity, information from social networks, financial trading data, and telemetry from connected devices or instrumentation in data centers.


What is Structured Streaming?

Structured Streaming is Spark's stream processing model built on the Spark SQL engine. It limits what a query can express, and that restriction enables optimizations, so we can perform complex data computations that are not practical in traditional stream processing.

Why do we use Structured Streaming?

In Structured Streaming, any data stream is treated as an unbounded table: new records arriving on the stream are like rows being appended to that table. This allows us to treat both batch and streaming data as tables, so DataFrame/Dataset queries can be applied to both. Users describe the query they want to run, the input and output locations, and optionally a few more details.
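As a minimal sketch of this idea (the file path, broker address, and topic name below are placeholders, not part of this article's pipeline), the same DataFrame logic can be applied to a bounded source read with spark.read and to an unbounded source read with spark.readStream:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("Batch-And-Stream")
  .master("local[*]")
  .getOrCreate()

// Bounded input: a static directory of JSON files (placeholder path)
val batchDf = spark.read.json("/tmp/events")

// Unbounded input: the same kind of records arriving continuously from Kafka (placeholder broker and topic)
val streamDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .load()

// The same query shape works on either "table"
val batchCounts = batchDf.groupBy("country").count()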

Real-Time Streaming with Apache Spark, Apache NiFi, and Apache Kafka


What is Data Collection Layer?

The data collection layer is the very first layer where we ingest data from different locations. For this, we will define our data flow pipelines by using the different data sources in Apache NiFi or Apache MiNiFi.
  • File system
  • Cloud buckets (Google Cloud Storage, Amazon S3)
  • Databases (MySQL, PostgreSQL, MongoDB, Cassandra)
  • Real-time stream from the IoT devices

Apache NiFi

NiFi is a data flow automation tool that allows users to send, receive, route, transform, and sort data as needed in an automated and configurable way. NiFi has a user-friendly drag-and-drop graphical user interface that lets users create data flows per their requirements. Apache NiFi ships with predefined processors for fetching data from and pushing data to most data sources, and we can also perform some data transformations in Apache NiFi using these predefined processors.

What is Data Routing Layer?

This layer sends data to multiple destinations, e.g., a cloud bucket, a database, or the local file system. We define this routing in our Apache NiFi flow; Apache NiFi enables routing the data to multiple destinations in parallel.

Apache Kafka

Apache Kafka is a distributed publish-subscribe messaging system used to ingest real-time data streams and make them available to the consumer in a parallel and fault-tolerant manner. Kafka is suitable for building a real-time streaming data pipeline that reliably moves data between different processing systems. Kafka consists of Topics, Consumers, Producers, Brokers, Partitions, and Clusters. Kafka's topics are divided into many partitions. Partitions allow you to parallelize the topic by splitting the data in a particular topic across multiple brokers. Each partition can be placed on a separate machine, allowing multiple consumers to read from a topic in parallel.
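As a minimal sketch of the producer side (the broker address, topic name, and payload below are hypothetical, not from this article), a Scala producer that publishes records to a topic might look like this:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object GeoEventProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // placeholder broker address
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // Records with the same key hash to the same partition, which preserves their relative order
    val record = new ProducerRecord[String, String]("geo-events", "IN", """{"country":"India"}""")
    producer.send(record)
    producer.close()
  }
}

Because the topic is split into partitions across brokers, several such producers and consumers can work on it in parallel.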

Apache Kafka use cases

  • Stream Processing
  • Metrics Collection and Monitoring
  • Website activity tracking

What are the Data Transformation Layers?

The Data Transformation layers are given below.

Apache Spark Structured Streaming

It is the newer Apache Spark streaming model and framework built on the SQL engine. Introduced in Spark 2.0, it provides fast, scalable, fault-tolerant, and low-latency processing. The main idea is that you should not have to reason about streaming itself but instead use a single API for both batch and streaming operations; thus it allows you to write batch-style queries on your streaming data. It provides the Dataset/DataFrame API in Scala, Java, Python, and R to express aggregations, event-time windows, and stream-to-batch joins.
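As a hedged sketch of an event-time window aggregation (the source, column names, and window/watermark durations below are illustrative; the built-in rate source is used only to make the example self-contained):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder
  .appName("Windowed-Counts")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Synthetic streaming source for the sketch: the rate source emits (timestamp, value) rows
val events = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "5")
  .load()
  .withColumn("country", when($"value" % 2 === 0, "India").otherwise("Other"))

// Count events per country over 10-minute event-time windows, tolerating 5 minutes of late data
val windowedCounts = events
  .withWatermark("timestamp", "5 minutes")
  .groupBy(window($"timestamp", "10 minutes"), $"country")
  .count()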

Dataframe

The DataFrame is a distributed collection of data organized into named columns and rows. It is similar to a table in a relational database, with proper optimization. DataFrames came into existence to deal with both structured and unstructured data formats, for example, Avro, CSV, Elasticsearch, and Cassandra.
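For instance (a small sketch; the CSV path is a placeholder), a DataFrame can be created directly from a CSV file:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("DataFrame-Example")
  .master("local[*]")
  .getOrCreate()

// Read a CSV file into a DataFrame; column names come from the header row
val usersDf = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/tmp/users.csv") // placeholder path

usersDf.printSchema()
usersDf.show(5)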

Dataset

The dataset is a data structure in SparkSQL that is strongly typed and mapped to a relational schema. It is an extension to the data frame API that represents structured queries with encoders. Spark Dataset provides both type safety and object-oriented programming interface.
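A minimal sketch of the typed API (the User case class and its sample rows are hypothetical):

import org.apache.spark.sql.SparkSession

case class User(name: String, country: String)

val spark = SparkSession.builder
  .appName("Dataset-Example")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// The Dataset is checked against the User type at compile time
val usersDs = Seq(User("Asha", "India"), User("Liam", "Ireland")).toDS()
usersDs.filter(_.country == "India").show()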

How to read Streaming data from Kafka through Apache Spark?

Spark provides a unified batch and streaming API to view data published to Kafka as a DataFrame. The first step is to specify the location of our Kafka cluster and the topic to read. Spark allows you to read an individual topic, a specific set of topics, a regex pattern of topics, or even a specific set of partitions belonging to a set of topics.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Create a SparkSession
val spark = SparkSession
  .builder
  .appName("Spark-Kafka-Integration")
  .master("local")
  .getOrCreate()
import spark.implicits._
In general, a session means an interaction between two or more entities. In Apache Spark, the SparkSession provides a single point of entry for interacting with the underlying Spark functionality and allows programming Spark with the DataFrame and Dataset APIs.
// Connect to the Kafka topic
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest")
  .load()

df.printSchema() // reveals the schema of our DataFrame

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
The returned DataFrame (df) contains all the fields of a Kafka record and its associated metadata. We can now use all of the familiar DataFrame or Dataset operations to transform the result.
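For example (a hedged sketch continuing from the df defined above; this projection is a common first step rather than something specific to this article), the binary key and value payloads are usually cast to strings before parsing, keeping a few metadata columns for reference:

// Cast the binary Kafka payloads to strings so they can be parsed downstream
val messages = df.selectExpr(
  "CAST(key AS STRING) AS key",
  "CAST(value AS STRING) AS value",
  "topic",
  "partition",
  "offset"
)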

Streaming ETL

Now the stream is set up, and we can start doing the necessary ETL (Extract, Transform, and Load) to extract meaningful information. Let's say that real-time streaming data is pushed by NiFi to Kafka as below.
 
{
  "city": "",
  "country": "India",
  "countryCode": "+91",
  "lat": 0.00,
  "regionName": "Mumbai",
  "status": "success",
  "zip": ""
}
It is now possible to run quick analyses, such as how many users are coming from India.
    
val result = df
  .select(get_json_object($"value".cast("string"), "$.country").alias("countByCountry"))
  .groupBy("countByCountry")
  .count()

// Publish the running counts back to Kafka; the sink expects a value column and a checkpoint directory
result.selectExpr("CAST(countByCountry AS STRING) AS key", "CAST(count AS STRING) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .option("checkpointLocation", "/tmp/spark-checkpoints") // placeholder path
  .outputMode("update") // emit counts as they are updated
  .start()
Now we can parse the country out of the incoming JSON messages, group them, and count them, all in real time, as we read data from the Kafka topic. The Spark streaming job runs in the background and continuously updates the counts as new messages arrive.
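While developing, a hedged alternative is to write the running counts to the built-in console sink instead of Kafka; it prints the updated result table on every trigger and is meant for debugging, not production:

// Print the full, continuously updated count table to stdout on each trigger
result.writeStream
  .format("console")
  .outputMode("complete")
  .start()
  .awaitTermination()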

Low Latency Continuous Processing Mode in Structured Streaming

Continuous Processing is a new execution engine in Spark 2.3 that allows very low end-to-end latency with at-least-once fault-tolerance guarantees. Compare this with the default micro-batch processing engine, which can achieve exactly-once guarantees but with latencies of about a second at best. In micro-batch mode, Spark waits for a short interval (for example, one second) and batches together all the events received during that interval into a micro-batch. The driver then schedules this micro-batch to execute as tasks on the executors.

After a micro-batch execution is complete, the next batch is collected and scheduled. This scheduling is done frequently to give the impression of continuous execution. However, the low latency doesn't come without cost: the faster continuous mode weakens the delivery guarantee from exactly-once to at-least-once. So continuous execution is advised for systems where processing latency matters more than the delivery guarantee.


import org.apache.spark.sql.streaming.Trigger

val result = df.selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value") // continuous mode supports map-like queries only
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .option("checkpointLocation", "/tmp/spark-checkpoints-continuous") // placeholder; required by the Kafka sink
  .trigger(Trigger.Continuous("1 second")) // only this line is added for continuous processing
  .start()

You can switch between the two modes without modifying the application logic. To run a query in continuous processing mode, all you need to do is specify a continuous trigger with the desired checkpoint interval as a parameter; there is no need to change the logic of the code.
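For comparison (a hedged sketch using the same placeholder broker, topic, and checkpoint path as above), the same query in the default micro-batch mode simply uses a processing-time trigger, or no trigger at all:

import org.apache.spark.sql.streaming.Trigger

// Same query and sink; only the trigger differs (a micro-batch every second)
val microBatchQuery = df.selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .option("checkpointLocation", "/tmp/spark-checkpoints-microbatch") // placeholder path
  .trigger(Trigger.ProcessingTime("1 second"))
  .start()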


Conclusion

In this article, we consumed and transformed complex streaming data from Apache Kafka using the Structured Streaming API. We can express complex transformations like exactly-once event-time aggregation and output the results to a variety of systems. The system then runs the query incrementally, maintaining enough state to recover from failures and keep the results consistent in external storage.