Building a real-time big data pipeline (8: Spark MLlib, Regression, R)

Published:

Updated on October 06, 2020

Apache Spark MLlib 1 2 3 is a distributed framework that provides many utilities useful for machine learning tasks, such as: Classification, Regression, Clustering, Dimentionality reduction and, Linear algebra, statistics and data handling. R is a popular statistical programming language with a number of packages that support data processing and machine learning tasks. However, R is single threaded and is often impractical to use it on large datasets. To address R’s scalability issue, the Spark community developed SparkR package4 which is based on a distributed data frame that enables structured data processing with a syntax familiar to R users.

SparkR (R on Spark) Architecture 4

SparkR Architecture

“SparkR is an R package that provides a light-weight frontend to use Apache Spark from R. In Spark 3.0.0, SparkR provides a distributed data frame implementation that supports operations like selection, filtering, aggregation etc. (similar to R data frames, dplyr) but on large datasets. SparkR also supports distributed machine learning using MLlib”.

1. Start Hadoop/HDFS

The following command will start the namenode as well as the data nodes as cluster 3.

cd /Users/adinasa/bigdata/hadoop-3.2.1  
$bash sbin/start-dfs.sh  

Create a directory

$source ~/.bash_profile  
$hadoop fs -mkdir -p /user/adinasarapu

Insert data into HDFS: Use -copyFromLocal command to move one or more files from local location to HDFS.

$hadoop fs -copyFromLocal *.csv /user/adinasarapu

Verify the files using ls command.

$hadoop fs -ls /user/adinasarapu  
-rw-r--r--   1 adinasa supergroup     164762 2020-08-18 17:39 /user/adinasarapu/data_proteomics.csv  
-rw-r--r--   1 adinasa supergroup        786 2020-08-18 17:39 /user/adinasarapu/samples_proteomics.csv  

Start YARN with the script: start-yarn.sh

$bash start-yarn.sh  
Starting resourcemanager  
Starting nodemanagers  

Check the list of Java processes running in your system by using the command jps. If you are able to see the Hadoop daemons running after executing the jps command, we can safely assume that the Hadoop cluster is running.

$jps  
96899 NodeManager  
91702 SecondaryNameNode  
96790 ResourceManager  
97240 Jps  
91437 NameNode  
91550 DataNode  

Open a web browser to see your configurations for the current session.

Web UI
for HDFS: http://localhost:9870
for YARN Resource Manager: http://localhost:8088

2. SparkR, spark session in R environment

a. Start sparkR shell from Spark installation

cd to spark installation directory and run sparkR command

cd bigdata/spark-3.0.0-bin-hadoop3.2/bin/  
>sparkR

