Predict Customer Churn with PySpark

T.Duratorre
Nov 26, 2021 · 17 min read

1. Project Motivation

Streaming services see a constant flux of customers subscribing to or cancelling their services. Every business in this sector would like to know which customers are more likely to cancel their subscriptions. This is the so-called problem of churn prediction, i.e. classifying those customers deemed at risk of churn.

To tackle this problem we must resort to machine learning (ML) solutions that are able to process large volumes of data quickly and give sufficiently accurate results. Apache Spark is a data environment designed with these issues in mind, as data pipelines deployed on Spark use distributed systems like HDFS to ensure the scalability of the ML models.

In this post we focus on a dataset of customers of a fictitious music streaming service called Sparkify. The goal of the project is to predict customer churn as accurately as possible. We will use a small subset (128MB) of the full customer dataset (12GB) to build, train and validate a model. The data is provided by Udacity.
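To get started, the subset can be loaded into a Spark dataframe. Below is a minimal sketch, assuming the subset is stored as a JSON file (the file name here is hypothetical):

from pyspark.sql import SparkSession

# create the Spark session and read the event data (hypothetical file name)
spark = SparkSession.builder.appName("Sparkify").getOrCreate()
df = spark.read.json("mini_sparkify_event_data.json")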

Customer churn prediction can be framed as a binary classification problem, where the variable of interest is the label “churn” or “not-churn” for each customer. In order to evaluate the model, we will use several metrics: accuracy, precision, recall and f1 score.

2. Data Exploration Before Defining Churn

First of all, let’s look at the data and check the number of columns, the data types and the amount of nulls.

from pyspark.sql.functions import col, isnan, isnull

df.printSchema()
# check the number of empty strings, NaNs and nulls in each column
empty = []
nans = []
nulls = []
for c in df.columns:
    empty.append(df.filter(col(c) == '').count())
    nans.append(df.filter(isnan(c)).count())
    nulls.append(df.filter(isnull(c)).count())
print('empty: {}\nnans: {}\nnulls: {}'.format(empty, nans, nulls))

The dataframe contains personal information about each user, like gender, subscription level, first and last name, and location, as well as information about the user’s interaction with the service, like the action taken, the song and artist played, the number of items in each session, etc.

There are many null values in the dataframe: columns like song have no value when a user performs actions other than playing a song, like visiting the homepage, changing settings, or liking a song. Moreover, some records have no userId, as these correspond to actions performed without a user being logged in.

Since we are interested in predicting customer churn, we need to relate each action to a specific user, so we first remove the rows with no userId; then we can check the total number of actions performed by users when they are not playing songs.
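A minimal sketch of this filtering step, assuming missing userId values show up as empty strings (as the empty/null check above suggests):

from pyspark.sql.functions import col

# keep only the records that can be attributed to a logged-in user
df = df.filter(col("userId") != "")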

Fig. 2.1: Total count of actions performed by all users

For further analysis, we can look at brief statistics for the total songs played by users and the time period they have been subscribed to the service. On average, users play around 1000 songs, with minimums and maximums respectively of 3 and 8000, whereas the average registration time is approximately 80 days, with minimums of less than a day and maximums of 256 days.

Fig. 2.2: Distribution of total songs played for a user
Fig. 2.3: Distribution of total days of registration time for a user

3. Data Exploration After Defining Churn

First of all, we determine churn by looking at the actions performed. If a user’s event log contains the page value “Cancellation Confirmation”, that user has requested to interrupt the service and has left Sparkify. This is the definition of churn in this project.

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# flag the cancellation event and propagate it as the churn label
make_churn = udf(lambda x: 1 if x == "Cancellation Confirmation" else 0, IntegerType())
df = df.withColumn("churn", make_churn(df.page))

In the small dataset we use for the analysis, applying the churn definition above to the 225 distinct users yields 52 churn users and 173 not-churn users.
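One way to obtain these counts is to collapse the event-level churn flag to a single value per user; a sketch:

from pyspark.sql.functions import max

# one row per user with their churn flag, then a count per class
df.groupBy("userId").agg(max("churn").alias("churn")) \
    .groupBy("churn").count().show()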

Let’s look at some differences between churn and not churn users.

Fig 3.1: Total actions performed on average by churn and not churn users
Fig. 3.2: Total user count split by gender and churn/not churn
Fig. 3.3: Total action count performed on average by male users split by churn and not churn

From Fig. 3.1 it is clear that users who cancel their subscriptions are less involved with the service, as on average they perform fewer of each action.

Looking at the gender differences in Fig. 3.2, users who do not churn are almost 50% male and 50% female, but users who cancel their subscription are predominantly male, with >60% males and <40% females.

Analyzing the behaviour of males across the two groups in Fig. 3.3, the same conclusion as in Fig. 3.1 can be drawn: males who leave the service are generally less involved.

As for behaviour differences between churn and not-churn users in terms of location and userAgent, the small size of the dataset can’t tell us whether there are any relevant differences (Fig. 3.4 & 3.5).

Fig. 3.4: User count split by location and churn/not churn
Fig. 3.5: User count split by userAgent and churn/not churn

4. Feature Engineering

Having explored the dataset, the features used in a first attempt are all related to user actions and characteristics. User characteristic variables comprise all the fixed variables, i.e. those that do not change across the user history, like gender, whereas user action variables encompass all the variables calculated from the user’s behaviour history, like the number of songs played.

We don’t consider the userAgent and location columns, as they did not appear to provide much information for the purpose of the project.

User characteristics: gender, level, days_since_registration

User actions: total_songs, total_likes, total_dislikes, upgrade, downgrade, total_seconds, paid

Gender will be taken directly from the dataset, as it holds the same value across each user’s history, while level will be taken from the last record, in terms of timestamp, of each user. The other columns are calculated as follows:

  • upgrade: whether the user has ever upgraded (boolean)
  • downgrade: whether the user has ever downgraded (boolean)
  • total_songs: count of all the actions labelled as “NextSong” for each user
  • total_likes: count of all the actions labelled as “Thumbs Up” for each user
  • total_dislikes: count of all the actions labelled as “Thumbs Down” for each user
  • total_seconds: sum of the length column for each user
  • days_since_registration: number of days between registration and the timestamp of each user’s last record in the dataset
  • paid: whether the user has or had a paid subscription
from pyspark.sql.functions import udf, max, round as Fround, sum as Fsum
from pyspark.sql.types import IntegerType

# define the user functions to calculate the variables listed above
is_thumbs_up = udf(lambda x: 1 if x == "Thumbs Up" else 0, IntegerType())
is_thumbs_down = udf(lambda x: 1 if x == "Thumbs Down" else 0, IntegerType())
has_upgraded = udf(lambda x: 1 if x == "Upgrade" else 0, IntegerType())
has_downgraded = udf(lambda x: 1 if x == "Downgrade" else 0, IntegerType())
is_song = udf(lambda x: 1 if x is not None else 0, IntegerType())
has_been_paid = udf(lambda x: 1 if x == "paid" else 0, IntegerType())

# calculate the new columns
df_clean = df.withColumn("thumbsUp", is_thumbs_up(df.page))\
    .withColumn("thumbsDown", is_thumbs_down(df.page))\
    .withColumn("upgraded", has_upgraded(df.page))\
    .withColumn("downgraded", has_downgraded(df.page))\
    .withColumn("songCheck", is_song(df.song))\
    .withColumn("paid", has_been_paid(df.level))

# aggregate the action flags per user
numerical_df = df_clean.groupBy("userId").agg(
    Fsum("songCheck").alias("total_songs"),
    Fsum("thumbsUp").alias("total_likes"),
    Fsum("thumbsDown").alias("total_dislikes"),
    max("upgraded").alias("upgrade"),
    max("downgraded").alias("downgrade"),
    Fround(Fsum("length"), 2).alias("total_seconds"),
    max("paid").alias("paid"),
    max("churn").alias("churn"))
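The categorical features (gender, and level taken from each user’s last record by timestamp) can be extracted with a window over the timestamps. A sketch, where categorical_df is a hypothetical name and the join with numerical_df is only indicated in a comment:

from pyspark.sql import Window
from pyspark.sql.functions import col, desc, row_number

# keep gender and level from each user's most recent record
last_record = Window.partitionBy("userId").orderBy(desc("ts"))
categorical_df = df_clean.withColumn("rn", row_number().over(last_record)) \
    .filter(col("rn") == 1) \
    .select("userId", "gender", "level")
# ml_df would then be categorical_df joined with numerical_df on userId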

Let’s now see how the numerical features correlate with the target feature churn (boolean), and which variables are the most suitable for the prediction analysis.

import seaborn as sns

# correlation analysis among the numerical features and the churn variable
corr_df = ml_df.select("churn", "days_since_registration", "total_seconds", "total_songs",
                       "total_likes", "total_dislikes", "upgrade", "downgrade", "paid").toPandas()
sns.set(rc={'figure.figsize': (10, 10)}, font_scale=1)
# plot the correlation matrix
sns.heatmap(corr_df.corr(), annot=True)
Fig 4.1: Correlation matrix for the selected numerical features

total_songs and total_seconds behave identically in terms of correlation, so we only need one of the two. total_likes and total_dislikes are also very similar, so we can keep just one of them; we’ll keep total_likes, as it has a higher correlation with churn.
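Dropping the redundant columns is then a one-liner; a sketch, assuming ml_df holds the features listed above:

# remove the features made redundant by the correlation analysis
ml_df = ml_df.drop("total_seconds", "total_dislikes")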

The final dataset looks like the following.

5. Modelling

The next step is the model setup. The dataset contains both categorical and numerical variables, which need to be treated differently. We will use StringIndexer and OneHotEncoder to handle the categorical variables, and then combine them with the numerical variables into a single vector with VectorAssembler. This way, we have a dataset ready for ML analysis in Spark.

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

def assemble_ml_data(numeric_cols, index_cols, ml_df):
    # indexers and encoders needed to deal with categorical variables
    indexers = [StringIndexer(inputCol=col_name, outputCol=col_name + "_index",
                              handleInvalid='keep') for col_name in index_cols]
    encoders = [OneHotEncoder(inputCol=indexer.getOutputCol(),
                              outputCol="{}_ohe".format(indexer.getOutputCol()))
                for indexer in indexers]
    # collect all variables, categorical and numerical, and assemble them into a vector column
    assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders] + numeric_cols,
                                outputCol="vecfeatures")
    p_stages = indexers + encoders + [assembler]
    pipeline = Pipeline(stages=p_stages)
    data = pipeline.fit(ml_df).transform(ml_df)

    return data

To make our predictions, we take the dataset produced by assemble_ml_data, combine a StandardScaler and a classifier (the transformer) into a pipeline, and fit the pipeline on the train portion of the dataset. We will use cross-validation to determine the best parameters of each transformer used.

from pyspark.ml import Pipeline
from pyspark.ml.feature import StandardScaler
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

def make_prediction(data, transformer=None, paramGrid=None):
    # rename the target column so Spark ML estimators and evaluators find it
    data = data.withColumnRenamed("churn", "label")

    # split data into train and test sets and set up the pipeline
    train, test = data.randomSplit([0.8, 0.2], seed=42)
    standardscaler = StandardScaler(inputCol="vecfeatures", outputCol="features",
                                    withMean=True, withStd=True)
    pipeline = Pipeline(stages=[standardscaler, transformer])

    # if a parameter grid is passed, perform cross-validation
    if paramGrid is not None:
        crossval = CrossValidator(estimator=pipeline,
                                  estimatorParamMaps=paramGrid,
                                  evaluator=MulticlassClassificationEvaluator(),
                                  numFolds=3)
        model = crossval.fit(train)
    else:
        model = pipeline.fit(train)

    results = model.transform(test)

    # output both the model and the dataset with the prediction column
    return model, results

Since this is a classification problem, we will test three algorithms: Random Forest (RF), Logistic Regression (LR) and Linear SVC (LSVC).

Once the model has predicted the output variable on the test portion of the dataset, we will evaluate it with accuracy, precision, recall and f1 score.

import numpy as np
from sklearn.metrics import classification_report

def evaluation_metrics(results, label_col="label", pred_col="prediction"):
    pandas_df = results.select(label_col, pred_col).toPandas()
    print("Accuracy: ", sum(pandas_df[label_col] == pandas_df[pred_col]) / pandas_df.shape[0])
    print(classification_report(np.array(pandas_df[label_col]), np.array(pandas_df[pred_col])))
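Putting the pieces together, a hypothetical usage sketch that scores each of the three candidate classifiers with the functions defined above, assuming data comes from assemble_ml_data:

from pyspark.ml.classification import RandomForestClassifier, LogisticRegression, LinearSVC

# fit and evaluate each candidate classifier in turn
for clf in [RandomForestClassifier(), LogisticRegression(), LinearSVC()]:
    model, results = make_prediction(data, transformer=clf)
    evaluation_metrics(results)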

6. Metrics

We will look at several metrics to score our models. When it comes to determining the goodness of a model, different metrics can be chosen; in the case of a classification problem, the most common are accuracy, precision, recall, and f1 score.

Accuracy can be defined as the ratio of all the correct predictions over the total predictions made by the model. This metric is usually sufficient to score models; however, in the case of an imbalanced dataset, i.e. a dataset where the proportions of the different labels differ by a large measure, accuracy alone is not reliable.

For instance, in a dataset with two labels, with one of the two labels being present 90% of the time and the other one making up just 10% of the data, a classification model may label all the predictions as the class present 90% of the time.

This would result in the model having 90% accuracy, which usually is a great result. However, if the purpose of the model is to detect both classes, it is useless, as it never predicts the other label. That’s why other metrics are needed, like precision, recall, and f1 score.

Precision quantifies the ratio of correctly predicted cases of a class over the total number of predictions of that class made by the model, i.e. how many of the instances predicted as a certain class actually belong to that class.

On the other hand, recall quantifies the number of correct predictions of a certain class out of all the existing instances of that class, i.e. it indicates how many instances of a particular class were missed in the prediction process.

Depending on the classification problem, one may decide to maximize either precision or recall. Maximizing precision minimizes the number of predicted instances of a class that actually belong to other classes (false positives), while maximizing recall minimizes the number of instances of a class that are missed (false negatives).

Sometimes, in the case of imbalanced classes, we want to maximize both metrics together; however, increases in recall usually come with decreases in precision. That’s why, instead of choosing one metric over the other, we can refer to a metric that combines precision and recall: the f1 score.

In this case study, focusing on the f1 score allows us to evaluate with one metric both how well our model detects churn users and how rarely it wrongly classifies as churn those users who are actually not-churn users.

Values for precision, recall and f1 score all range from 0 to 1, with 0 being the worst and 1 being the best value possible.

7. Model Selection

The best transformer is chosen as the one that presents the best values for precision, recall and f1 score.

We also consider accuracy, but with less importance, as the dataset is heavily imbalanced towards not-churn users (almost 80% of the cases). In fact, in a binary classification problem a model may predict all labels as the most numerous class, resulting in high accuracy. Despite the good accuracy value, such a model can’t predict whether a customer is at risk of churn, and is therefore useless for our purpose.

In a binary classification problem (0 or 1), the possible cases that can be generated by a prediction are four:

  • TP, i.e. true positives, when the model predicts positive and the actual value is positive
  • FP, i.e. false positives, when the model predicts positive but the actual value is negative
  • TN, i.e. true negatives, when the model predicts negative and the actual value is negative
  • FN, i.e. false negatives, when the model predicts negative but the actual value is positive

Given TP, FP, TN, and FN, the metrics chosen are defined as below.

  • Accuracy: (TP + TN)/(TP + FP + TN + FN)
  • Precision: TP/(TP + FP)
  • Recall: TP/(TP + FN)
  • F1 Score: (2 * Precision * Recall)/(Precision + Recall)
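For reference, a minimal sketch translating these definitions into code (the function name is made up for illustration):

# the four metrics from confusion-matrix counts, as defined above
def metrics_from_counts(tp, fp, tn, fn):
    accuracy = (tp + tn) / (tp + fp + tn + fn)
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    f1 = 2 * precision * recall / (precision + recall)
    return accuracy, precision, recall, f1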

In our case the dataset is small, and we want to predict both classes as well as possible, therefore minimizing both FP and FN. Thus, we will focus mainly on getting the best f1 score.

Fig. 6.1: Metric values for the different classification algorithms

The best model is LR, with the best values for all four metrics considered. RF performs well too, while LSVC is the worst, with very low precision and f1 score.

8. Model Tuning and Refinement

Now that we have chosen our model, it’s time to tune its parameters to increase its performance.

We use ParamGridBuilder to create the grid of parameters for model tuning. For LR we look at regParam and threshold: the first will be evaluated for the values 0.0, 0.01, 0.05, 0.1, 0.5 and 1, the second for the values 0.3, 0.4 and 0.5.
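A sketch of how such a grid can be built with ParamGridBuilder, assuming the LR estimator is Spark ML’s LogisticRegression:

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder

lr = LogisticRegression()
# grid over the regularization parameter and the classification threshold
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.0, 0.01, 0.05, 0.1, 0.5, 1.0]) \
    .addGrid(lr.threshold, [0.3, 0.4, 0.5]) \
    .build()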

Before tuning, however, we can look at the class imbalance. In the dataset, 52 out of 225 users are churn users, so only 23% of users have a churn value of 1. When running a classification model, there’s the possibility that the model predicts all labels as one of the two classes while still showing good accuracy. This is the reason why we also take precision, recall and f1 score into account.

To handle class imbalance, we can introduce a weight column in the model that will balance the importance of the classes for the model.

This weight column is calculated as follows: given n_churn_users and n_not_churn_users, respectively the number of churn and not-churn users, we define a churn_weight and a not_churn_weight variable as below.

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# calculate weights for churn and not-churn users:
# the minority class (churn) gets the larger weight
churn_weight = n_not_churn_users / (n_churn_users + n_not_churn_users)
not_churn_weight = n_churn_users / (n_churn_users + n_not_churn_users)
label_weight = udf(lambda x: churn_weight if x == 1 else not_churn_weight, DoubleType())
data = data.withColumn("weight", label_weight(data.churn))
data.select("churn", "weight").head(5)

This way, the two variables populate the values in the weight column of the dataframe.
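To make the model actually use these weights, the estimator can be pointed at the weight column; Spark ML’s LogisticRegression exposes a weightCol parameter for this. A sketch:

from pyspark.ml.classification import LogisticRegression

# weigh each training example by the class weight computed above
lr = LogisticRegression(weightCol="weight")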

Running LR again with class weights and cross-validation, we obtain the following results:

Fig. 7.1: Metric values for the Logistic Regression algorithm

with regParam and threshold equal to 0.01 and 0.5, respectively.

9. A different approach

The variables selected so far to feed the machine learning models refer to the user’s whole lifetime activity on the platform, but they exclude crucial information like the number of sessions and the average session length.

While information like total songs played and total likes and dislikes can be somewhat useful to determine user engagement with the platform, it is also important to know how many sessions, and how much time, it took a user to reach those numbers.

We can imagine that the longer the average session of a user, and the more sessions they have, the less likely they are to cancel their subscription.

The new features we will use for ML prediction will be the following.

  • total_upgrades: number of times the user has upgraded
  • total_downgrades: number of times the user has downgraded
  • avg_songs: average count of all the actions labelled as “NextSong” for each user per session
  • avg_thumbsUp: average count of all the actions labelled as “ThumbsUp” for each user per session
  • avg_thumbsDown: average count of all the actions labelled as “ThumbsDown” for each user per session
  • num_sessions: total sessions for each user
  • avg_session_duration: average session duration for each user
  • avg_session_gap: average time gap between two sessions for each user
  • days_since_registration: number of days passed at the timestamp of each user last record in the dataset from the moment of registration
  • paid: whether the user has or had a paid subscription
  • gender
  • level

These new columns are calculated as follows:

from pyspark.sql import Window
from pyspark.sql.functions import (udf, col, lag, asc, count, avg, min, max,
                                   sum as Fsum, round as Fround)
from pyspark.sql.types import IntegerType

is_thumbs_up = udf(lambda x: 1 if x == "Thumbs Up" else 0, IntegerType())
is_thumbs_down = udf(lambda x: 1 if x == "Thumbs Down" else 0, IntegerType())
has_upgraded = udf(lambda x: 1 if x == "Submit Upgrade" else 0, IntegerType())
has_downgraded = udf(lambda x: 1 if x == "Submit Downgrade" else 0, IntegerType())
is_song = udf(lambda x: 1 if x is not None else 0, IntegerType())
has_been_paid = udf(lambda x: 1 if x == "paid" else 0, IntegerType())

df_ses = df.withColumn("thumbsUp", is_thumbs_up(df.page))\
    .withColumn("thumbsDown", is_thumbs_down(df.page))\
    .withColumn("upgraded", has_upgraded(df.page))\
    .withColumn("downgraded", has_downgraded(df.page))\
    .withColumn("songCheck", is_song(df.song))\
    .withColumn("paid", has_been_paid(df.level))

# no use of song length because of the correlation seen before
df_ses = df_ses.select("userId", "sessionId", "churn", "ts", "rgstr_time", "thumbsUp",
                       "thumbsDown", "upgraded", "downgraded", "songCheck", "paid")

# the group by is now done both at the user and session level
df_user_ses = df_ses.groupby("userId", "sessionId").agg(
    max("churn").alias("churn"),
    min("ts").alias("ses_start"),
    max("ts").alias("ses_end"),
    Fsum("thumbsUp").alias("thumbsUp"),
    Fsum("thumbsDown").alias("thumbsDown"),
    Fsum("upgraded").alias("upgraded"),
    Fsum("downgraded").alias("downgraded"),
    Fsum("songCheck").alias("num_songs"),
    max("paid").alias("paid"))

# timestamps are in milliseconds, so convert durations to hours
df_user_ses = df_user_ses.withColumn("sessionDuration_hours",
                                     (col("ses_end") - col("ses_start"))/1000.0/3600.0)

# window over each user's sessions, ordered by session start
user_window = Window.partitionBy("userId").orderBy(asc("ses_start"))

# get the timestamp at the end of the previous session
df_user_ses = df_user_ses.withColumn("prev_ses_end", lag(col("ses_end")).over(user_window))

# session gap = start of the current session minus end of the previous one
df_user_ses = df_user_ses.withColumn("sessiongap_hours",
                                     (col("ses_start") - col("prev_ses_end"))/1000.0/3600.0)

# summarize all the features as averages per session
df_user = df_user_ses.groupby("userId").agg(
    max("churn").alias("churn"),
    count("sessionId").alias("num_sessions"),
    Fround(avg("thumbsUp"), 2).alias("avg_thumbsUp"),
    Fround(avg("thumbsDown"), 2).alias("avg_thumbsDown"),
    Fsum("upgraded").alias("total_upgrades"),
    Fsum("downgraded").alias("total_downgrades"),
    Fround(avg("num_songs"), 2).alias("avg_songs"),
    Fround(avg("sessionDuration_hours"), 2).alias("avg_session_duration"),
    Fround(avg("sessiongap_hours"), 2).alias("avg_session_gap"),
    max("paid").alias("paid"))

The new dataframe looks like the following:

The correlation analysis of the numerical features shows that the new variable avg_session_gap is a good indicator of churn.

Fig 8.1: Correlation matrix for the selected numerical features

The new dataframe has some null values in the avg_session_gap column. This is because some users have only one session on the streaming service, so with this calculation the variable is null for them. To handle this, we set the nulls in this column to 0.
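A one-line sketch of this replacement:

# single-session users have no gap, so treat their average gap as 0
df_user = df_user.fillna(0, subset=["avg_session_gap"])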

Let’s now look at the metrics with LR and the weight column taken into account.

Fig. 8.2: Metric values for the Logistic Regression algorithm

The f1 score improved with this new approach, therefore we will consider the last approach as the one to use for the analysis.

10. Conclusion

  • For Sparkify, around 20% of the users decide to churn. Users who churn are predominantly male and show less involvement with Sparkify.
  • Among the two approaches taken, the one that takes into account the information regarding user sessions, like average session duration and average gap between sessions, shows the best results.
  • The features that are the best predictors of churn risk are the total time a user has been subscribed to the service and the average time gap between sessions.
  • The best algorithm is Logistic Regression with a weight column to handle class imbalance, with values of accuracy, recall, precision and f1 score of 0.85.

To recap, we followed the steps below:

  • Analyzed briefly the dataset to look for null values and relevant features or statistics useful to predict churn.
  • Defined churn and implemented a method to calculate it from the dataset.
  • Analyzed the dataset further to look for differences between users who churn and users who do not, finding that churn users are predominantly male.
  • Calculated relevant features to be used for predicting churn (i.e. total songs played, total days of registration, etc.).
  • Selected the classification models to be scored, in order to determine the most suitable for our case study scenario.
  • Defined the necessary metrics to be evaluated in order to score the models.
  • Chose the best model (Logistic Regression) and refined and tuned it with label weights and cross-validation.
  • Calculated new features (i.e. accounting for user’s average session duration, average gap between sessions, etc.) and repeated all the subsequent steps to obtain a better solution.

The dataset required a lot of preprocessing in order to get the right features, which allowed for a better solution.

It was a bit of a challenge trying to think of all the possible categorical and numerical features to use, as it took a lot of digging into the data before understanding what each record represents.

Therefore, creativity is key and helps a lot in these cases, so for those who think they lack analytical and technical prowess, your creativity might make up for it!

Moreover, the majority of the features come from two columns, i.e. page and sessionId. It might be useful to reduce the number of columns in the original dataset in order to save storage, as not all the columns are used in the analysis.

A very interesting aspect is that, in the first approach, accounting for the imbalanced classes actually made the results worse. The improvement in the correct prediction of churn users came at the cost of not-churn users, who were mislabelled more often, making the overall solution weaker.

However, by changing the features, introducing label weights improved the overall solution.

Future improvements are the following:

  • Prediction can be made not only on churn; we can also try to predict when customers decide to downgrade, i.e. change their subscription from paid to free.
  • Deployment on Amazon Web Services (AWS), as the current analysis has been carried out on a small portion (128MB) of the entire dataset (12GB). It would be interesting to train and test the model against the full dataset.

As for the project, we learned that choosing the best features is just as important as choosing the right model. From the analysis we saw an improvement in f1 score with variables that take into account each user’s number of sessions and average time gap between sessions.

Selecting the best features also allows us to reduce the dataset size, and make computations less intensive.

Choosing the right metrics is also important based on the problem. In a binary classification problem like this one, it is crucial to be able to predict correctly both cases, i.e. which users decide to leave the service and which not, in order to customize actions as best as possible and maximize the profits for the business.

All scripts and analysis are available in the following GitHub repo.

11. References

http://spark.apache.org/docs/latest/api/python/
