Introduction to Apache Spark RDDs using Python

Apache Spark is a must for Big Data lovers. In a few words, Spark is a fast and powerful framework that provides an API to perform massive distributed processing over resilient sets of data.

Prerequisites :

Before we begin, please set up the Python and Apache Spark environment on your machine. Head over to this blog here to install them if you have not done so.

For our dataset, we will use the KDD Cup 1999 competition dataset, which is described in detail here. The results of this competition can be found here.

Resilient Distributed Datasets (RDD) :

The Resilient Distributed Dataset (RDD) is the fundamental data structure of Spark. It is an immutable distributed collection of objects. Each RDD is divided into logical partitions, which may be computed on different nodes of the cluster. RDDs can contain any type of Python, Java, or Scala objects, including user-defined classes.

Data sharing is slow in MapReduce due to replication, serialization, and disk IO. Most Hadoop applications spend more than 90% of their time doing HDFS read-write operations.

Recognizing this problem, researchers developed a specialized framework called Apache Spark. The key idea of Spark is the Resilient Distributed Dataset (RDD), which supports in-memory processing. This means it keeps the state of a computation in memory as an object across jobs, and that object is sharable between those jobs. Data sharing in memory is 10 to 100 times faster than sharing over the network or disk.

All work in Spark is expressed as either creating new RDDs, transforming existing RDDs, or calling actions on RDDs to compute a result. Spark automatically distributes the data contained in RDDs across your cluster and parallelizes the operations you perform on them.

Getting the data files :

We will use the reduced dataset (10 percent) provided for the KDD Cup 1999, containing nearly half a million network interactions. The file is provided as a Gzip file that we will download locally.
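
As a sketch, and assuming the reduced dataset is still available at the usual KDD archive URL (adjust the URL if the file has moved), we can fetch it with Python's urllib:

import urllib.request

# Download the 10-percent KDD Cup 1999 dataset locally (URL is an assumption)
f = urllib.request.urlretrieve(
    "http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz",
    "kddcup.data_10_percent.gz"
)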

Creating an RDD from a file or using parallelize :
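
A minimal sketch, assuming the file downloaded above sits in the working directory and that sc is the SparkContext provided by the pySpark shell:

# Create an RDD from the Gzip file; Spark decompresses it transparently
data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

# Alternatively, create an RDD from an existing Python collection with parallelize
a = range(100)
data = sc.parallelize(a)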

We will introduce three basic but essential Spark operations. Two of them are the transformations map and filter. The other is the action collect. At the same time we will introduce the concept of persistence in Spark.

As a last example combining all the previous operations, we want to collect all the normal interactions as key-value pairs.

Basically, the collect action will bring all the elements of the RDD into the driver's memory for us to work with them. For this reason it has to be used with care, especially when working with large RDDs.
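
A possible sketch, assuming the interaction label is the last CSV field of every row; the parse_interaction helper below is ours, not part of Spark:

# Hypothetical parser: turn a CSV line into a (tag, fields) key-value pair,
# assuming the label is the last field of the row
def parse_interaction(line):
    elems = line.split(",")
    return (elems[-1], elems)

# Transformations are lazy: nothing is computed until an action is called
key_csv_data = raw_data.map(parse_interaction)
normal_key_interactions = key_csv_data.filter(lambda x: x[0] == "normal.")

# Persist the filtered RDD in memory so later actions can reuse it
normal_key_interactions.cache()

# The collect action brings every matching element into the driver's memory
all_normal = normal_key_interactions.collect()
print("There are {} 'normal' interactions".format(len(all_normal)))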

Regarding transformations, sample will be introduced since it will be useful in many statistical learning scenarios. Then we will compare results with the takeSample action.
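
For instance, something along these lines (the fraction and seed values are arbitrary choices):

# sample is a transformation: it returns a new RDD with roughly 10% of the data
raw_data_sample = raw_data.sample(False, 0.1, 1234)
print("Sample size is {}".format(raw_data_sample.count()))

# takeSample is an action: it returns a plain Python list to the driver
raw_data_sample_list = raw_data.takeSample(False, 1000, 1234)
print("We took {} elements".format(len(raw_data_sample_list)))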

But the power of sampling as a transformation comes from doing it as part of a sequence of additional transformations. This will prove more powerful once we start doing aggregations and key-value pair operations, and will be especially useful when using Spark's machine learning library, MLlib.

We can aggregate RDD data in Spark by using three different actions: reduce, fold, and aggregate. The last one is the most general and in a way includes the first two.

As an example, imagine we want to know the total duration of our interactions for normal and attack interactions. We can use reduce as follows.

Using reduce :
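
A sketch, again assuming the duration is the first CSV field and the label is the last one:

# Parse the rows and split them into normal and attack interactions
csv_data = raw_data.map(lambda x: x.split(","))
normal_csv_data = csv_data.filter(lambda x: x[-1] == "normal.")
attack_csv_data = csv_data.filter(lambda x: x[-1] != "normal.")

# Keep only the duration (first field) as an integer
normal_duration_data = normal_csv_data.map(lambda x: int(x[0]))
attack_duration_data = attack_csv_data.map(lambda x: int(x[0]))

# reduce sums all the durations pairwise
total_normal_duration = normal_duration_data.reduce(lambda x, y: x + y)
total_attack_duration = attack_duration_data.reduce(lambda x, y: x + y)
print("Total normal duration: {}".format(total_normal_duration))
print("Total attack duration: {}".format(total_attack_duration))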

Using aggregate :
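
With aggregate, the accumulator can have a different type than the RDD elements, so we can compute the sum and the count in a single pass and derive the mean duration; a sketch:

# The zero value is a (sum, count) tuple; one function merges an element into
# the accumulator, the other merges accumulators from different partitions
normal_sum_count = normal_duration_data.aggregate(
    (0, 0),
    (lambda acc, value: (acc[0] + value, acc[1] + 1)),
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))
)
print("Mean normal duration: {}".format(
    round(normal_sum_count[0] / float(normal_sum_count[1]), 3)))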

Creating a pair RDD :

We want to profile each network interaction type in terms of some of its variables, such as duration. In order to do so, we first need to create a suitable RDD, where each interaction is parsed as a list of CSV fields representing the value and is paired with its corresponding tag as the key.

Normally we create key/value pair RDDs by applying a function using map to the original data. This function returns the corresponding pair for a given RDD element. We can proceed as follows.
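
Reusing the parse_interaction helper sketched earlier (label as the key, list of CSV fields as the value):

# Each element becomes (tag, [field0, field1, ...])
key_csv_data = raw_data.map(parse_interaction)
print(key_csv_data.take(1))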

Data aggregations with key/value pair RDDs :

Using reduceByKey :

We have a reduceByKey transformation that we can use as follows to calculate the total duration of each network interaction type.
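
A sketch that first builds a (tag, duration) pair RDD and then sums the durations per key:

# Map each interaction to (tag, duration) and sum durations by key
key_duration_data = key_csv_data.map(lambda kv: (kv[0], float(kv[1][0])))
durations_by_key = key_duration_data.reduceByKey(lambda x, y: x + y)
print(durations_by_key.collect())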

Using combineByKey :

We can think of it as the aggregate equivalent for pair RDDs, since it allows the user to return values of a different type than the input data.
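
For example, we can build per-type (sum, count) accumulators and from them derive the mean duration of each interaction type; a sketch:

# createCombiner turns the first value for a key into (sum, count);
# mergeValue folds another value into an existing accumulator;
# mergeCombiners merges accumulators built on different partitions
sum_counts = key_duration_data.combineByKey(
    (lambda x: (x, 1)),
    (lambda acc, value: (acc[0] + value, acc[1] + 1)),
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))
)

duration_means_by_type = sum_counts.map(
    lambda kv: (kv[0], round(kv[1][0] / kv[1][1], 3))
).collectAsMap()
print(duration_means_by_type)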

Spark SQL and Data Frames :

Basically, everything revolves around the concept of the Data Frame and using the SQL language to query it. We will see how the data frame abstraction, very popular in other data analytics ecosystems (e.g. R and Python/Pandas), is very powerful when performing exploratory data analysis. In fact, it is very easy to express data queries when it is used together with the SQL language. Moreover, Spark distributes this column-based data structure transparently, in order to make the querying process as efficient as possible.

Getting a Data Frame :

A Spark DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R or Pandas. DataFrames can be constructed from a wide array of sources, such as an existing RDD in our case.

The entry point into all SQL functionality in Spark is the SQLContext class. To create a basic instance, all we need is a SparkContext reference. Since we are running Spark in shell mode (using pySpark) we can use the global context object sc for this purpose.
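
A minimal sketch (on recent Spark versions SparkSession plays this role, but SQLContext still works):

from pyspark.sql import SQLContext

# Build the SQL entry point from the existing SparkContext sc
sqlContext = SQLContext(sc)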

Spark SQL can convert an RDD of Row objects to a DataFrame. Rows are constructed by passing a list of key/value pairs as kwargs to the Row class. The keys define the column names, and the types are inferred by looking at the first row. Therefore, it is important that there is no missing data in the first row of the RDD in order to properly infer the schema.

In our case, we first need to split the comma separated data, and then use the information in KDD’s 1999 task description to obtain the column names.
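
A sketch keeping only a handful of the KDD columns (duration, protocol_type, service, flag, src_bytes, dst_bytes), which is enough for the queries below; on Spark 1.x the table registration call was registerTempTable instead of createOrReplaceTempView:

from pyspark.sql import Row

# Split the CSV lines and build Row objects with named, typed columns
csv_data = raw_data.map(lambda l: l.split(","))
row_data = csv_data.map(lambda p: Row(
    duration=int(p[0]),
    protocol_type=p[1],
    service=p[2],
    flag=p[3],
    src_bytes=int(p[4]),
    dst_bytes=int(p[5])
))

# Infer the schema from the first row and register the DataFrame as a table
interactions_df = sqlContext.createDataFrame(row_data)
interactions_df.createOrReplaceTempView("interactions")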

Now we can run SQL queries over our data frame that has been registered as a table.
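
For example, a query selecting long tcp interactions that transferred no data back to the source (the thresholds are arbitrary):

# Select tcp interactions longer than 1000 with no data sent back to the source
tcp_interactions = sqlContext.sql("""
    SELECT duration, dst_bytes FROM interactions
    WHERE protocol_type = 'tcp' AND duration > 1000 AND dst_bytes = 0
""")
tcp_interactions.show(5)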

The results of SQL queries are DataFrames, and their rows support all the normal RDD operations through the underlying RDD.
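
For instance, accessing the rows through the rdd attribute:

# Format each resulting row with a plain map over the underlying RDD
tcp_interactions_out = tcp_interactions.rdd.map(
    lambda row: "Duration: {}, Dest. bytes: {}".format(row.duration, row.dst_bytes)
)
for line in tcp_interactions_out.take(5):
    print(line)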

Queries as DataFrame operations :

Spark DataFrames provide a domain-specific language for structured data manipulation. This language includes methods we can chain in order to do selection, filtering, grouping, etc. For example, let's say we want to count how many interactions there are for each protocol type. We can proceed as follows.

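A sketch using select, groupBy, and count chained on the DataFrame built above:

# Count interactions per protocol type using chained DataFrame methods
interactions_df.select("protocol_type").groupBy("protocol_type").count().show()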

