Sentiment Analysis Part 1

Extract-Transform-Load with PySpark and MongoDB

Originally published in 2019.

I’ve been itching to learn some more Natural Language Processing and thought I might try my hand at the classic problem of Twitter sentiment analysis. I found a labeled Twitter dataset of 1.6 million tweets on the Kaggle website here. While 1.6 million tweets is not a substantial amount of data and does not require working with Spark, I wanted to use Spark for ETL as well as modeling, since I haven’t seen too many examples of how to do so in the context of sentiment analysis. In addition, since I was working with text data I thought I would use MongoDB, which allows for flexible data models and is easy to use. Luckily Spark and MongoDB work well together, and I’ll show how to work with both later.

At first I figured I would make this one blog post, but after getting started I realized there was a significant amount of material and decided to break it into two posts. This first post covers ETL with Spark and MongoDB. The second post will deal with the actual modeling of sentiment using Spark. The source code for this post can be found here.

Spark is a parallel processing framework that has become the de-facto standard in data engineering for extract-transform-load (ETL) operations. It has a number of features that make it great for working with large data sets including:

  • Natural integration with Hadoop for working with large distributed datasets
  • Fault tolerance
  • Lazy evaluation that allows for behind the scenes optimizations

Spark is also great because it enables developers to use a single framework for working with structured and unstructured data, machine learning, graph computations and even streaming. Some references that I have used for working with Spark in the past include:

In this blog post I will NOT be covering the basics of Spark; there are plenty of other resources (like those above) that will do a better job than I can. Instead, I want to cover the basics of working with Spark for ETL on text data. I’ll explain the ETL steps I took in detail in this post. While I used a notebook for development, in practice I wrote a Python script that I used to perform the batch analysis. You can find that script here. The script was used to connect to my Atlas MongoDB cluster, and I had to change the normalize UDF so that the results are strings instead of arrays of strings. This was necessary so that the resulting collection stayed within the storage limits of the free tier.

Now let’s dive into the extract-transform-load operations in Spark and MongoDB!

First we download and extract the dataset from the Kaggle website:
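
A minimal sketch for unpacking the archive, assuming it was downloaded by hand (or with the Kaggle CLI) and saved locally as sentiment140.zip (the file and folder names here are just placeholders):

    import zipfile

    # Assumes the archive was downloaded from Kaggle and saved locally;
    # extract the CSV into a data/ folder.
    with zipfile.ZipFile("sentiment140.zip", "r") as archive:
        archive.extractall("data/")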

Next we import the data types that we will need for ETL, as well as the functions module, from pyspark.sql:
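
Roughly, the imports look like the following (the SparkSession builder is included here so the later snippets are self-contained; the app name is arbitrary):

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    from pyspark.sql.types import (StructType, StructField, IntegerType,
                                   LongType, StringType, ArrayType,
                                   TimestampType)

    # Create (or get) a SparkSession to work with
    spark = (SparkSession.builder
                         .appName("TwitterSentimentETL")
                         .getOrCreate())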

Now we need to define the schema of the CSV file we want to read. Alternately, we could have Spark infer the schema, however, this would take longer since Spark would have to scan the file twice: once to infer the schema and once to read in the data.
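
A schema along these lines matches the six fields described below (the exact types are a judgment call; the tweet id needs a long):

    schema = StructType([
        StructField("target", IntegerType(), True),
        StructField("id",     LongType(),    True),
        StructField("date",   StringType(),  True),
        StructField("flag",   StringType(),  True),
        StructField("user",   StringType(),  True),
        StructField("text",   StringType(),  True)
    ])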

Now we can define the path to the file, specify its format, schema and then “read” it in as a Dataframe. Since I am working in standalone mode on my local machine I’ll use the address of the csv in my local filesystem:
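
Something like the following, with the path adjusted to wherever the extracted CSV lives on your machine (the directory and file name below are placeholders):

    # Path to the extracted CSV on the local filesystem (adjust as needed)
    path = "file:///path/to/data/training.1600000.processed.noemoticon.csv"

    df = (spark.read
               .format("csv")
               .schema(schema)
               .load(path))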

I put read in quotations since Spark uses a lazy-evaluation model for computation. This means that the csv is not actually read into the worker nodes (see this for definition) until we perform an action on it. An action is any operation that,

  • writes to disk
  • brings results back to the driver (see this for definition), e.g. count, show, collect, toPandas

Even though we have not read in the data, we can still obtain metadata on the dataframe such as its schema:
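
For example, printing the schema does not trigger a read:

    df.printSchema()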

Let’s take a look at the first few rows in our dataframe:
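
A .show() call (an action, so this does trigger a read) along these lines:

    df.show(5)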

We can see that the table has a target field, which is the label of whether the sentiment was positive or negative; an id, which is a unique number for the tweet; a date field; a flag field (which we will not use); the user field, which is the Twitter user's handle; and the actual tweet, which is labeled as text. We'll have to do transformations on all the fields (except flag, which we will drop) in order to get them into the correct format. Specifically, we will:

  1. Extract relevant date-time information from the date field
  2. Clean and transform the text field

Transformations in Spark are computed on executor nodes (computations in Spark occur where the data sits in memory/disk, which is the data nodes in Hadoop) and use lazy evaluation. The fact that transformations are lazy is a very useful aspect of Spark, because we can chain transformations together into Directed Acyclic Graphs (DAGs). Because the transformations are lazy, Spark can see the entire pipeline of transformations and optimize the execution of operations in the DAG.

We perform most of our transformations on our Spark dataframes in this post by using User Defined Functions, or UDFs. UDFs allow us to transform one Spark dataframe into another. UDFs act on one or more columns in a dataframe and return a column that we can assign as a new column to our dataframe. We’ll show how we define UDFs to extract relevant date-time information from the date field in our dataframe. First, let's take a look at the actual date field:
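
Selecting the column and pulling back a handful of rows, something like:

    df.select("date").take(5)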

Note that we could not use the .show(N) method and had to use the .take(N) method. This returns the first N rows in our dataframe as a list of Row objects; we used this method because it allows us to see the entire string in the date field while .show(N) would not.

Our first transformation will take the above strings and return the day of the week associated with the date-time in that string. We write a Python function to do that:
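
The exact function isn't reproduced here, but a sketch of it might look like the following, assuming each raw date string starts with a three-letter day abbreviation (e.g. "Mon Apr 06 22:19:45 PDT 2009"):

    def get_day_of_week(date_str):
        """Map the day abbreviation at the start of the raw date string
        to the full day name."""
        days = {"Mon": "Monday", "Tue": "Tuesday", "Wed": "Wednesday",
                "Thu": "Thursday", "Fri": "Friday", "Sat": "Saturday",
                "Sun": "Sunday"}
        return days.get(date_str.split()[0])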

Next we define the desired transformation on the dataframe using a Spark UDF. UDFs look like wrappers around our Python functions with the format:
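
In template form (my_udf, python_function and returnType are placeholders):

    my_udf = F.udf(python_function, returnType)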

Note that specifying the return type is not entirely necessary since Spark can infer this at runtime, however, explicitly declaring the return type does improve performance by allowing the return type to be known at compile time.

In our case the UDF for the above function becomes:
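
Using the sketch above (the UDF name here is my own choice):

    getDayOfWeekUDF = F.udf(get_day_of_week, StringType())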

Now we apply the UDF to columns of our dataframe and the results are appended as a new column. This is efficient since Spark dataframes use column-based storage. In general we would write the transformation as:
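
Again in template form:

    df = df.withColumn("new_col", my_udf(df["input_col"]))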

With the above UDF our example becomes:
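
For the day-of-week example (the output column name is illustrative):

    df = df.withColumn("day_of_week", getDayOfWeekUDF(df["date"]))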

We can now see the results of this transformation:
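
For instance:

    df.select("date", "day_of_week").show(5)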

(Note: If this were an extremely big table, a more efficient way to do this operation would be to use a regular expression to pull out the day-of-the-week abbreviation as a new column. Then we could use a broadcast join with a look-up table that maps the day abbreviation to the day of the week.)

Another way to define UDFs is to wrap them around lambda functions. An example is shown below:
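
A sketch of such a UDF (the name dateSplitUDF is my own):

    dateSplitUDF = F.udf(lambda date_str: date_str.split(),
                         ArrayType(StringType()))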

This UDF takes the date field which is a string and splits the string into an array using white space as the delimiter. This was the easiest way I could think of to get the month, year, day and time information from the string in the date field. Notice that while the return type of the Python function is a simple list, in Spark we have to be more specific and declare the return type to be an array of strings.

We can define a new dataframe which is the result of appending this new array column:
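
For example, giving the result a new variable name:

    df1 = df.withColumn("date_array", dateSplitUDF(df["date"]))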

We can see the result of this transformation below by using the toPandas() function to help with the formatting:
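
Something along the lines of:

    df1.select("date", "date_array").limit(5).toPandas()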


One other thing to note is that Spark dataframes are based on Resilient Distributed Datasets (RDDs), which are immutable, distributed collections of Java objects. When working with structured data it is preferred to use dataframes over RDDs, since the former have built-in optimizations and compression. The fact that RDDs are immutable means that dataframes are immutable too. While we can still call the resulting dataframe from a transformation by the same variable name df, the new dataframe is actually pointing to a completely new object under the hood. Many times it is desirable to call the resulting dataframe by the same name, but sometimes we have to give the new dataframe a different variable name, like we did in the previous cell. We do this sometimes for convenience and other times because we do not want to violate the acyclic nature of DAGs.

Next let’s define a few more functions to extract the day, month, year and time, and to create a timestamp for the tweet. The functions will take as input the date_array column. That is, they will take as input the array of strings that results from splitting the date field on whitespace. We don't show how these functions are defined (see source code), but rather import them from ETL.src:
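
The import would look roughly like this (the function names are inferred from the UDF names used below and may differ slightly from the source):

    from ETL.src.date_utility_functions import (getDay, getMonth, getYear,
                                                getTime, createTimeStamp)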

Now we create UDFs around these functions, as well as around a lambda function to convert the target field into 0/1 labels:
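
A sketch of those definitions, assuming the raw Sentiment140 labels are 0 (negative) and 4 (positive):

    getDayUDF    = F.udf(getDay,   StringType())
    getMonthUDF  = F.udf(getMonth, StringType())
    getYearUDF   = F.udf(getYear,  StringType())
    getTimeUDF   = F.udf(getTime,  StringType())
    timeStampUDF = F.udf(createTimeStamp, TimestampType())

    # Relabel the target: the raw data uses 4 for positive tweets,
    # so map it to 1 and leave 0 (negative) as is.
    convertTargetUDF = F.udf(lambda t: 1 if t == 4 else 0, IntegerType())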

Now we apply the above UDFs just as we did before. We can get the month of the tweet from the date_array column by applying the getMonthUDF function with the following:
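
That is:

    df1 = df1.withColumn("month", getMonthUDF(F.col("date_array")))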

Note that we had to use the notation F.col('input_col') instead of df['input_col']. This is because the column date_array is a derived column, not one from the original dataframe/csv. In order for Spark to be able to act on derived columns we need to use F.col to access the column instead of going through the dataframe name itself.

Note: If I were working on a cluster, the functions in ETL.src.date_utility_functions would not exist on the executors (or the data nodes) of the cluster and I would get an error. I would have to ship these functions to the executors using addPyFile when working from a notebook, or by adding them to the spark-submit command when submitting the batch job.
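
From a notebook that would look something like the following (the path is illustrative):

    spark.sparkContext.addPyFile("ETL/src/date_utility_functions.py")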

Now we want to apply multiple different UDFs (getYearUDF, getDayUDF, getTimeUDF) to the same date_array column. We could list these operations all out individually as we did before, but since the input is not changing we can group all the UDFs as well as their output column names into a list,
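
For example (the output column names here are just placeholders):

    udf_info = [(getYearUDF, "year"),
                (getDayUDF,  "day"),
                (getTimeUDF, "time")]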

and then iterate through that list, applying the UDFs to the single input column:
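
A minimal version of that loop, continuing with the names above:

    for a_udf, col_name in udf_info:
        df1 = df1.withColumn(col_name, a_udf(F.col("date_array")))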

Now we want to store an actual datetime object for the tweet and use the timeStampUDF function to do so. Notice how easy it is to use UDFs that have multiple input columns: we just list them out!
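
A sketch, assuming createTimeStamp takes the pieces in this order:

    df1 = df1.withColumn("timestamp",
                         timeStampUDF(F.col("year"), F.col("month"),
                                      F.col("day"),  F.col("time")))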

Now we have finished getting the date-time information from the date column on our dataframe. We now rename some of the columns and prepare to transform the text data.
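
The exact renames aren't shown here, so the new names and dropped columns below are illustrative; this is also a natural place to apply the target relabeling UDF from above:

    df1 = (df1.withColumn("target", convertTargetUDF(F.col("target")))
              .withColumnRenamed("user", "handle")
              .drop("flag")
              .drop("date")
              .drop("date_array"))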

We can take a look at our dataframe’s entries by running,
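
For example:

    df1.limit(5).toPandas()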


In order to clean the text data we first tokenize our strings. This means we create an array from the text where each entry in the array is an element of the string that was separated by white space. For example, the sentence "i love apache spark" becomes the array ["i", "love", "apache", "spark"].

The reason we need to tokenize is twofold. The first reason is that we want to build up arrays of tokens to use in our bag-of-words model. The second reason is that it allows us to apply regular expressions to individual words/tokens, which gives us a finer granularity for cleaning our text.

We use the Tokenizer class in Spark to create a new column of arrays of tokens:
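
The Tokenizer is a transformer from pyspark.ml; roughly (the output column name is my own choice):

    from pyspark.ml.feature import Tokenizer

    # Tokenizer lowercases the text and splits it on whitespace
    tokenizer = Tokenizer(inputCol="text", outputCol="tokens")
    df1 = tokenizer.transform(df1)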

We can take a look at the results again:
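
Something like:

    df1.select("text", "tokens").limit(5).toPandas()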


Now we want to clean up the tweets. This means we want to remove any web addresses, call outs and hashtags. We do this by defining a Python function that takes in a list of tokens and performs regular expressions on each token to remove the unwanted characters and returns the list of clean tokens:
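
The original function isn't reproduced here; a sketch with illustrative regular expressions might look like:

    import re

    def clean_tokens(tokens):
        """Remove web addresses, call outs and hashtags from a list of
        tokens, returning only the non-empty cleaned tokens."""
        cleaned = []
        for token in tokens:
            token = re.sub(r"https?://\S+|www\.\S+", "", token)  # web addresses
            token = re.sub(r"[@#]\S*", "", token)                # call outs / hashtags
            if token:
                cleaned.append(token)
        return cleaned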

Now we write a UDF around this function:
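
As before:

    cleanTokensUDF = F.udf(clean_tokens, ArrayType(StringType()))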

Next we define our last function which removes any non-english characters from the tokens and wrap it in a Spark UDF just as we did above.
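
A sketch of that function and its UDF (here "non-English" is taken to mean anything outside a-z):

    def remove_non_english(tokens):
        """Strip non-English characters from each token and drop any
        tokens that end up empty."""
        stripped = [re.sub(r"[^a-zA-Z]", "", token) for token in tokens]
        return [token for token in stripped if token]

    removeNonEnglishUDF = F.udf(remove_non_english, ArrayType(StringType()))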

Now we apply these UDFs and remove any tweets that would result in an empty array of tokens after cleaning.
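
Chaining the two UDFs and then filtering on the array size does the trick:

    df1 = (df1.withColumn("tokens", cleanTokensUDF(F.col("tokens")))
              .withColumn("tokens", removeNonEnglishUDF(F.col("tokens")))
              .filter(F.size(F.col("tokens")) > 0))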

Looking at the results:
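
For instance:

    df1.select("tokens").limit(5).toPandas()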


Now we come to the last stage in ETL, i.e. the stage where we write the data into our database. Spark and MongoDB work well together and writing the dataframe to a collection is as easy as declaring the format and passing in the names of the database and collection you want to write to:
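
A sketch using the MongoDB Spark connector (the database and collection names are placeholders, and the connector package plus the spark.mongodb.output.uri setting need to be configured on the SparkSession):

    (df1.write
        .format("com.mongodb.spark.sql.DefaultSource")
        .mode("append")
        .option("database", "twitter_db")
        .option("collection", "tweets")
        .save())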

That’s it for the part on ETL with Spark. Spark is a great platform for doing batch ETL work on both structured and unstructured data. MongoDB is a document-based NoSQL database that is fast, easy to use, allows for flexible schemas and is perfect for working with text data. PySpark and MongoDB work well together, allowing for fast, flexible ETL pipelines on large semi-structured data like that coming from tweets.

In the next part we’ll take a look at working with our MongoDB database!
