Recently I needed to find all the N-grams in a large corpus. The N-grams ranged from 2 to 40 words each.

To keep only the longest N-grams, I had to find every N-gram that is a substring of a larger N-gram and remove it, keeping the longer one. What remains is the set of longest N-grams.

While the R code to do this is rather simple, running it on any single machine was not possible due to the size of the corpus.

Here is the R code for that:

# For a given N-gram `e`, scan df row by row and return the first row whose
# N-gram contains `e` as a case-insensitive literal substring
calc <- function(e, df){
    i <- 1
    while (i <= nrow(df) && !grepl(tolower(e[[1]]), tolower(df[i, 1]), fixed = TRUE)){
        i <- i + 1
    }
    return(df[i, ])
}


# Apply calc to every N-gram and combine the surviving rows into one data frame
reduced   <- lapply(input_df[, 1], calc, df = input_df)
output_df <- do.call(rbind, reduced)
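
As a quick illustration, here is the function on a tiny made-up input (the N-grams and frequencies below are purely hypothetical, and the rows are ordered longest-first so that the first match is also the longest):

# Hypothetical toy input; rows ordered longest-first so the first match is the longest
input_df <- data.frame(
  Ngram = c("new york city marathon route",
            "new york city marathon",
            "new york city"),
  Document_Frequency = c(120, 450, 9800),
  stringsAsFactors = FALSE
)

reduced   <- lapply(input_df[, 1], calc, df = input_df)
output_df <- do.call(rbind, reduced)
output_df   # every row resolves to the longest N-gram, "new york city marathon route"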

KNIME was not able to handle that much data either.

So I figured I should use Spark for this. With Spark's distributed processing, this should be a fairly fast job. And it was!

Here is the Spark code to accomplish the task. I used sparklyr mainly because it supports dplyr syntax. I am sure the same can be achieved using SparkR.

First, load the sparklyr library and the dataset. The input data has two columns: Ngram and Document_Frequency.
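
For reference, the first few lines of the CSV are assumed to look something like this (the values below are made up purely for illustration):

Ngram,Document_Frequency
new york city marathon route,120
new york city marathon,450
new york city,9800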


install.packages("sparklyr")
library(sparklyr)
library(dplyr)

# Connect to the Spark cluster managed by Databricks
sc <- spark_connect(method = "databricks")

# Read the N-gram table into Spark and work with a 10% sample of it
Ngrams <- spark_read_csv(sc, name = "Ngrams", path = "/FileStore/tables/ngrams.csv",
                         source = "csv", inferSchema = "true")
Ngrams <- sdf_sample(Ngrams, fraction = 0.1, replacement = TRUE, seed = NULL)

# Pull the sampled table back to the driver as a plain R data frame;
# this copy will later be shipped to the workers as the context
NgramsDF <- collect(Ngrams)
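
At this point it is worth a quick sanity check that the collected copy has the expected shape and columns (the output is, of course, data-dependent):

dim(NgramsDF)        # number of rows and columns in the collected sample
head(NgramsDF$Ngram) # a peek at a few N-grams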

The following is the secret sauce: for any given N-gram, the calc function finds the longest N-gram that contains it.


# Applied by spark_apply to each chunk of the Spark DataFrame, with `df`
# (the full collected table) passed in as the context. grep finds the rows
# of df whose Ngram contains a match for the incoming Ngram (treated as a
# regular expression), and the first matching row is returned.
calc <- function(e, df){
  dfSubset <- df[grep(e$Ngram, df$Ngram), ]
  return(dfSubset[1, ])
}
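
It can be helpful to exercise the function locally against the collected copy before handing it to Spark; the N-gram below is just a placeholder for one that actually appears in your data:

# Hypothetical one-row input, mimicking what spark_apply passes to calc
test_row <- data.frame(Ngram = "new york city", stringsAsFactors = FALSE)
calc(test_row, NgramsDF)  # returns the first row of NgramsDF whose Ngram matches the pattern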

Now we apply the calc function to the entire Spark DataFrame, passing the R data frame in as the context. The spark_apply function lets us do exactly that. Being able to pass in the context is important here, since the full reference table has to be available on every node that processes a chunk of the Spark DataFrame. If we were using SparkR, we could have set this up as a broadcast variable, which would have made the R data frame available on the processing nodes.

# Run calc on the workers, shipping NgramsDF to each of them as the context `df`
reduced <- Ngrams %>% spark_apply(calc, context = {df <- NgramsDF})
reduced <- reduced %>% group_by(Ngram)

# display() is a Databricks notebook helper; collect() pulls the result to the driver
display(collect(reduced))
spark_write_table(reduced, name = "longestngrams", mode = "overwrite")
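
If you want to work with the saved result afterwards, it can be referenced lazily through dplyr (just an illustrative follow-up, using the table name from above):

# Reference the saved table as a lazy Spark tbl and inspect a few rows
longest <- tbl(sc, "longestngrams")
longest %>% head(10) %>% collect()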


That’s it. Very simple. Extremely fast.

Here is the Databricks Notebook if you would like to see this in action:

https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/2261204958035783/1096802413860569/5266178915616677/latest.html