by Ali Zaidi, Data Scientist at Microsoft
Apache Spark and a Tale of APIs
Spark is an exceptionally popular processing engine for distributed data. Working with data in distributed storage and programming concurrent systems often requires learning complicated new paradigms and techniques, and statisticians and data scientists familiar with R are unlikely to have much experience with such systems. Fortunately, Spark is a very intuitive distributed computing platform, and it offers APIs in R and Python that are more familiar to the average data scientist than Scala.
One of the most intriguing Spark APIs for R users is the sparkapi package developed by RStudio, along with the associated sparklyr project, which provides a dplyr-style syntax for manipulating Spark DataFrames.
Azure HDInsight
Azure HDInsight is the Microsoft-developed Apache Hadoop distribution for the cloud. The Hadoop ecosystem is a wild zoo of components, and configuring them to work together can be a difficult job. Microsoft's Azure HDInsight service gives data scientists and analysts the most exciting capabilities of distributed computing on the cloud without the hassle of configuration and management. In this post, I will show how we can use premium Spark clusters with a complete installation of Microsoft R's stack to do data manipulation at scale.
Provisioning
To provision Azure HDInsight Spark clusters with Microsoft R Server, follow the instructions on the Azure Documentation Page, or my Spark course repository. To create a cluster, simply navigate to your portal and select a new HDInsight cluster:
On the Azure portal page, you will be asked to select a subscription, and also provide some security credentials to access your cluster via a secure terminal connection (SSH).
Make sure you select the premium option to get the awesome features of Microsoft R Server:
A follow-up blog post will show you how to develop scalable machine learning models using the Microsoft R Server machine learning libraries.
Spark DataFrames and Spark SQL
In order to accommodate the variety of workloads a data scientist encounters during her work, Spark contains four core components, each aimed at a specific use case: Spark SQL, Spark Streaming, MLlib, and GraphX.
The component aimed at data manipulation workloads is Spark SQL. Inside the Spark SQL context, Spark developers create DataFrames, which are RDDs (Resilient Distributed Datasets) with a schema attached. RDDs are the standard data primitive in the Spark ecosystem, and can be thought of as distributed lists. However, just as lists are too general for most data manipulation workloads in R, RDDs are too general for data analysis workloads in Spark. Imposing a schema of columns on an RDD (named, typed columns, with every row sharing the same structure) gives us a DataFrame.
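The list-versus-data-frame analogy can be made concrete in plain R. The following is a minimal sketch with toy values (the records are made up for illustration and are not taken from the Freddie Mac data); it only illustrates the R side of the analogy, not Spark's RDD or DataFrame classes themselves.

```r
# A plain list of records: nothing forces the records to share fields or types.
loans_list <- list(
  list(credit_score = 751L, state = "NM"),
  list(credit_score = "733", state = "WI", extra = TRUE)
)

# A data.frame attaches a schema: named columns, each with a single type,
# and every row has the same structure.
loans_df <- data.frame(
  credit_score = c(751L, 733L),
  state = c("NM", "WI"),
  stringsAsFactors = FALSE
)

# The schema is what lets column-wise operations be expressed directly.
column_types <- sapply(loans_df, class)
mean_score <- mean(loans_df$credit_score)

print(column_types)
print(mean_score)
```

With the list, computing an average credit score would first require checking and converting each record by hand; with the data frame, the column type is known up front.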
Functional Programming and the Art of Being Lazy
The majority of Spark is written in Scala (~80% of Spark core), which is a functional programming language. Functional programming languages emphasize functional purity (the output depends only on the inputs) and strive to avoid side effects. Many functional programming languages also support lazy evaluation, in which an expression is only computed when its result is actually needed. While it might seem odd that we would appreciate laziness from our computing tools, lazy evaluation is an effective way of ensuring computations are evaluated in the most efficient manner possible.
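R users already rely on a form of lazy evaluation: function arguments are not computed until the function body uses them. The sketch below shows this in base R. It is only an analogy for the idea of deferring work, and the function names are hypothetical; Spark's own laziness concerns deferred transformations on distributed data rather than function arguments.

```r
# R evaluates function arguments lazily: an argument is only computed
# when the function body actually uses it.
first_of <- function(x, y) {
  x  # y is never touched, so its expression is never evaluated
}

# The second argument would raise an error if it were ever evaluated.
result <- first_of(42, stop("this expression is never evaluated"))

# An argument that is used gets evaluated at the point of first use.
evaluated <- FALSE
use_arg <- function(z) {
  z
}
value <- use_arg({
  evaluated <- TRUE
  10
})

print(result)
print(evaluated)
print(value)
```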
Lazy evaluation allows Spark SQL to optimize queries heavily. When a user submits a query to Spark SQL, Spark composes the components of the SQL query into a logical plan. The logical plan is basically a recipe Spark SQL creates in order to evaluate the desired query. Spark SQL then submits the logical plan to its optimizer, called Catalyst, which turns it into a physical plan of action that is executed inside Spark's computation engine (a series of coordinating JVMs).
Spark SQL and dplyr
Some of the more R-enthusiastic readers might already see where this is going. Since Spark SQL is Spark's core component for data analysis, why can't we just use our favorite SQL-like paradigm in R as an interface to Spark? For many an R user, the one package that brings up memories of love and joy for data analysis is dplyr.
More than just being an awesome package with a simple-to-understand grammar, dplyr is highly extensible and works hand-in-hand with SQL. It supports working with many SQL databases by using the database engine as its backend and translating all of its verbs into SQL statements. For more information on working with databases using dplyr, see the package's vignette on databases. Take a look at dplyr's translate_sql function to see how it converts dplyr verbs into SQL statements.
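As a rough illustration of that translation, here is a minimal sketch. It assumes a recent installation in which the SQL translation code lives in the dbplyr package (where it moved after this post's era), and it uses dbplyr's simulated connection so no database is required. The column name credit_score is borrowed from later in this post; the exact SQL text produced depends on the dbplyr version and backend.

```r
library(dbplyr)

# simulate_dbi() stands in for a real database connection,
# so no database is needed to see the translation.
con <- simulate_dbi()

# Translate individual R expressions into SQL fragments.
sql_filter <- translate_sql(credit_score > 700, con = con)
sql_mean <- translate_sql(mean(credit_score, na.rm = TRUE), con = con)

print(sql_filter)
print(sql_mean)
```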
The sparklyr package takes advantage of dplyr's SQL connectivity and R's lazy evaluation procedures by sending all the dplyr operations to Spark SQL. Therefore, as an R user, you don't need to write anything but dplyr to get the awesomeness of Spark SQL and Catalyst!
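To see what "sending dplyr operations to SQL" looks like without a cluster, the sketch below composes a pipeline against a dbplyr lazy_frame, which describes a table but holds no real backend. This is an assumption-laden stand-in: the table contents are toy values, and the SQL that sparklyr generates for a real Spark connection may differ from the generic SQL shown here.

```r
library(dplyr)
library(dbplyr)

# lazy_frame() describes a remote-style table with no real data source behind it.
loans <- lazy_frame(
  credit_score = 751L,
  state = "NM",
  orig_year = "2008"
)

# Composing verbs only builds up a query description; nothing is executed.
query <- loans %>%
  filter(credit_score > 700) %>%
  group_by(orig_year, state) %>%
  summarise(ave_fico = mean(credit_score, na.rm = TRUE))

# Render the SQL that would be sent to the backend.
generated_sql <- as.character(sql_render(query))
cat(generated_sql, "\n")
```

Only when a result is requested (for example with collect on a real connection) is the accumulated query handed to the backend for execution.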
Spark Context
In order to start using Spark as our computation engine, we need to define a Spark context (or Spark session). We will use the sparklyr function spark_connect to create a Spark SQL context. There are a slew of arguments you can use to modify the parameters of your Spark SQL context, but we will stick to the defaults. By specifying yarn-client as our master, we are telling Spark we want a YARN client application. For an overview of the environments on which you can run Spark, take a look at the documentation page here.
library(sparklyr)
sc <- spark_connect("yarn-client")
Import Into Spark DataFrames
In my cluster's storage account on Azure HDInsight, I have loaded data from Freddie Mac's single family loan database. I then use the spark_read_csv function to read my folder of mortgage originations data into a Spark DataFrame.
origins <- file.path("wasb://[email protected]",
                     "user/RevoShare/alizaidi/Freddie/Acquisition")

freddie_origins <- spark_read_csv(sc,
                                  path = origins,
                                  name = 'freddie_origins',
                                  header = FALSE,
                                  delimiter = "|")
Using dplyr to Query our Spark DataFrame
The resulting object from the above operation is a Spark DataFrame. If we peek at its class, we notice that in addition to being a tbl_spark object, it is also of class tbl_sql, so all dplyr methods are converted to Spark SQL statements and run on the Spark application defined through the Spark context sc.
class(freddie_origins)
## [1] "tbl_spark" "tbl_sql" "tbl_lazy" "tbl"
library(dplyr)
freddie_origins %>% head
## Source:   query [?? x 25]
## Database: spark connection master=yarn-client app=sparklyr local=FALSE
##
##      V1     V2    V3     V4    V5    V6    V7    V8    V9   V10    V11
##   <chr>  <int> <chr>  <int> <int> <chr> <int> <chr> <dbl> <chr>  <int>
## 1   751 199910     N 202909    NA   000     1     O    71    20 180000
## 2   733 199909     N 202908 29540   000     1     O    51       116000
## 3   755 199905     N 202904 29540    30     1     O    95    38 138000
## 4   669 200206     N 202901    NA   000     1     O    80    33 162000
## 5   732 199904     N 202903 17140   000     1     O    25    10  53000
## 6   715 199904     N 202903 17140   000     1     O    67    35  91000
## # ... with 14 more variables: V12 <int>, V13 <dbl>, V14 <chr>, V15 <chr>,
## #   V16 <chr>, V17 <chr>, V18 <chr>, V19 <int>, V20 <chr>, V21 <chr>,
## #   V22 <int>, V23 <int>, V24 <chr>, V25 <chr>
Therefore, we can do any of our common dplyr operations directly on this Spark DataFrame, and through the magic of method dispatch in R, the operations are converted to SQL and sent to Catalyst for evaluation in Spark SQL.
freddie_rename <- freddie_origins %>%
  rename(credit_score = V1,
         first_payment = V2,
         first_home = V3,
         maturity = V4,
         msa = V5,
         mi_perc = V6,
         num_units = V7,
         occ_status = V8,
         cltv = V9,
         dti = V10,
         upb = V11,
         ltv = V12,
         orig_rate = V13,
         channel = V14,
         ppm = V15,
         prod_type = V16,
         state = V17,
         prop_type = V18,
         post_code = V19,
         loan_number = V20,
         loan_purpose = V21,
         orig_term = V22,
         num_borrowers = V23,
         seller = V24,
         servicer = V25)

freddie_rename %>% head
## Source:   query [?? x 25]
## Database: spark connection master=yarn-client app=sparklyr local=FALSE
##
##   credit_score first_payment first_home maturity   msa mi_perc num_units
##          <chr>         <int>      <chr>    <int> <int>   <chr>     <int>
## 1          751        199910          N   202909    NA     000         1
## 2          733        199909          N   202908 29540     000         1
## 3          755        199905          N   202904 29540      30         1
## 4          669        200206          N   202901    NA     000         1
## 5          732        199904          N   202903 17140     000         1
## 6          715        199904          N   202903 17140     000         1
## # ... with 18 more variables: occ_status <chr>, cltv <dbl>, dti <chr>,
## #   upb <int>, ltv <int>, orig_rate <dbl>, channel <chr>, ppm <chr>,
## #   prod_type <chr>, state <chr>, prop_type <chr>, post_code <int>,
## #   loan_number <chr>, loan_purpose <chr>, orig_term <int>,
## #   num_borrowers <int>, seller <chr>, servicer <chr>
The origination date is buried inside the loan number field. We will pick it out by indexing the loan number substring:
freddie_rename %>% select(loan_number)
## Source:   query [?? x 1]
## Database: spark connection master=yarn-client app=sparklyr local=FALSE
##
##     loan_number
##           <chr>
## 1  F199Q1000001
## 2  F199Q1000002
## 3  F199Q1000003
## 4  F199Q1000004
## 5  F199Q1000005
## 6  F199Q1000006
## 7  F199Q1000007
## 8  F199Q1000008
## 9  F199Q1000009
## 10 F199Q1000010
## # ... with more rows
Observe that the loan number contains a two-digit year, followed by the quarter and then some additional loan identification numbers. We will extract the year and quarter using some very simple string operations:
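# Note: these substr() calls appear to be passed through to Spark SQL, where
# the arguments are (start, length) rather than base R's (start, stop)
freddie_rename <- freddie_rename %>%
  mutate(orig_date = substr(loan_number, 3, 4),
         year = as.numeric(substr(loan_number, 3, 2)))

# Expand the two-digit year into a four-digit year
freddie <- freddie_rename %>%
  mutate(orig_year = paste0(ifelse(year < 10, "200",
                                   ifelse(year > 16, "19", "20")),
                            year))

freddie <- freddie %>%
  mutate(orig_year = substr(orig_year, 1, 4))

freddie %>% head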
## Source:   query [?? x 28]
## Database: spark connection master=yarn-client app=sparklyr local=FALSE
##
##   credit_score first_payment first_home maturity   msa mi_perc num_units
##          <chr>         <int>      <chr>    <int> <int>   <chr>     <int>
## 1          751        199910          N   202909    NA     000         1
## 2          733        199909          N   202908 29540     000         1
## 3          755        199905          N   202904 29540      30         1
## 4          669        200206          N   202901    NA     000         1
## 5          732        199904          N   202903 17140     000         1
## 6          715        199904          N   202903 17140     000         1
## # ... with 21 more variables: occ_status <chr>, cltv <dbl>, dti <chr>,
## #   upb <int>, ltv <int>, orig_rate <dbl>, channel <chr>, ppm <chr>,
## #   prod_type <chr>, state <chr>, prop_type <chr>, post_code <int>,
## #   loan_number <chr>, loan_purpose <chr>, orig_term <int>,
## #   num_borrowers <int>, seller <chr>, servicer <chr>, orig_date <chr>,
## #   year <dbl>, orig_year <chr>
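The same parsing logic can be checked locally on a handful of loan numbers before running it on the cluster. This is a minimal base R sketch, not the Spark code itself: parse_loan_number is a hypothetical helper, the second loan number is made up, and the layout (two-digit year in characters 3 to 4, quarter in characters 5 to 6) is inferred from the sample values shown earlier. Base R's substr() takes (start, stop) positions, and the cutoff of 16 for deciding the century is the one used in this post.

```r
# Hypothetical local helper mirroring the year/quarter extraction.
parse_loan_number <- function(loan_number) {
  # Characters 3-4 hold the two-digit origination year
  yy <- as.integer(substr(loan_number, 3, 4))
  data.frame(
    loan_number = loan_number,
    # Characters 5-6 hold the quarter, e.g. "Q1"
    orig_quarter = substr(loan_number, 5, 6),
    # Two-digit years above 16 are treated as 19xx, the rest as 20xx
    orig_year = ifelse(yy > 16L, 1900L + yy, 2000L + yy),
    stringsAsFactors = FALSE
  )
}

parsed <- parse_loan_number(c("F199Q1000001", "F108Q3000042"))
print(parsed)
```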
Calculate Average Credit Score by Year
Now that we have created features defining the quarter and year of the mortgage origination, we can create a summary of average credit attributes by year. For example, here's the average credit score by year for each state:
fico_year <- freddie %>%
  group_by(orig_year, state) %>%
  summarise(ave_fico = mean(credit_score)) %>%
  collect

fico_year %>% head
## Source: local data frame [6 x 3]
## Groups: orig_year [6]
##
##   orig_year state ave_fico
##       <chr> <chr>    <dbl>
## 1      2008    NM 730.7395
## 2      2012    WI 769.2008
## 3      2011    KS 763.1039
## 4      2006    WI 728.6378
## 5      2014    AR 750.9074
## 6      2015    MI 752.5942
Even though we are using Spark to do most of the computation, we can still proceed as though we were working entirely in R. For example, we could create a function for evaluating the average of any credit attribute, which will make it easier to obtain different summarizations:
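library(lazyeval)

year_state_sum <- function(val = "credit_score") {
  # Build the expression mean(<val>) from the column name and summarise by group
  year_state <- freddie %>%
    group_by(orig_year, state) %>%
    summarise_(sum_val = interp(~mean(var), var = as.name(val)))

  # Bring the small aggregated result into local R memory
  year_state <- year_state %>% collect

  # Name the summary column after the attribute, e.g. ave_dti
  names(year_state)[3] <- paste0("ave_", val)
  return(year_state)
}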
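In more recent dplyr releases the underscore verbs and lazyeval have been superseded, and a column can be selected by name with the .data pronoun instead. The following is a minimal sketch of that style on a local toy data frame, assuming dplyr 1.0 or later. The function name year_state_mean and the toy values are hypothetical, and whether the same code behaves identically on a tbl_spark depends on the installed sparklyr and dbplyr versions.

```r
library(dplyr)

# Toy stand-in for the collected loan data (values are made up).
toy_loans <- data.frame(
  orig_year = c("2008", "2008", "2012"),
  state = c("NM", "NM", "WI"),
  credit_score = c(700, 760, 770),
  dti = c(30, 40, 20),
  stringsAsFactors = FALSE
)

# Average any numeric column by year and state, choosing the column by name.
year_state_mean <- function(data, val = "credit_score") {
  out <- data %>%
    group_by(orig_year, state) %>%
    summarise(sum_val = mean(.data[[val]], na.rm = TRUE), .groups = "drop")
  names(out)[3] <- paste0("ave_", val)
  out
}

fico <- year_state_mean(toy_loans)
dti <- year_state_mean(toy_loans, "dti")
print(fico)
print(dti)
```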
Summarize, Plot, ...?, Profit!
While our original dataset was very large, the aggregated result is pretty small. With one row per year and state, its length is at most the number of distinct years in the data times the number of distinct states:
$$ \text{number of rows} \le n_{\text{years}} \times n_{\text{states}} $$
Therefore, we can collect that object into a local R data.frame and plot it using the plethora of plotting libraries in R. In this case, we have data at the state level, so I will use the awesome rMaps package to make an htmlwidget containing average debt-to-income by state and year:
library(rMaps)

state_dti <- year_state_sum("dti") %>%
  mutate(year = as.numeric(orig_year)) %>%
  rMaps::ichoropleth(
    ave_dti ~ state,
    data = .,
    animate = "year",
    geographyConfig = list(
      popupTemplate = "#!function(geo, data) { return '<div class=\"hoverinfo\"><strong>'+ data.state + '<br>' + 'Average DTI in '+ data.year + ': ' + data.ave_dti.toFixed(2)+ '</strong></div>';}!#"
    )
  )

state_dti$save("StateMapDTI.html", cdn = TRUE)
htmltools::includeHTML("StateMapDTI.html")
Click here for a larger version without the scrollbar.
The End
Thank you for reading! If this post was interesting, I welcome you to check out my course on R and Spark here and my in-development booklet on Spark and R. I also spoke about Spark and R at the recent Spark Summit in Brussels, and you can find the recording of that talk here. Thanks!
There is no "R Server on Spark" option in my Cluster Configuration - there is R Server OR Spark...
Posted by: Jeff van Geete | November 15, 2016 at 09:37
Also,
Error in force(code) :
Failed while connecting to sparklyr to port (8880) for sessionid (6826): Gateway in port (8880) did not respond.
Path: C:\Users\jvangeete\Apache\Spark-2.0.2\bin\spark-submit2.cmd
Parameters: --class, sparklyr.Backend, "C:\Users\jvangeete\Documents\R\win-library\3.3\sparklyr\java\sparklyr-2.0-2.11.jar", 8880, 6826
---- Output Log ----
The system cannot find the path specified.
---- Error Log ----
Posted by: Jeff van Geete | November 15, 2016 at 14:21
Thanks for the comment, Jeff!
You're right, the portal has changed slightly. The correct choice for the cluster configuration is just "R Server". It will automatically build on top of Spark (1.6.2 for a few more days, then 2.0!).
Did that error come when you tried to use `spark_connect("yarn-client")`? Could you try
`sc <- spark_connect("yarn", config = list(default = list(spark.submit.deployMode = "client")))` instead? It appears you are using Spark 2.0 clusters rather than the 1.6.2 clusters I used for this post.
Thanks,
Ali
Posted by: Ali Zaidi | November 15, 2016 at 21:33