Building a real-time big data pipeline (part 9: Spark MLlib, Python, Regression)

Published:

Updated on December 26, 2020

Apache Spark MLlib [1][2][3] is a distributed framework that provides utilities for machine learning tasks such as classification, regression, clustering, and dimensionality reduction, as well as linear algebra and statistics. Python is a popular general-purpose programming language with many packages that support data processing and machine learning. The Spark Python API (PySpark) exposes the Spark programming model to Python.

PySpark (Python on Spark)

A Python or R data frame lives on a single machine rather than being spread across several. For distributed computation you need to operate on Spark DataFrames instead of Python or R data frames, which is possible through the SparkR [4] or PySpark APIs. A Spark DataFrame can be thought of as a table distributed across a cluster, with functionality similar to that of a data frame in R or Python.

1. Start Hadoop/HDFS

The following command starts the NameNode as well as the DataNodes as a cluster [3].

cd /Users/adinasa/bigdata/hadoop-3.2.1  
$bash sbin/start-dfs.sh  

Create a directory

$source ~/.bash_profile  
$hadoop fs -mkdir -p /user/adinasarapu

Insert data into HDFS: use the -copyFromLocal command to copy one or more files from the local file system to HDFS.

$hadoop fs -copyFromLocal *.csv /user/adinasarapu

Verify the files using the -ls command.

$hadoop fs -ls /user/adinasarapu  
-rw-r--r--   1 adinasa supergroup     164762 2020-08-18 17:39 /user/adinasarapu/data_proteomics.csv  
-rw-r--r--   1 adinasa supergroup        786 2020-08-18 17:39 /user/adinasarapu/samples_proteomics.csv  

Start YARN with the script: start-yarn.sh

$bash start-yarn.sh  
Starting resourcemanager  
Starting nodemanagers  

Check the list of running Java processes with the jps command. If the Hadoop daemons (NameNode, DataNode, SecondaryNameNode, ResourceManager and NodeManager) appear in its output, you can safely assume that the Hadoop cluster is running.

$jps  
96899 NodeManager  
91702 SecondaryNameNode  
96790 ResourceManager  
97240 Jps  
91437 NameNode  
91550 DataNode  
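
The same check can be scripted. The following is a minimal sketch, not part of the original setup: it parses text in the format printed by jps and reports which expected daemons are absent. The function name `missing_daemons` and the `EXPECTED` set are illustrative choices, and the sample text is the jps listing shown above.

```python
# Daemons expected on a single-node setup after start-dfs.sh and start-yarn.sh.
# This set mirrors the jps listing shown above; adjust it for other setups.
EXPECTED = {"NameNode", "DataNode", "SecondaryNameNode",
            "ResourceManager", "NodeManager"}


def missing_daemons(jps_output):
    """Return the expected daemon names that do not appear in jps output."""
    running = set()
    for line in jps_output.splitlines():
        parts = line.split()
        # Each jps line has the form "<pid> <name>"; skip anything else.
        if len(parts) == 2 and parts[0].isdigit():
            running.add(parts[1])
    return EXPECTED - running


sample = """96899 NodeManager
91702 SecondaryNameNode
96790 ResourceManager
97240 Jps
91437 NameNode
91550 DataNode"""

# An empty set means every expected daemon was found.
print(missing_daemons(sample))
```

In practice the text could come from `subprocess.run(["jps"], capture_output=True, text=True).stdout` rather than a hard-coded string.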

Open a web browser to see your configurations for the current session.

Web UI
for HDFS: http://localhost:9870
for YARN Resource Manager: http://localhost:8088

2. PySpark, spark session in Python environment

Here are the instructions for Configuring Eclipse with Python and Spark on Hadoop

PySpark communicates with the Scala-based Spark via the Py4J library. Py4J isn’t specific to PySpark or Spark. Py4J allows any Python program to talk to JVM-based code.

Creating a spark context: “The entry-point of any PySpark program is a SparkContext object. This object allows you to connect to a Spark cluster and create RDDs. The local[*] string is a special string denoting that you’re using a local cluster, which is another way of saying you’re running in single-machine mode. The * tells Spark to create as many worker threads as logical cores on your machine. Creating a SparkContext can be more involved when you’re using a cluster. To connect to a Spark cluster, you might need to handle authentication and a few other pieces of information specific to your cluster” Source https://realpython.com

Since Spark 2.x, a new entry point called SparkSession has been available. It essentially combines the functionality of the Spark 1.x entry points (SparkContext, SQLContext and HiveContext).

Figure: spark-context (image source: https://www.tutorialspoint.com)

from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import when   

conf = SparkConf()
conf.setMaster('local[*]')
conf.setAppName('MyApp')
# conf.set(key, value)
conf.set("spark.executor.memory", '4g')
conf.set('spark.executor.cores', '1')
conf.set('spark.cores.max', '1')
conf.set("spark.driver.memory",'4g')

In the interactive shells (spark-shell, or pyspark for Python), a SparkContext is already available through the variable sc. To create a new SparkContext with a different configuration, first stop the default one.

# sc.stop()  
# getOrCreate() returns the running context instead of trying to start a second one
SparkContext.getOrCreate().stop()

SparkSession gives access to SparkContext.

# sc = SparkContext(conf=conf)

spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext

txt = sc.textFile('hdfs://localhost:9000/user/adinasarapu/samples_proteomics.csv')
print(txt.collect())

You can start creating Resilient Distributed Datasets (RDDs) once you have a SparkContext.

One way to create RDDs is to read a file with textFile() method. RDDs are one of the foundational data structures in Spark. A single RDD can be divided into multiple logical partitions so that these partitions can be stored and processed on different machines of a cluster. RDDs are immutable (read-only) in nature. You cannot change an original RDD, but you can create new RDDs by performing operations, like transformations, on an existing RDD. An RDD in Spark can be cached and used again for future transformations. RDDs are said to be lazily evaluated, i.e., they delay the evaluation until it is really needed.
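
To make immutability and lazy evaluation concrete, here is a plain-Python analogy. It is not Spark code: `LazySeq` is a hypothetical toy class written for this illustration. It only records transformations, and nothing runs until `collect()` is called.

```python
class LazySeq:
    """Toy stand-in for an RDD: immutable, with deferred evaluation."""

    def __init__(self, source, steps=()):
        self._source = tuple(source)   # the original data is never modified
        self._steps = tuple(steps)     # recorded transformations (the lineage)

    def map(self, fn):
        # A transformation returns a NEW object; nothing is computed yet.
        return LazySeq(self._source, self._steps + (("map", fn),))

    def filter(self, pred):
        return LazySeq(self._source, self._steps + (("filter", pred),))

    def collect(self):
        # An action: only now are the recorded steps applied, in order.
        items = list(self._source)
        for kind, fn in self._steps:
            if kind == "map":
                items = [fn(x) for x in items]
            else:
                items = [x for x in items if fn(x)]
        return items


calls = []  # records when the mapped function actually runs


def double(x):
    calls.append(x)
    return 2 * x


base = LazySeq([1, 2, 3, 4])
doubled = base.map(double)              # no work happens here
big = doubled.filter(lambda x: x > 4)   # still no work

print(len(calls))       # 0: nothing has been evaluated so far
print(big.collect())    # [6, 8]
print(base.collect())   # [1, 2, 3, 4]: the original is unchanged
```

On a real RDD, `map` and `filter` are transformations and `collect` is an action, so the call pattern looks the same even though Spark distributes the work across partitions.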

What are the limitations of RDD in Apache Spark?
An RDD does not provide a schema view of the data and has no built-in provision for handling structured data. Datasets and DataFrames do provide a schema view. A DataFrame is a distributed collection of data organized into named columns. Spark DataFrames can be created from various sources, such as external files, databases, or existing RDDs, and they allow the processing of huge amounts of data. Datasets are an extension of the DataFrame API. In addition to the features of DataFrames and RDDs, they provide a typed, object-oriented programming interface built on classes and objects (the typed Dataset API is available in Scala and Java, not in Python).
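
The difference between a schema-less and a schema-aware view can be sketched without Spark. The snippet below is a plain-Python analogy using only the standard library. The CSV text is an assumption: it is reconstructed from the first rows of the sample table printed later in this post, not copied from the actual file.

```python
import csv
import io

# Assumed file layout, reconstructed from the df.show() table in this post.
raw = """SampleID,Disease,Genetic,Age,Sex
D_2,Yes,No,63,female
D_7,Yes,No,78,female
DG_3,Yes,Yes,76,female
"""

# Schema-less view: each record is just a line of text, as with sc.textFile().
# The reader has to know that Age is the fourth field (index 3).
lines = raw.strip().splitlines()
ages_by_position = [line.split(",")[3] for line in lines[1:]]

# Schema view: every record carries column names, as in a DataFrame.
rows = list(csv.DictReader(io.StringIO(raw)))
ages_by_name = [row["Age"] for row in rows]

print(ages_by_position)  # ['63', '78', '76']
print(ages_by_name)      # ['63', '78', '76']
```

Both approaches return the same values, but only the second keeps working if the column order changes.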

You can start creating a DataFrame once you have a SparkSession.

A SparkSession can be used to create DataFrames, register DataFrames as tables, execute SQL over tables, cache tables, and so on.
Creating a Spark DataFrame from a CSV file:

df = spark.read.format('csv').option('header',True).option('multiLine', True).load('hdfs://localhost:9000/user/adinasarapu/samples_proteomics.csv')  
df.show()  
+--------+-------+-------+---+------+  
|SampleID|Disease|Genetic|Age|   Sex|  
+--------+-------+-------+---+------+  
|     D_2|    Yes|     No| 63|female|  
|     D_7|    Yes|     No| 78|female|  
|    D_12|    Yes|     No| 65|female|  
|    D_17|    Yes|     No| 58|female|  
|    D_22|    Yes|     No| 65|female|  
|    D_27|    Yes|     No| 50|female|  
|    D_32|    Yes|     No| 67|female|  
|    D_37|    Yes|     No| 80|female|  
|    D_42|    Yes|     No| 66|female|  
|    D_47|    Yes|     No| 64|female|  
|    DG_3|    Yes|    Yes| 76|female|  
|    DG_8|    Yes|    Yes| 56|female|  
|   DG_13|    Yes|    Yes| 81|female|  
|   DG_18|    Yes|    Yes| 69|female|  
|   DG_23|    Yes|    Yes| 60|female|  
|   DG_28|    Yes|    Yes| 63|female|  
|   DG_33|    Yes|    Yes| 70|female|  
|   DG_38|    Yes|    Yes| 46|female|  
|   DG_43|    Yes|    Yes| 65|female|  
|   DG_48|    Yes|    Yes| 77|female|  
+--------+-------+-------+---+------+  
only showing top 20 rows  
print(df)  
DataFrame[SampleID: string, Disease: string, Genetic: string, Age: string, Sex: string]  

Replacing Yes or No with 1 or 0

newDf = df.withColumn('Disease', when(df['Disease'] == 'Yes', 1).otherwise(0))  
newDf = newDf.withColumn('Genetic', when(df['Genetic'] == 'Yes', 1).otherwise(0))  
newDf.show()  

+--------+-------+-------+---+------+  
|SampleID|Disease|Genetic|Age|   Sex|  
+--------+-------+-------+---+------+  
|     D_2|      1|      0| 63|female|  
|     D_7|      1|      0| 78|female|  
|    D_12|      1|      0| 65|female|  
|    D_17|      1|      0| 58|female|  
|    D_22|      1|      0| 65|female|  
|    D_27|      1|      0| 50|female|  
|    D_32|      1|      0| 67|female|  
|    D_37|      1|      0| 80|female|  
|    D_42|      1|      0| 66|female|  
|    D_47|      1|      0| 64|female|  
|    DG_3|      1|      1| 76|female|  
|    DG_8|      1|      1| 56|female|  
|   DG_13|      1|      1| 81|female|  
|   DG_18|      1|      1| 69|female|  
|   DG_23|      1|      1| 60|female|  
|   DG_28|      1|      1| 63|female|  
|   DG_33|      1|      1| 70|female|  
|   DG_38|      1|      1| 46|female|  
|   DG_43|      1|      1| 65|female|  
|   DG_48|      1|      1| 77|female|  
+--------+-------+-------+---+------+  
only showing top 20 rows  
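
The `when(...).otherwise(0)` expression used above can be read as a per-value rule. The following plain-Python sketch mimics that rule for a single value. The function name `encode_yes_no` is made up for this illustration and is not part of PySpark.

```python
def encode_yes_no(value):
    """Mimic when(col == 'Yes', 1).otherwise(0) for a single value."""
    # Only the exact string 'Yes' maps to 1. Everything else, including
    # None (a missing value) and variants such as 'yes', falls through to 0.
    return 1 if value == "Yes" else 0


for value in ["Yes", "No", "yes", None]:
    print(repr(value), "->", encode_yes_no(value))
```

Because the fallback catches every non-matching value, missing or misspelled entries are silently encoded as 0. If missing values should stay missing, they need a separate check before the encoding.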

Change Column type and select required columns for model building

newDf = newDf.withColumn("Disease",newDf["Disease"].cast('double'))  
newDf = newDf.withColumn("Genetic",newDf["Genetic"].cast('double'))  
newDf = newDf.withColumn("Age",newDf["Age"].cast('double'))  
newDf.show()    
+--------+-------+-------+----+------+  
|SampleID|Disease|Genetic| Age|   Sex|  
+--------+-------+-------+----+------+  
|     D_2|    1.0|    0.0|63.0|female|  
|     D_7|    1.0|    0.0|78.0|female|  
|    D_12|    1.0|    0.0|65.0|female|  
|    D_17|    1.0|    0.0|58.0|female|  
|    D_22|    1.0|    0.0|65.0|female|  
|    D_27|    1.0|    0.0|50.0|female|  
|    D_32|    1.0|    0.0|67.0|female|  
|    D_37|    1.0|    0.0|80.0|female|  
|    D_42|    1.0|    0.0|66.0|female|  
|    D_47|    1.0|    0.0|64.0|female|  
|    DG_3|    1.0|    1.0|76.0|female|  
|    DG_8|    1.0|    1.0|56.0|female|  
|   DG_13|    1.0|    1.0|81.0|female|  
|   DG_18|    1.0|    1.0|69.0|female|  
|   DG_23|    1.0|    1.0|60.0|female|  
|   DG_28|    1.0|    1.0|63.0|female|  
|   DG_33|    1.0|    1.0|70.0|female|  
|   DG_38|    1.0|    1.0|46.0|female|  
|   DG_43|    1.0|    1.0|65.0|female|  
|   DG_48|    1.0|    1.0|77.0|female|  
+--------+-------+-------+----+------+  
only showing top 20 rows  

Prepare data for Machine Learning.

In statistical modeling, regression analysis focuses on investigating the relationship between a dependent variable and one or more independent variables.

In data mining, regression is a model that represents the relationship between the value of a label (or target, a numerical variable) and one or more features (or predictors, which can be numerical or categorical variables).

We need only two columns: features (built from Genetic and Age) and the label (Disease).

from pyspark.ml.feature import VectorAssembler  
vectorAssembler = VectorAssembler(inputCols = ['Genetic','Age'], outputCol = 'features')  
vhouse_df = vectorAssembler.transform(newDf)  

Subset Dataset

vhouse_df = vhouse_df.select(['features', 'Disease'])  
vhouse_df.show()  

Fit a generalized linear regression model (Gaussian family, identity link)

from pyspark.ml.regression import GeneralizedLinearRegression  
glr = GeneralizedLinearRegression(featuresCol = "features", labelCol="Disease", maxIter=10, regParam=0.3, family="gaussian", link="identity")  
lr_model = glr.fit(vhouse_df)  

Summarize the model over the training set

trainingSummary = lr_model.summary  
print("Coefficients: " + str(lr_model.coefficients))  
print("Intercept: " + str(lr_model.intercept))

Results

Coefficients: [0.2797034064149404,0.008458887145555321]  
Intercept: 0.04080427059655318
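
With the identity link, the fitted model is a plain linear equation, so a prediction can be computed by hand from the numbers above. The sketch below does this in plain Python. The coefficient order follows the inputCols list passed to VectorAssembler (Genetic, then Age), and the sample passed to `predict` is hypothetical.

```python
# Values copied from the Results printed above.
coefficients = [0.2797034064149404, 0.008458887145555321]  # order: Genetic, Age
intercept = 0.04080427059655318


def predict(genetic, age):
    """Linear predictor; with the identity link this is also the prediction."""
    return intercept + coefficients[0] * genetic + coefficients[1] * age


# Hypothetical sample: Genetic = 1.0, Age = 65.0
print(round(predict(1.0, 65.0), 4))  # 0.8703
```

Because the identity link does not constrain the output, the predicted value is not guaranteed to lie between 0 and 1. With these coefficients, for example, Genetic = 1.0 and Age = 90.0 gives a value above 1.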

Finally, shutting down HDFS and YARN
You can stop all the daemons using the command stop-all.sh. You can also start or stop each daemon separately.

$bash stop-all.sh
Stopping namenodes on [localhost]
Stopping datanodes
Stopping secondary namenodes [Ashoks-MacBook-Pro.2.local]
Stopping nodemanagers
Stopping resourcemanager

Further reading…
Logistic Regression in Spark ML
Logistic Regression with Apache Spark
Feature Transformation
PySpark: Apache Spark with Python
SparkR and Sparking Water
Integrate SparkR and R for Better Data Science Workflow
A Compelling Case for SparkR
Spark – How to change column type?
SparkRext - SparkR extension for closer to dplyr
Scala Vs Python Vs R Vs Java - Which language is better for Spark & Why?

References