Xenonstack Recommends

Real Time Streaming Application with Apache Spark

Acknowledging Data Management
          Best Practices with DataOps

Subscription

Apache Spark Overview

Apache Spark is a fast, in-memory data processing engine with expressive development APIs to allow data workers to execute streaming conveniently. With Spark running on Apache Hadoop YARN, developers everywhere can now create applications to exploit Spark’s power, derive insights, and enrich their data science workloads within a single, shared dataset in Apache Hadoop. In this, we will consume and transform complex data streaming from Apache Kafka using this API. We can express complex transformations like exactly-once event-time aggregation and output the results to a variety of systems.

What is Streaming?

Streaming is unstructured data that is generated continuously by thousands of data sources. This Streaming data includes a wide variety of data such as log files generated by customers using your mobile or web applications, in-game player activity, information from social networks, Financial trading, and telemetry from connected devices or instrumentation in data centers. You can also explore more about real-time streaming data lake in this blog.

What is Structured Streaming?

Structure streaming limits what can express that enables optimizations since we can perform complex data computation that is not possible in stream processing. Structure Streaming

Why we use Structured Streaming?

In structure streaming, any data stream treat as unbound data: new records added to the stream are like rows being appended to the table. This allows us to treat both batch and streaming data as tables. DataFrame/Dataset queries can apply to both batch and streaming data. Users describe the query they want to run, the input and output locations, and optionally a few more details. The system then runs their query incrementally, maintaining enough state to recover from failure, keep the results consistent in external storage.

Real-Time Streaming with Apache Spark, Apache Nifi, and Apache Kafka

Real-Time Streaming with Apache Kafka

What is Data Collection Layer?

The data collection layer is the very first layer where we ingest data from different locations. For this, we will define our data flow pipelines by using the different data sources in Apache NiFi or Apache MiNiFi.
  • File System
  • Cloud bucket(Google storage, Amazon S3)
  • Databases(MySql, Postgre, MongoDB, Cassandra)
  • Real-time stream from the IoT devices

Apache Nifi

Nifi is a data flow automation tool that allows users to send, receive, route, transform, and sort data as needed in an automated and configurable way. Nifi has a user-friendly drag and drops graphical user interface that allows users to create data flow per their requirements. Apache NiFi has its predefined processor used for fetching/pushing data to any data source. We can also perform some data transformation in Apache NiFi by using predefined processors.

What is Data Routing Layer?

This layer refers to the layer that will send data to multiple different data sources, i.e., Cloud bucket, database, local file system. This we can define in our Apache NiFi flow. Apache NiFi enables the routing of the data to multiple destinations in parallel.
Fast data is not about just volume of data, we measure volume but concerning its incoming rate. Volume and Velocity both are considered while talking about Fast Data Source- Real-Time and Streaming Architecture with Tools

Apache Kafka

Apache Kafka is a distributed publish-subscribe messaging system used to ingest real-time data streams and make them available to the consumer in a parallel and fault-tolerant manner. Kafka is suitable for building a real-time streaming data pipeline that reliably moves data between different processing systems. Kafka consists of Topics, Consumers, Producers, Brokers, Partitions, and Clusters. Kafka's topics are divided into many partitions. Partitions allow you to parallelize the topic by splitting the data in a particular topic across multiple brokers. Each partition can be placed on a separate machine to allow multiple consumers to read from a topic in parallel.

Apache Kafka use cases.

  • Stream Processing
  • Metrics Collection and Monitoring
  • Website activity tracking

What are the Data Transformation Layers?

Below given are the Data Transformation layers.

Apache Spark Structured streaming

Structured Streaming is the new streaming model of the Apache Spark framework build on the SQL engine. Introduce it in Apache Spark 2.0 version too provides fast, scalable, fault-tolerant, and low latency processing. The main idea is that you should not have to reason about streaming but instead use a single API for both streaming and batch operations. Thus it allows you to write batch queries on your streaming data. Structured Streaming provides dataset/data frame api in Scala, Java, Python, or R to express streaming aggregation,event-time windows, and stream-to-bath join.

Dataframe

The Data frame is a distributed collection of data organized in a named column and row. It is similar to the table in the relational database with proper optimization. Dataframe comes into existence to deal with both structured and unstructured data formats. For example, Avro, CSV,elasticsearch, and Cassandra.

Dataset

