Natural Language Processing using Spark and Pandas

Eugenio Contreras
5 min read · Apr 23, 2019

I wrote this story as a short demonstration of the Spark NLP library. The exercise should be fun and beginner friendly.

We are going to analyze a dataset from Reddit and find its most common words. The dataset is really small and only suitable for practice, but you can apply the same methods to other datasets too.

The DataSet came from Kaggle: Link

Pre-requisites

To follow this tutorial you need to install:
- pyspark
- spark-nlp
- pandas

You can install them from PyPI with pip by running the following commands in a Jupyter Notebook (in a terminal, drop the leading !):

!pip install pyspark
!pip install spark-nlp==2.0.1
!pip install pandas

Let’s start with the tutorial. First, import the pandas library and set the maximum column width to 800. This is just for better visualization of the DataFrame.

import pandas as pd
pd.set_option('display.max_colwidth', 800)

Now we need to create a SparkSession. We are going to declare the Spark NLP package in the session configuration so that we can use the library. Our objective in this project is to count the most common words in our dataset.

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "JohnSnowLabs:spark-nlp:1.8.2") \
    .getOrCreate()

Declare a path variable and read all CSV files with the SparkSession created before.

Set the format to CSV and the header option to true, so that the first line of each file is used for the column names.

path = '../input/*.csv'
df = spark.read.format('csv').option('header', 'true').load(path)
df.limit(5).toPandas()
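To see what the header option changes, here is a minimal plain-Python sketch using the standard csv module instead of Spark. The file contents and column names are made up for illustration and are not the real Kaggle data.

```python
import csv
import io

# Hypothetical file contents for illustration; not the real Kaggle data.
raw = "Author,comment,Score\nalice,I love Spark,10\nbob,Pandas is handy,3\n"

# Without a header, the first line is just another data row.
no_header = list(csv.reader(io.StringIO(raw)))

# With a header, the first line supplies the column names.
with_header = list(csv.DictReader(io.StringIO(raw)))

print(no_header[0])
print(with_header[0]["comment"])
```

Every value is still a string here, which is consistent with the string columns in the schema printed later in this article.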

Our goal with this project is to count the most common words in Reddit posts, so we don’t want null comments.

Let’s filter out every row whose comment column is null.

df = df.filter('comment is not null')

I’m going to create a new DataFrame using the explode and split functions of pyspark.

The purpose of this is to create a new column called word, which will contain every word of our comments, split on whitespace.

from pyspark.sql.functions import split, explode, desc

dfWords = df.select(explode(split('comment', '\\s+')).alias('word')) \
    .groupBy('word').count().orderBy(desc('word'))
dfWords.printSchema()
dfWords.orderBy(desc('count')).limit(5).toPandas()

Our new DataFrame doesn’t look so good. As you can see, we have null rows, pronouns, etc.
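The split, explode and count steps can be imitated in plain Python, which may help explain where the blank rows come from. This is only a sketch with made-up comments, using re.split and collections.Counter rather than Spark, so edge-case behavior in Spark may differ.

```python
import re
from collections import Counter

# Made-up comments; note the leading and trailing whitespace.
comments = ["  I love Spark", "Spark and Pandas", "I use Pandas "]

# split + explode: one flat list holding every piece of every comment.
words = [w for c in comments for w in re.split(r"\s+", c)]

# groupBy('word').count(): number of occurrences of each piece.
counts = Counter(words)

# Leading or trailing whitespace produces empty-string "words".
print(counts[""])
print(counts.most_common(3))
```

Splitting on whitespace alone also keeps pronouns and other filler words, which is why a part-of-speech filter is useful.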

Our goal is to count the relevant words from posts. That’s why we are going to use the Spark NLP library, which tags every word in the dataset with its part of speech (noun, pronoun, verb, etc.).

from com.johnsnowlabs.nlp.pretrained.pipeline.en import BasicPipeline as bp

dfAnnotated = bp.annotate(df, 'comment')
dfAnnotated.printSchema()

The Schema looks like this:

root
|-- Author: string (nullable = true)
|-- text: string (nullable = true)
|-- Score: string (nullable = true)
|-- ID: string (nullable = true)
|-- document: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- annotatorType: string (nullable = true)
| | |-- begin: integer (nullable = false)
| | |-- end: integer (nullable = false)
| | |-- result: string (nullable = true)
| | |-- metadata: map (nullable = true)
| | | |-- key: string
| | | |-- value: string (valueContainsNull = true)
|-- token: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- annotatorType: string (nullable = true)
| | |-- begin: integer (nullable = false)
| | |-- end: integer (nullable = false)
| | |-- result: string (nullable = true)
| | |-- metadata: map (nullable = true)
| | | |-- key: string
| | | |-- value: string (valueContainsNull = true)
|-- normal: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- annotatorType: string (nullable = true)
| | |-- begin: integer (nullable = false)
| | |-- end: integer (nullable = false)
| | |-- result: string (nullable = true)
| | |-- metadata: map (nullable = true)
| | | |-- key: string
| | | |-- value: string (valueContainsNull = true)
|-- lemma: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- annotatorType: string (nullable = true)
| | |-- begin: integer (nullable = false)
| | |-- end: integer (nullable = false)
| | |-- result: string (nullable = true)
| | |-- metadata: map (nullable = true)
| | | |-- key: string
| | | |-- value: string (valueContainsNull = true)
|-- pos: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- annotatorType: string (nullable = true)
| | |-- begin: integer (nullable = false)
| | |-- end: integer (nullable = false)
| | |-- result: string (nullable = true)
| | |-- metadata: map (nullable = true)
| | | |-- key: string
| | | |-- value: string (valueContainsNull = true)

* text: the original text from the comment column.
* pos.metadata: a key/value map for every word.
* pos.result: an array with the part-of-speech tag of every word in the comment.

Here is the list of Penn Treebank part-of-speech tags: https://cs.nyu.edu/grishman/jet/guide/PennPOS.html

dfPos = dfAnnotated.select("text", "pos.metadata", "pos.result")
dfPos.limit(5).toPandas()
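To make the shape of the pos column more concrete, here is a plain-Python sketch of one annotated comment. The field names mirror the schema above and the word key is the one used later in this article, but the sentence, offsets and tags are hypothetical values chosen for illustration.

```python
# One hypothetical `pos` array for the comment "Spark is fast".
# Field names follow the printed schema; the values are made up.
pos = [
    {"annotatorType": "pos", "begin": 0, "end": 4, "result": "NNP",
     "metadata": {"word": "Spark"}},
    {"annotatorType": "pos", "begin": 6, "end": 7, "result": "VBZ",
     "metadata": {"word": "is"}},
    {"annotatorType": "pos", "begin": 9, "end": 12, "result": "JJ",
     "metadata": {"word": "fast"}},
]

# Like selecting "pos.result": one tag per word.
tags = [a["result"] for a in pos]

# Like selecting "pos.metadata": one key/value map per word.
metadata = [a["metadata"] for a in pos]

print(tags)
print(metadata)
```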

Let’s create a new DataFrame with one row per pos struct by exploding the pos array.

dfSplitPos = dfAnnotated.select(explode("pos").alias("pos"))
dfSplitPos.limit(5).toPandas()

I want to count every word with the tag NNP or NNPS, which mean:
* NNP Proper noun, singular
* NNPS Proper noun, plural

NNPFilter = "pos.result = 'NNP' or pos.result = 'NNPS'"
dfNNPFilter = dfSplitPos.filter(NNPFilter)
dfNNPFilter.limit(10).toPandas()
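Tag comparison is an exact string match, so the case of the tag matters: the Penn Treebank tag for plural proper nouns is NNPS, all uppercase. The following plain-Python sketch, with made-up word and tag pairs, shows how a lowercase s would silently drop the plural proper nouns.

```python
# Made-up (word, tag) pairs for illustration.
tagged = [("Reddit", "NNP"), ("is", "VBZ"), ("Americans", "NNPS"), ("fun", "JJ")]

# Correct Penn Treebank tags for proper nouns.
PROPER_NOUN_TAGS = {"NNP", "NNPS"}
proper = [(w, t) for w, t in tagged if t in PROPER_NOUN_TAGS]

# A lowercase "s" never matches, so plural proper nouns are lost.
missed = [(w, t) for w, t in tagged if t in {"NNP", "NNPs"}]

print(proper)
print(missed)
```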

I’m going to use the selectExpr function to create a new DataFrame with word and tag columns.

dfWordTag = dfNNPFilter.selectExpr("pos.metadata['word'] as word", "pos.result as tag")
dfWordTag.limit(10).toPandas()

Finally, our DataFrame has the shape we want, and we can start counting the most common words in the Reddit posts.

dfCountWords = dfWordTag.groupBy('word').count().orderBy(desc('count'))
dfCountWords.limit(20).toPandas()
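As a recap, the whole chain (explode, filter by tag, pick the word, count, sort) can be written as one small plain-Python function. This is a sketch of the logic only, not of the Spark API; the function name top_proper_nouns and the sample rows are hypothetical.

```python
from collections import Counter

PROPER_NOUN_TAGS = {"NNP", "NNPS"}


def top_proper_nouns(rows, n=20):
    """Return the n most common proper nouns.

    rows is a list of `pos` arrays, one per comment, where each element
    is a dict with "result" (the tag) and "metadata" (holding "word").
    """
    # explode: flatten the per-comment arrays into single annotations.
    exploded = (a for row in rows for a in row)
    # filter + selectExpr: keep proper nouns and pull out the word.
    words = (a["metadata"]["word"] for a in exploded
             if a["result"] in PROPER_NOUN_TAGS)
    # groupBy + count + orderBy(desc) + limit.
    return Counter(words).most_common(n)


# Hypothetical annotated comments.
rows = [
    [{"result": "NNP", "metadata": {"word": "Reddit"}},
     {"result": "VBZ", "metadata": {"word": "rocks"}}],
    [{"result": "NNP", "metadata": {"word": "Reddit"}},
     {"result": "NNPS", "metadata": {"word": "Americans"}}],
]

result = top_proper_nouns(rows)
print(result)
```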

Our DataFrame doesn’t tell us much because the dataset is rather small. The idea is to apply this method to other projects; this exercise is just for practice and for discovering what you can do with the Spark NLP library.

Please feel free to let me know your thoughts about this and what I can do better for the next exercise.

You can find the entire tutorial on my GitHub repository: Link

You can reach me on Medium or GitHub:

* https://github.com/kennycontreras
* https://medium.com/@kennycontreras
