Apache Spark is a must for Big data’s lovers. In a few words, Spark is a fast and powerful framework that provides an API to perform massive distributed processing over resilient sets of data.
Before we begin, please set up the Python and Apache Spark environment on your machine. Head over to this blog here to install if you have not done so.
Resilient Distributed Datasets (RDD) :
Resilient Distributed Datasets (RDD) is a fundamental data structure of Spark. It is an immutable distributed collection of objects. Each dataset in RDD is divided into logical partitions, which may be computed on different nodes of the cluster. RDDs can contain any type of Python, Java, or Scala objects, including user-defined classes.
Why do we need RDDs ?
Data sharing is slow in MapReduce due to replication, serialization, and disk IO. Most of the Hadoop applications, they spend more than 90% of the time doing HDFS read-write operations.
Recognizing this problem, researchers developed a specialized framework called Apache Spark. The key idea of spark is Resilient Distributed Datasets (RDD); it supports in-memory processing computation. This means, it stores the state of memory as an object across the jobs and the object is sharable between those jobs. Data sharing in memory is 10 to 100 times faster than network and Disk.
All work in Spark is expressed as either creating new RDDs, transforming existing RDDs, or calling actions on RDDs to compute a result. Spark automatically distributes the data contained in RDDs across your cluster and parallelizes the operations you perform on them
RDD creation :
Getting the data files :
We will use the reduced dataset (10 percent) provided for the KDD Cup 1999, containing nearly half million network interactions. The file is provided as a Gzip file that we will download locally.
Creating a RDD from a file or using parallelize :
RDD basics :
We will introduce three basic but essential Spark operations. Two of them are the transformations
filter. The other is the action
collect. At the same time we will introduce the concept of persistence in Spark.
As a last example combining all the previous, we want to collect all the
normal interactions as key-value pairs.
Basically the collect action will get all the elements in the RDD into memory for us to work with them, hence making the RDD persistent. For this reason it has to be used with care, specially when working with large RDDs.
Sampling RDDs :
sample will be introduced since it will be useful in many statistical learning scenarios. Then we will compare results with the
But the power of sampling as a transformation comes from doing it as part of a sequence of additional transformations. This will show more powerful once we start doing aggregations and key-value pairs operations, and will be specially useful when using Spark’s machine learning library MLlib.
Data aggregations on RDDs :
We can aggregate RDD data in Spark by using three different actions:
aggregate. The last one is the more general one and someway includes the first two.
As an example, imagine we want to know the total duration of our interactions for normal and attack interactions. We can use
reduce as follows.
Working with key/value pair RDDs:
Creating a pair RDD :
We want to profile each network interaction type in terms of some of its variables such as duration. In order to do so, we first need to create the RDD suitable for that, where each interaction is parsed as a CSV row representing the value, and is put together with its corresponding tag as a key.
Normally we create key/value pair RDDs by applying a function using
map to the original data. This function returns the corresponding pair for a given RDD element. We can proceed as follows.
Data aggregations with key/value pair RDDs :
We have a
reduceByKey transformation that we can use as follows to calculate the total duration of each network interaction type.
We can think about it as the
aggregate equivalent since it allows the user to return values that are not the same type as our input data.
Spark SQL: structured processing for Data Analysis
Basically, everything turns around the concept of Data Frame and using SQL language to query them. We will see how the data frame abstraction, very popular in other data analytics ecosystems (e.g. R and Python/Pandas), it is very powerful when performing exploratory data analysis. In fact, it is very easy to express data queries when used together with the SQL language. Moreover, Spark distributes this column-based data structure transparently, in order to make the querying process as efficient as possible.
Getting a Data Frame :
DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R or Pandas. They can be constructed from a wide array of sources such as a existing RDD in our case.
The entry point into all SQL functionality in Spark is the
SQLContext class. To create a basic instance, all we need is a
SparkContext reference. Since we are running Spark in shell mode (using pySpark) we can use the global context object
sc for this purpose.
Spark SQL can convert an RDD of
Row objects to a
DataFrame. Rows are constructed by passing a list of key/value pairs as kwargs to the
Row class. The keys define the column names, and the types are inferred by looking at the first row. Therefore, it is important that there is no missing data in the first row of the RDD in order to properly infer the schema.
In our case, we first need to split the comma separated data, and then use the information in KDD’s 1999 task description to obtain the column names.
Now we can run SQL queries over our data frame that has been registered as a table.
The results of SQL queries are RDDs and support all the normal RDD operations.
DataFrame operations :
DataFrame provides a domain-specific language for structured data manipulation. This language includes methods we can concatenate in order to do selection, filtering, grouping, etc. For example, let's say we want to count how many interactions are there for each protocol type. We can proceed as follows.
Let’s say we want to count how many interactions are there for each protocol type. We can proceed as follows.