Include these Spark Window Functions in your Data Science Workflow

Original article was published on Artificial Intelligence on Medium

Rolling Aggregations

Sometimes it helps to provide rolling averages to our models. For example, we might want a rolling 7-day sales sum or mean as a feature for a sales regression model. Here, let us calculate the rolling mean of confirmed cases over the last seven days. Many people already do this with this dataset to smooth out day-to-day noise and see the real trends.

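```python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Seven-row frame per province: the six preceding rows plus the current row
windowSpec = Window.partitionBy("province").orderBy("date").rowsBetween(-6, 0)
timeprovinceWithRoll = timeprovince.withColumn("roll_7_confirmed", F.mean("confirmed").over(windowSpec))
timeprovinceWithRoll.filter(F.col("date") > "2020-03-10").show()
```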

There are a few things to understand here. First is the rowsBetween(-6,0) function we are using. It has the form rowsBetween(start,end), where start and end are offsets relative to the current row and both are inclusive. Here 0 specifies the current row and -6 specifies the row six positions before it, so the frame holds seven rows: the current row plus the six rows before it. As long as each province has exactly one row per day, this means we look only at the past seven days in a particular window, including the current day.
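To make the frame boundaries concrete, here is a small plain-Python model of what rowsBetween(start, end) selects for each row of a single partition that is already ordered by date. It is a sketch for intuition only, not how Spark evaluates windows; the helper name rows_between and the counts are made up for illustration. Notice that the first few rows get shorter frames, because the frame is clipped at the start of the partition, so their "7-day" mean is taken over fewer values.

```python
def rows_between(values, start, end):
    """Return the frame of every row, mimicking rowsBetween(start, end).

    start and end are offsets from the current row (0); both are inclusive.
    Frames are clipped at the edges of the partition.
    """
    n = len(values)
    frames = []
    for i in range(n):
        lo, hi = max(0, i + start), min(n - 1, i + end)
        frames.append(values[lo:hi + 1] if lo <= hi else [])
    return frames

# One province, one row per day, already ordered by date (made-up counts).
confirmed = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

frames = rows_between(confirmed, -6, 0)
roll_7 = [sum(f) / len(f) for f in frames]
print(roll_7)  # [1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.0, 5.0, 6.0, 7.0]
```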

So to get roll_7_confirmed for the date 2020-03-22, we take the mean of the confirmed cases for the dates 2020-03-16 through 2020-03-22.
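The date arithmetic can be checked the same way. This plain-Python sketch uses a hypothetical helper, frame_dates, and made-up, gap-free dates to find which dates fall into the seven-row frame for 2020-03-22. The second half drops one day to show that rowsBetween counts rows, not calendar days, so a missing date pulls the frame further back in time.

```python
from datetime import date, timedelta

def frame_dates(dates, i, start=-6, end=0):
    """Dates that rowsBetween(start, end) places in the frame of row i (dates sorted)."""
    lo = max(0, i + start)
    hi = min(len(dates) - 1, i + end)
    return dates[lo:hi + 1] if lo <= hi else []

# One row per day from 2020-03-10 to 2020-03-22 (hypothetical, gap-free data).
days = [date(2020, 3, 10) + timedelta(days=k) for k in range(13)]
frame = frame_dates(days, days.index(date(2020, 3, 22)))
print(frame[0], frame[-1])  # 2020-03-16 2020-03-22

# Drop 2020-03-18: the frame still holds seven rows, so it now starts at 2020-03-15.
gappy = [d for d in days if d != date(2020, 3, 18)]
frame_gappy = frame_dates(gappy, gappy.index(date(2020, 3, 22)))
print(frame_gappy[0], frame_gappy[-1])  # 2020-03-15 2020-03-22
```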

If we had used rowsBetween(-7,-1), we would have looked only at the previous seven days of data, excluding the current day.
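Here is the same kind of plain-Python model for rowsBetween(-7, -1), again with hypothetical helpers and made-up counts rather than Spark itself. The first row has an empty frame, which Spark reports as null for a mean (None here), and no frame ever contains the current row's own value. That can be useful when a feature should only use information available before the day being predicted.

```python
def rows_between(values, start, end):
    """Return the frame of every row, mimicking rowsBetween(start, end)."""
    n = len(values)
    frames = []
    for i in range(n):
        lo, hi = max(0, i + start), min(n - 1, i + end)
        frames.append(values[lo:hi + 1] if lo <= hi else [])
    return frames

def frame_mean(frame):
    # A mean over an empty frame is null in Spark; None plays that role here.
    return sum(frame) / len(frame) if frame else None

# One province, one row per day, already ordered by date (made-up counts).
confirmed = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

lagged = [frame_mean(f) for f in rows_between(confirmed, -7, -1)]
print(lagged)  # [None, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.0, 5.0, 6.0]
```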

One could also find a use for rowsBetween(Window.unboundedPreceding, Window.currentRow), which takes the rows from the first row of the partition up to the current row, to get running totals. Here I calculate cumulative_confirmed this way.

```python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Frame from the first row of each province's partition up to the current row
windowSpec = Window.partitionBy("province").orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow)
timeprovinceWithRoll = timeprovince.withColumn("cumulative_confirmed", F.sum("confirmed").over(windowSpec))
```
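The running total can also be modelled in plain Python, as a rough sketch rather than Spark's implementation: sort by the partition key and then the order key, and accumulate within each province. The province names and values below are made up, and cumulative_by_partition is a hypothetical helper. The point to notice is that the total restarts for each province, just as partitionBy("province") restarts the window.

```python
from itertools import accumulate, groupby
from operator import itemgetter

# (province, date, confirmed) rows in arbitrary order; made-up values.
rows = [
    ("Busan", "2020-03-01", 2),
    ("Seoul", "2020-03-01", 5),
    ("Busan", "2020-03-02", 1),
    ("Seoul", "2020-03-02", 3),
    ("Busan", "2020-03-03", 4),
]

def cumulative_by_partition(rows):
    """Running sum of confirmed per province, ordered by date.

    Roughly what F.sum("confirmed") computes over partitionBy("province")
    .orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow).
    """
    ordered = sorted(rows, key=itemgetter(0, 1))  # partition key, then order key
    result = []
    for _, group in groupby(ordered, key=itemgetter(0)):
        group = list(group)
        totals = accumulate(confirmed for _, _, confirmed in group)
        result.extend(row + (total,) for row, total in zip(group, totals))
    return result

result = cumulative_by_partition(rows)
for row in result:
    print(row)
```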