...
...
...

 Welcome to
    ____              __ 
   / __/__  ___ _____/ /__ 
  _\ \/ _ \/ _ `/ __/  '_/ 
 /___/ .__/\_,_/_/ /_/\_\   version  3.0.0 
    /_/ 


 SparkSession available as 'spark'.
During startup - Warning message:
In SparkR::sparkR.session() :
  Version mismatch between Spark JVM and SparkR package. JVM version was 3.0.0 , while R package version was 2.4.6

b. Install SparkR package

install.packages(remotes)  
library(remotes)
install_github(“cran/SparkR”)

c. Open RStudio and set SPARK_HOME and JAVA_HOME

Check java version system("java -version")
If you see an error like “R looking for the wrong java version” set Sys.setenv(JAVA_HOME=java_path)

Error in checkJavaVersion() : 
  Java version 8 is required for this package; found version: 14.0.2

Set env variables

Sys.setenv(SPARK_HOME = "/Users/adinasa/bigdata/spark-3.0.0-bin-hadoop3.2")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))

java_path <- normalizePath("/Library/Java/JavaVirtualMachines/jdk1.8.0_261.jdk/Contents/Home")
Sys.setenv(JAVA_HOME=java_path)  

Load SparkR and start SparkSession which is the entry point into SparkR.

library(SparkR)  
SparkR::sparkR.session(master = "local", sparkHome = Sys.getenv("SPARK_HOME"), appName = "SparkR", spark.executor.memory = "2g")

To list config values with keys as their names,

sparkR.conf()  

Read data from hadoop/csv file. In Spark 2.0 csv is natively supported so you should be able to do something like this

s.df <- SparkR::read.df("hdfs://localhost:9000/user/adinasarapu/samples_proteomics.csv", source = "csv", header = "true")

# s.df
# SparkDataFrame[SampleID:string, Disease:string, Genetic:string, Age:string, Sex:string]

# Replace Yes or No with 1 or 0
newDF1 <- withColumn(s.df, "Disease", ifelse(s.df$Disease == "Yes", 1, 0))
newDF2 <- withColumn(newDF1, "Genetic", ifelse(s.df$Genetic == "Yes", 1, 0))

# head(newDF2)

# Change Column type and select required columns for model building
createOrReplaceTempView(newDF2, "df_view")
new_df <- SparkR::sql("SELECT DOUBLE(Disease), DOUBLE(Genetic), DOUBLE(Age) from df_view")

model <- spark.glm(new_df, Disease ~ Genetic + Age, family = "gaussian",maxIter=10, regParam=0.3)

# Print model summary
summary(model)

Results

Deviance Residuals: 
(Note: These are approximate quantiles with relative error <= 0.01)
     Min        1Q    Median        3Q       Max  
-0.68368  -0.40454   0.09583   0.39245   0.53625  

Coefficients:
              Estimate  Std. Error  t value  Pr(>|t|)
(Intercept)  0.0408043   0.3501497  0.11653  0.908092
Genetic      0.2797034   0.1278479  2.18778  0.037519
Age          0.0084589   0.0054737  1.54538  0.133896

(Dispersion parameter for gaussian family taken to be 0.1752621)

    Null deviance: 6.6667  on 29  degrees of freedom
Residual deviance: 4.7321  on 27  degrees of freedom
AIC: 37.73

Number of Fisher Scoring iterations: 1

Further, compute predictions using training data

preds <- predict(model, training)

Note: One of the major changes of Apache Spark for R version 4.0.0 (06/2020)

[2.4][SPARK-31918][R] Ignore S4 generic methods under SparkR namespace in closure cleaning to support R 4.0.0+ (+18, -13)>

This PR proposes to exclude the S4 generic methods under SparkR namespace in closure cleaning to support R 4.0.0+ in SparkR. Without this patch, you will hit the following exception when running R native codes with R 4.0.0

df <- createDataFrame(lapply(seq(100), function (e) list(value=e)))
count(dapply(df, function(x) as.data.frame(x[x$value < 50,]), schema(df)))

org.apache.spark.SparkException: R unexpectedly exited.
R worker produced errors: Error in lapply(part, FUN) : attempt to bind a variable to R_UnboundValue

Other important Spark Dataframe Operations: filter, select, summarize, groupBy, arrange(=sort)

head(filter(s.df, s.df$Age > 50))  
head(select(s.df, s.df$SampleID, s.df$Genetic))  
head(select(filter(s.df, s.df$Age > 55), s.df$SampleID, s.df$Disease))
head(summarize(groupBy(s.df, s.df$Disease), mean=mean(s.df$Age), count=n(s.df$Genetic)))
head(arrange(s.df, asc(s.df$Age)))

Combine Spark DataFrame operations using library magrittr

library(magrittr)
f_df <- filter(s.df, s.df$Age > 50) %>% groupBy(s.df$Disease) %>% summarize(mean=mean(s.df$Age))

SQL like queries: First register Spark Dataframe as sql table

createOrReplaceTempView(s.df, "df_view")
new_df <- SparkR::sql("SELECT * FROM df_view WHERE Age < 51")  

Convert Spark Dataframe to R DataFrame: as.data.frame or collect

r.df <- SparkR::as.data.frame(new_df)  
# r.df <- SparkR::collect(new_df)
r.df

Finally, shutting down the HDFS
You can stop all the daemons using the command stop-all.sh. You can also start or stop each daemon separately.

$bash stop-all.sh
Stopping namenodes on [localhost]
Stopping datanodes
Stopping secondary namenodes [Ashoks-MacBook-Pro.2.local]
Stopping nodemanagers
Stopping resourcemanager

Further reading…
Logistic Regression in Spark ML
Logistic Regression with Apache Spark
Feature Transformation
SparkR and Sparking Water
Integrate SparkR and R for Better Data Science Workflow
A Compelling Case for SparkR
Spark – How to change column type?
SparkRext - SparkR extension for closer to dplyr

References