Time Series Forecasting


This example shows how to use Prophet and Dask together for scalable time series forecasting.

Prophet is a procedure for forecasting time series data based on an additive model where non-linear trends are fit with yearly, weekly, and daily seasonality, plus holiday effects.

As discussed in the Forecasting at Scale paper, large datasets aren’t the only type of scaling challenge teams run into. In this example we’ll focus on the third type of scaling challenge identified in that paper:

[I]n most realistic settings, a large number of forecasts will be created, necessitating efficient, automated means of evaluating and comparing them, as well as detecting when they are likely to be performing poorly. When hundreds or even thousands of forecasts are made, it becomes important to let machines do the hard work of model evaluation and comparison while efficiently using human feedback to fix performance problems.

That sounds like a perfect opportunity for Dask. We’ll use Prophet and Dask together to parallelize the diagnostics stage of research. This example does not attempt to parallelize the training of the model itself.

[1]:
import pandas as pd
from prophet import Prophet
Importing plotly failed. Interactive plots will not work.

We’ll walk through the example from the Prophet quickstart. These values represent the log daily page views for Peyton Manning’s Wikipedia page.

[2]:
df = pd.read_csv(
    'https://raw.githubusercontent.com/facebook/prophet/master/examples/example_wp_log_peyton_manning.csv',
    parse_dates=['ds']
)
df.head()
[2]:
           ds         y
0  2007-12-10  9.590761
1  2007-12-11  8.519590
2  2007-12-12  8.183677
3  2007-12-13  8.072467
4  2007-12-14  7.893572
[3]:
df.plot(x='ds', y='y');
[Figure: line plot of y (log daily page views) against ds (date)]

Fitting the model takes a handful of seconds. Dask isn’t involved at all here.

[4]:
%%time
m = Prophet(daily_seasonality=False)
m.fit(df)
/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/prophet/forecaster.py:896: FutureWarning: The frame.append method is deprecated and will be removed from pandas in a future version. Use pandas.concat instead.
  components = components.append(new_comp)
CPU times: user 2.46 s, sys: 108 ms, total: 2.56 s
Wall time: 2.61 s
[4]:
<prophet.forecaster.Prophet at 0x7f73c3efc9d0>

And we can make a forecast. Again, Dask isn’t involved here.

[5]:
future = m.make_future_dataframe(periods=365)
forecast = m.predict(future)
m.plot(forecast);
/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/prophet/forecaster.py:896: FutureWarning: The frame.append method is deprecated and will be removed from pandas in a future version. Use pandas.concat instead.
  components = components.append(new_comp)
/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/prophet/forecaster.py:896: FutureWarning: The frame.append method is deprecated and will be removed from pandas in a future version. Use pandas.concat instead.
  components = components.append(new_comp)
[Figure: Prophet forecast plot from m.plot(forecast), with historical points and the predicted trend plus uncertainty interval extending 365 days]

Parallel Diagnostics

Prophet includes a prophet.diagnostics.cross_validation function, which uses simulated historical forecasts to provide some idea of a model’s quality.

This is done by selecting cutoff points in the history, and for each of them fitting the model using data only up to that cutoff point. We can then compare the forecasted values to the actual values.

See https://facebook.github.io/prophet/docs/diagnostics.html for more.
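
To make the mechanism concrete, here’s a rough sketch of what a single simulated historical forecast does by hand. The cutoff date here is hypothetical; cross_validation chooses real cutoffs automatically.

# Sketch: one simulated historical forecast by hand.
# "2013-01-01" is a hypothetical cutoff for illustration.
cutoff = pd.Timestamp("2013-01-01")
history = df[df["ds"] <= cutoff]

# Fit only on data up to the cutoff, then forecast past it.
m_cv = Prophet(daily_seasonality=False)
m_cv.fit(history)
future = m_cv.make_future_dataframe(periods=365)
pred = m_cv.predict(future)

# Compare the out-of-sample forecasts (yhat) to the actuals (y).
comparison = pred[pred["ds"] > cutoff].merge(df, on="ds")
comparison[["ds", "y", "yhat"]].head()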

Internally, cross_validation generates a list of cutoff values to try. Prophet fits a model and computes some metrics for each of these. By default each model is fit sequentially, but the models can be trained in parallel using the parallel= keyword. On a single machine parallel="processes" is a good choice. For large problems where you’d like to distribute the work on a cluster, use parallel="dask" after you’ve connected to the cluster by creating a Client.
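
For example, on a single machine the call might look like this sketch (same arguments as the Dask run below, just a different parallel= value):

df_cv = prophet.diagnostics.cross_validation(
    m, initial="730 days", period="180 days", horizon="365 days",
    parallel="processes",  # process-based parallelism, no cluster needed
)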

[6]:
import dask
from distributed import Client, performance_report
import prophet.diagnostics

client = Client(threads_per_worker=1)
client
[6]:

Client: Client-97840748-0de0-11ed-9f87-000d3a8f7959
Connection method: Cluster object
Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status

[7]:
%%time
df_cv = prophet.diagnostics.cross_validation(
    m, initial="730 days", period="180 days", horizon="365 days",
    parallel="dask"
)
INFO:prophet:Making 11 forecasts with cutoffs between 2010-02-15 00:00:00 and 2015-01-20 00:00:00
INFO:prophet:Applying in parallel with <Client: 'tcp://127.0.0.1:44751' processes=2 threads=2, memory=6.78 GiB>
CPU times: user 868 ms, sys: 122 ms, total: 990 ms
Wall time: 27.5 s

Be sure to watch the Dask Dashboard while that runs: the models are fit in parallel on the cluster. At the start there’s a bit of overhead from moving the model and data to the workers, but after that the scaling looks quite nice.
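
From here a natural next step, sketched below but not run in this notebook, is to summarize the cross-validation results with prophet.diagnostics.performance_metrics, and optionally capture the dashboard activity with the performance_report context manager imported above (the filename is hypothetical):

from prophet.diagnostics import performance_metrics

# Error metrics (MSE, RMSE, MAE, MAPE, coverage) aggregated by horizon.
df_p = performance_metrics(df_cv)
df_p.head()

# Save the dashboard activity for a run to a static HTML file.
with performance_report(filename="prophet-cv-report.html"):
    prophet.diagnostics.cross_validation(
        m, initial="730 days", period="180 days", horizon="365 days",
        parallel="dask",
    )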