Dataset is a data structure in SparkSQL that is strongly typed and mapped to a relational schema. It is an extension to the data frame API that represents structured queries with encoders. Spark Dataset provides both type safety and object-oriented programming interface.
Apache Spark is a cluster computing platform intended to be fast and general-purpose, it is an open-source, extensive range data processing engine. Source: Apache Spark Architecture and Use Cases

How to read Streaming data from Kafka through Apache Spark?

Structured Streaming provides a tied-in batch and streaming API to view data published to Kafka as a DataFrame. The first step is to specify the location of our Kafka cluster and the topic to read. Spark allows you to read an individual topic, like a specific set of topics, a regex pattern of topics, or even a specific set of partitions belonging to a set of topics.
    Import org.apache.spark.sql.function._
   Create sparkSession
   val spark = SparkSession
    .builder
    .appName("Spark-Kafka-Integration")
    .master("local")
    .getOrCreate()
In general, a session means interaction between two or more entities. But in Apache spark, SparkSession creates a single point of an entity to interact with underlying spark functionality and allow programming spark with data frames and dataset APIs.
  Connecting to kafka topic Val df = spark.readStream.format("kafka").option(
  "kafka.bootstrap.servers", "host1:port1,host2:port2"
).option("subscribe", "topic1").option("starting Offsets", "earliest").load() df.printSchema() reveals the schema of our DataFrame.root | -- key: binary (nullable = true)
| -- value: binary (nullable = true)
| -- topic: string (nullable = true)
| -- partition: integer (nullable = true)
| -- offset: long (nullable = true)
| -- timestamp: timestamp (nullable = true)
| -- timestampType: integer (nullable = true)
The returned DataFrame (pdf) contains all the intimate fields of a Kafka record and its associated metadata. We can now use all of the intimate DataFrame or Dataset operations to transform the result.

Streaming ETL

Now the stream is set up, we can start doing the necessary ETL ( Extract, Transform, and Load) to extract meaningful information. Let's say that real-time streaming data push by NIfi to Kafka as below.
 
 {
  "city": "",
  "country": "India",
  "countryCode": "+91”
  "lat": 0.00,
  "regionName": "Mumbai"
  "status": "success"
  "zip": ""
 }
It is now possible to analyze quickly, such as how many users are coming from India.
    
val result = df.select(get_json_object(($ "value").cast("string"), "$.country")
  .alias("countByCountry"))
 .groupBy("countByContry")
 .county()

result.writeStream.format("kafka")
 .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
 .option("topic", "topic1").start
Now, we can parse the country out of incoming JSON messages, group them, and count, all in real-time, as we are reading data from Kafka topic. The spark streaming job works in the background and continuously updates the counts as new messages arrive.

Low Latency Continuous Processing Mode in Structured Streaming

Continuous Processing is a new Execution engine in Spark 2.3 that allows very low end-to-end latency with at least one fault tolerance guarantee. Compare this with the default micro- batch processing engine, which can achieve exactly-once guarantees latency of 1 second. Spark structured streaming, spark wait for 1 second, and batches together all the events received during that interval into a micro-batch. Driver schedule this micro-batch to execute as tasks at the Executors. After a micro-batch execution is complete, collect the next batch is and reschedule. This scheduling is frequently complete to give an impression of streaming execution. However, low latency doesn’t come without any costs. In fact, faster processing decreases the delivery guarantees to at least once from exactly once. So the advice of continuous execution for the system where the processing latency is more important than the delivery guarantee.

val result = spark.writeStream.format("kafka")
 .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
 .option("topic", "topic1")
 .trigger(Trigger.Continuous(" 1 seconds ")) //only this line include for continuous
 .start()
For continuous mode, you can choose which mode helps to execute without modifying the application logic. To run a query in a continuous processing mode, all you need to do is specify a continuous trigger with the desired checkpoint interval as the parameter.No need to change the logic of the code.

Related blogs and Articles

Real Time Streaming Application with Apache Spark

Big Data Engineering

Real Time Streaming Application with Apache Spark

Apache Spark Overview Apache Spark is a fast, in-memory data processing engine with expressive development APIs to allow data workers to execute streaming conveniently. With Spark running on Apache Hadoop YARN, developers everywhere can now create applications to exploit Spark’s power, derive insights, and enrich their data science workloads within a single, shared dataset in Apache Hadoop. In...