PySpark in Machine Learning

PySpark is the Python API for Apache Spark. Apache Spark is a component of the Hadoop ecosystem that has become very popular among big data frameworks.

Apache Spark is a very powerful engine that provides real-time stream processing, interactive queries, graph processing, batch processing and in-memory processing at very high speed.

In Python, we can access Apache Spark using PySpark. As more and more machine learning work is being done with Apache Spark, you should know how to work with this framework. And since Python is one of the simplest programming languages, PySpark is not difficult to pick up either. So, let’s dive into PySpark to understand how it helps in machine learning.
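To show how little ceremony this takes, here is a minimal sketch (my own example, assuming PySpark is already installed, for instance with pip install pyspark) that starts a local Spark session and builds a tiny DataFrame:

# Minimal sketch: start a local SparkSession and create a tiny DataFrame.
# Assumes PySpark is installed (e.g. pip install pyspark).
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[*]').appName('hello-pyspark').getOrCreate()
toy = spark.createDataFrame([(1, 'yes'), (2, 'no')], ['id', 'deposit'])
toy.show()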


To explain PySpark, I will use a real-world machine learning problem so that you can understand how to apply this library to your own dataset while working on real machine learning tasks. You can download the dataset I will use in this article below.

Data Exploration with PySpark

The dataset that I have taken for this article is related to the direct marketing campaigns (phone calls) of a Portuguese banking institution. The classification goal is to predict whether the client will subscribe (Yes/No) to a term deposit. Now, let’s start with PySpark:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ml-bank').getOrCreate()
df = spark.read.csv('bank.csv', header = True, inferSchema = True)
df.printSchema()
  • Input variables in the dataset: age, job, marital, education, default, balance, housing, loan, contact, day, month, duration, campaign, pdays, previous, poutcome.
  • Output variable in the dataset: deposit
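Before looking at individual rows, a quick sanity check on the shape of the data (a small sketch of my own, not part of the original walkthrough):

# Quick sanity check: number of rows and columns in the DataFrame.
print((df.count(), len(df.columns)))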

Now let’s have a look at the first five observations. Rendering the result as a pandas DataFrame is prettier than Spark’s DataFrame.show().

import pandas as pd
pd.DataFrame(df.take(5), columns=df.columns).transpose()

Next, let’s look at the distribution of the target classes. Our classes are almost perfectly balanced.

df.groupby('deposit').count().toPandas()

Summary statistics for numeric variables:

numeric_features = [t[0] for t in df.dtypes if t[1] == 'int']
df.select(numeric_features).describe().toPandas().transpose()

Correlations between independent variables.

numeric_data = df.select(numeric_features).toPandas()
# Note: pd.scatter_matrix was removed in newer pandas versions;
# use pd.plotting.scatter_matrix instead.
axs = pd.plotting.scatter_matrix(numeric_data, figsize=(8, 8))
n = len(numeric_data.columns)
for i in range(n):
    v = axs[i, 0]
    v.yaxis.label.set_rotation(0)
    v.yaxis.label.set_ha('right')
    v.set_yticks(())
    h = axs[n-1, i]
    h.xaxis.label.set_rotation(90)
    h.set_xticks(())
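The scatter matrix gives a visual impression of the relationships. As a numeric cross-check (a small sketch of my own using PySpark’s built-in df.stat.corr, not part of the original walkthrough), we can print the pairwise Pearson correlations:

# Numeric cross-check: pairwise Pearson correlations between numeric columns.
for i, a in enumerate(numeric_features):
    for b in numeric_features[i + 1:]:
        print(a, b, round(df.stat.corr(a, b), 3))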

It’s quite obvious that there aren’t any highly correlated numeric variables. Therefore, I will keep all of them for the machine learning model. However, the day and month columns are not really useful, so we will remove these two columns.

df = df.select('age', 'job', 'marital', 'education', 'default', 'balance', 'housing', 'loan', 'contact', 'duration', 'campaign', 'pdays', 'previous', 'poutcome', 'deposit')
cols = df.columns

Data Preparation with PySpark for Machine Learning

This process will include categorical indexing, one-hot encoding and vector assembling (a feature transformer that joins multiple columns into one vector).

# Note: on Spark 2.3/2.4 the one-hot encoder estimator is called
# OneHotEncoderEstimator; it was renamed to OneHotEncoder in Spark 3.x.
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
categoricalColumns = ['job', 'marital', 'education', 'default', 'housing', 'loan', 'contact', 'poutcome']
stages = []
for categoricalCol in categoricalColumns:
    # Index each string column, then one-hot encode the resulting indices
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]
label_stringIdx = StringIndexer(inputCol = 'deposit', outputCol = 'label')
stages += [label_stringIdx]
numericCols = ['age', 'balance', 'duration', 'campaign', 'pdays', 'previous']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

The above code indexes each categorical column using the StringIndexer, and then converts the indexed categories into one-hot encoded variables. The resulting output has the binary vectors appended to the end of each row. I will use the StringIndexer again to encode our labels to label indices. Then, I will use the VectorAssembler to combine all the feature columns into a single vector column.
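To make the indexing step concrete, here is a small standalone sketch (using a toy column of my own, not the bank data) of what StringIndexer produces:

# Toy example: StringIndexer maps each distinct string to a numeric index,
# ordered by frequency, so the most frequent label gets index 0.0.
from pyspark.ml.feature import StringIndexer

toy = spark.createDataFrame([('married',), ('single',), ('married',)], ['marital'])
StringIndexer(inputCol='marital', outputCol='maritalIndex').fit(toy).transform(toy).show()
# 'married' -> 0.0 (appears twice), 'single' -> 1.0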

Creating Pipelines with PySpark

In machine learning, we use a Pipeline to chain multiple Transformers and Estimators together and specify our machine learning workflow. A Pipeline’s stages are specified as an ordered array. In PySpark, we use pipelines as below:

from pyspark.ml import Pipeline
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(df)
df = pipelineModel.transform(df)
selectedCols = ['label', 'features'] + cols
df = df.select(selectedCols)
df.printSchema()

pd.DataFrame(df.take(5), columns=df.columns).transpose()

As you can see, we now have a features column and a label column. Now, randomly split the data into training and test sets, setting a seed for reproducibility.

train, test = df.randomSplit([0.7, 0.3], seed = 2018)
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))Code language: Python (python)

Training Dataset Count: 7764
Test Dataset Count: 3398

Logistic Regression Model with PySpark

I will use logistic regression as our machine learning model, and this time I will train it using PySpark.

from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol = 'features', labelCol = 'label', maxIter=10)
lrModel = lr.fit(train)

We can visualize the coefficients by using LogisticRegressionModel’s attributes as follows:

import matplotlib.pyplot as plt
import numpy as np
beta = np.sort(lrModel.coefficients)
plt.plot(beta)
plt.ylabel('Beta Coefficients')
plt.show()

Now, let’s summarize our logistic regression model:

trainingSummary = lrModel.summary
roc = trainingSummary.roc.toPandas()
plt.plot(roc['FPR'], roc['TPR'])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.show()
print('Training set areaUnderROC: ' + str(trainingSummary.areaUnderROC))
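The training summary also exposes precision, recall and F-measure by threshold. As an optional extension (a sketch following the standard pattern from the Spark documentation, not part of the original walkthrough), we could pick the decision threshold that maximizes the F-measure on the training set:

# Optional: find the threshold that maximizes F-measure on the training set
# and set it on the estimator before refitting.
fMeasure = trainingSummary.fMeasureByThreshold
maxF = fMeasure.groupBy().max('F-Measure').select('max(F-Measure)').head()[0]
bestThreshold = fMeasure.where(fMeasure['F-Measure'] == maxF).select('threshold').head()[0]
lr.setThreshold(bestThreshold)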

Now let’s make predictions on the test set with the logistic regression model we trained using PySpark:

predictions = lrModel.transform(test)
predictions.select('age', 'job', 'label', 'rawPrediction', 'prediction', 'probability').show(10)

Final Step: Evaluate the Model

from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator()
print('Test Area Under ROC', evaluator.evaluate(predictions))

Test Area Under ROC 0.8858324614449619
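By default, BinaryClassificationEvaluator reports the area under the ROC curve. It can also report the area under the precision-recall curve, which is often more informative when classes are imbalanced (not a concern here, but shown as a small sketch):

# Same evaluator, different metric: area under the precision-recall curve.
pr_evaluator = BinaryClassificationEvaluator(metricName='areaUnderPR')
print('Test Area Under PR', pr_evaluator.evaluate(predictions))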

Also, read – Predict Diabetes with Machine Learning.

I hope you liked this article on PySpark in Machine Learning. Feel free to ask your valuable questions in the comments section. Don’t forget to subscribe to our daily newsletter below to receive email notifications about new posts if you like my work.
