使用🤗 Transformers进行概率时间序列预测

Using 🤗 Transformers for probabilistic time series prediction.

介绍

时间序列预测是一个重要的科学和业务问题,近年来在经典方法之外,也看到了基于深度学习模型的许多创新。经典方法(如ARIMA)和新型深度学习方法之间的一个重要区别是:

概率预测

通常情况下,经典方法是逐个拟合数据集中的每个时间序列的。这些方法通常被称为“单一”或“局部”方法。然而,在某些应用中,当处理大量时间序列时,训练一个“全局”模型对于学习来自多个不同来源的潜在表示是有益的。

一些经典方法是点估计的(意味着它们只输出每个时间步的单个值),并且模型是通过最小化与地面真实数据的L2或L1类型的损失来训练的。然而,由于预测通常用于某些实际的决策流程中,即使人类介入,提供预测的不确定性也是更有益的。这也被称为“概率预测”,与“点预测”相对应。这涉及到对概率分布进行建模,从中可以进行抽样。

所以简而言之,我们希望训练的不是本地的点预测模型,而是全局的概率模型。深度学习非常适合这一点,因为神经网络可以从几个相关的时间序列中学习表示,并且可以对数据的不确定性进行建模。

在概率设置中,通常学习某个选择的参数化分布(如高斯或学生T分布)的未来参数;或者学习条件分位函数;或者使用适应于时间序列设置的符合预测框架。方法的选择不会影响建模方面,因此通常可以将其视为另一个超参数。可以通过取经验均值或中位数将概率模型转化为点预测模型。

时间序列Transformer

在对顺序数据进行建模时,研究人员提出了使用循环神经网络(RNN)如LSTM或GRU,卷积网络(CNN)以及最近的Transformer等方法。这些方法自然地适用于时间序列预测。

在这篇博文中,我们将利用经典Transformer(Vaswani et al., 2017)进行单变量概率预测任务(即单独预测每个时间序列的一维分布)。编码器-解码器Transformer是预测的自然选择,因为它很好地包含了几个归纳偏差。

首先,在推理时使用编码器-解码器架构是有帮助的,通常对于一些已记录数据,我们希望预测未来的一些时间步。这可以类比于文本生成任务,在给定一些上下文的情况下,我们采样下一个标记并将其传递回解码器(也称为“自回归生成”)。同样,在这里,我们也可以根据某种分布类型进行采样,以提供预测直到我们期望的预测范围。这被称为贪婪采样/搜索,关于NLP设置的这方面有一篇很棒的博文。

其次,Transformer使我们能够在可能包含数千个时间点的时间序列数据上进行训练。由于注意力机制的时间和内存限制,可能无法一次将时间序列的所有历史输入模型。因此,可以考虑一些适当的上下文窗口,并在构建随机梯度下降(SGD)批次时从训练数据中抽样这个窗口和随后的预测长度大小的窗口。上下文大小的窗口可以传递给编码器,预测窗口可以传递给一个有因果屏蔽的解码器。这意味着解码器在学习下一个值时只能看到先前的时间步。这与训练机器翻译的经典Transformer的方式相同,称为“教师强制”。

与其他架构相比,Transformer的另一个好处是我们可以将缺失值(在时间序列设置中常见)作为编码器或解码器的附加掩码进行处理,并且仍然可以在不进行填充或插值的情况下进行训练。这类似于Transformers库中BERT和GPT-2等模型的attention_mask,用于不在注意矩阵的计算中包括填充标记。

Transformer架构的一个缺点是由于原始Transformer的计算和内存需求是二次的,所以上下文和预测窗口的大小受到限制,详见Tay等人,2020。此外,由于Transformer是一种强大的架构,与其他方法相比,它可能更容易过拟合或学习到虚假的相关性。

🤗 Transformers库提供了一个原始的概率时间序列Transformer模型,简称为Time Series Transformer。在下面的章节中,我们将展示如何在自定义数据集上训练这样的模型。

设置环境

首先,让我们安装必要的库:🤗 Transformers、🤗 Datasets、🤗 Evaluate、🤗 Accelerate和GluonTS。

正如我们将要展示的,GluonTS将用于将数据转换为特征以及创建适当的训练、验证和测试批次。

!pip install -q transformers

!pip install -q datasets

!pip install -q evaluate

!pip install -q accelerate

!pip install -q gluonts ujson

加载数据集

在本博文中,我们将使用Hugging Face Hub上可用的tourism_monthly数据集。该数据集包含澳大利亚366个地区的月旅游量。

该数据集是Monash时间序列预测库的一部分,是来自许多领域的时间序列数据集的集合。可以将其视为时间序列预测的GLUE基准。

from datasets import load_dataset

dataset = load_dataset("monash_tsf", "tourism_monthly")

如您所见,数据集包含3个拆分:训练集、验证集和测试集。

dataset

>>> DatasetDict({
        train: Dataset({
            features: ['start', 'target', 'feat_static_cat', 'feat_dynamic_real', 'item_id'],
            num_rows: 366
        })
        test: Dataset({
            features: ['start', 'target', 'feat_static_cat', 'feat_dynamic_real', 'item_id'],
            num_rows: 366
        })
        validation: Dataset({
            features: ['start', 'target', 'feat_static_cat', 'feat_dynamic_real', 'item_id'],
            num_rows: 366
        })
    })

每个示例包含几个键,其中starttarget是最重要的。让我们看一下数据集中的第一个时间序列:

train_example = dataset['train'][0]
train_example.keys()

>>> dict_keys(['start', 'target', 'feat_static_cat', 'feat_dynamic_real', 'item_id'])

start只是指示时间序列的开始(作为日期时间),target包含时间序列的实际值。

start将用于将与时间相关的特征添加到时间序列值中,作为模型的额外输入(例如”年的月份”)。由于我们知道数据的频率是monthly,我们知道例如第二个值的时间戳为1979-02-01,等等。

print(train_example['start'])
print(train_example['target'])

>>> 1979-01-01 00:00:00
    [1149.8699951171875, 1053.8001708984375, ..., 5772.876953125]

验证集包含与训练集完全相同的数据,只是时间跨度更长prediction_length。这样可以将模型的预测结果与真实值进行验证。

测试集与验证集相比又是一个prediction_length更长的数据(或者相对于训练集的多个滚动窗口的prediction_length更长的数据进行多次测试)。

validation_example = dataset['validation'][0]
validation_example.keys()

>>> dict_keys(['start', 'target', 'feat_static_cat', 'feat_dynamic_real', 'item_id'])

初始值与相应的训练示例完全相同:

print(validation_example['start'])
print(validation_example['target'])

>>> 1979-01-01 00:00:00
    [1149.8699951171875, 1053.8001708984375, ..., 5985.830078125]

然而,与训练示例相比,该示例具有prediction_length=24个额外值。让我们进行验证。

freq = "1M"
prediction_length = 24

assert len(train_example["target"]) + prediction_length == len(
    validation_example["target"]
)

让我们可视化一下:

import matplotlib.pyplot as plt

figure, axes = plt.subplots()
axes.plot(train_example["target"], color="blue")
axes.plot(validation_example["target"], color="red", alpha=0.5)

plt.show()

让我们拆分数据:

train_dataset = dataset["train"]
test_dataset = dataset["test"]

start更新为pd.Period

我们首先要做的是使用数据的freq将每个时间序列的start特征转换为pandas的Period索引:

from functools import lru_cache

import pandas as pd
import numpy as np

@lru_cache(10_000)
def convert_to_pandas_period(date, freq):
    return pd.Period(date, freq)

def transform_start_field(batch, freq):
    batch["start"] = [convert_to_pandas_period(date, freq) for date in batch["start"]]
    return batch

现在,我们使用datasetsset_transform功能在原地进行这个转换:

from functools import partial

train_dataset.set_transform(partial(transform_start_field, freq=freq))
test_dataset.set_transform(partial(transform_start_field, freq=freq))

定义模型

接下来,让我们实例化一个模型。该模型将从头开始训练,因此我们不会在这里使用from_pretrained方法,而是随机初始化一个config的模型。

我们对模型指定了一些额外的参数:

  • prediction_length(在我们的例子中为24个月):这是Transformer解码器学习预测的时间范围;
  • context_length:如果没有指定context_length,模型将把context_length(编码器的输入)设为prediction_length
  • 对于给定频率的lags:这些指定了我们要“向后看”的时间量,以添加为额外特征。例如,对于每日频率,我们可以考虑[1, 2, 7, 30, ...],即向后看1、2、…天;而对于分钟数据,我们可以考虑[1, 30, 60, 60*24, ...]等;
  • 时间特征的数量:在我们的例子中,这将是2,因为我们将添加MonthOfYearAge特征;
  • 静态分类特征的数量:在我们的例子中,这将仅为1,因为我们将添加一个单一的“时间序列ID”特征;
  • 基数:每个静态分类特征的值的数量,作为一个列表,对于我们的例子,将是[366],因为我们有366个不同的时间序列;
  • 嵌入维度:每个静态分类特征的嵌入维度,作为一个列表,例如[3]表示模型将为每个366个时间序列(地区)学习一个大小为3的嵌入向量。

让我们使用GluonTS提供的默认滞后值来处理给定频率(”monthly”):

from gluonts.time_feature import get_lags_for_frequency

lags_sequence = get_lags_for_frequency(freq)
print(lags_sequence)

>>> [1, 2, 3, 4, 5, 6, 7, 11, 12, 13, 23, 24, 25, 35, 36, 37]

这意味着我们将在每个时间步长上回溯最多37个月,作为额外的特征。

接下来,让我们来看看GluonTS提供的默认时间特征:

from gluonts.time_feature import time_features_from_frequency_str

time_features = time_features_from_frequency_str(freq)
print(time_features)

>>> [<function month_of_year at 0x7fa496d0ca70>]

在这种情况下,只有一个特征,即”month of year”。这意味着对于每个时间步长,我们将添加月份作为一个标量值(例如,如果时间戳是”january”,则为1,如果时间戳是”february”,则为2,依此类推)。

现在我们已经准备好定义模型了:

from transformers import TimeSeriesTransformerConfig, TimeSeriesTransformerForPrediction

config = TimeSeriesTransformerConfig(
    prediction_length=prediction_length,
    # 上下文长度:
    context_length=prediction_length * 2,
    # 从helper中获得的滞后值:
    lags_sequence=lags_sequence,
    # 我们将添加2个时间特征("month of year"和"age",详见后文):
    num_time_features=len(time_features) + 1,
    # 我们有一个单一的静态分类特征,即时间序列ID:
    num_static_categorical_features=1,
    # 它有366个可能的值:
    cardinality=[len(train_dataset)],
    # 模型将学习每个366个可能值的大小为2的嵌入:
    embedding_dimension=[2],
    
    # transformer参数:
    encoder_layers=4,
    decoder_layers=4,
    d_model=32,
)

model = TimeSeriesTransformerForPrediction(config)

请注意,类似于🤗 Transformers库中的其他模型,TimeSeriesTransformerModel对应于没有顶部任何头的编码器-解码器Transformer,而TimeSeriesTransformerForPrediction对应于在其顶部添加了一个分布头TimeSeriesTransformerModel。默认情况下,该模型使用学生-t分布(但可以进行配置):

model.config.distribution_output

>>> student_t

这与用于自然语言处理的Transformers模型有一个重要的区别,后者的头通常由一个固定的分类分布实现为nn.Linear层。

定义转换

接下来,我们定义数据的转换,特别是用于创建时间特征(基于数据集或通用时间特征)的转换。

同样,我们将使用GluonTS库来完成这个任务。我们定义一个转换链(类似于图像的torchvision.transforms.Compose),它允许我们将多个转换组合成单个管道。

from gluonts.time_feature import (
    time_features_from_frequency_str,
    TimeFeature,
    get_lags_for_frequency,
)
from gluonts.dataset.field_names import FieldName
from gluonts.transform import (
    AddAgeFeature,
    AddObservedValuesIndicator,
    AddTimeFeatures,
    AsNumpyArray,
    Chain,
    ExpectedNumInstanceSampler,
    InstanceSplitter,
    RemoveFields,
    SelectFields,
    SetField,
    TestSplitSampler,
    Transformation,
    ValidationSplitSampler,
    VstackFeatures,
    RenameFields,
)

下面的转换带有注释,以解释它们的作用。在高层次上,我们将遍历数据集的每个时间序列,并添加/删除字段或特征:

from transformers import PretrainedConfig

def create_transformation(freq: str, config: PretrainedConfig) -> Transformation:
    remove_field_names = []
    if config.num_static_real_features == 0:
        remove_field_names.append(FieldName.FEAT_STATIC_REAL)
    if config.num_dynamic_real_features == 0:
        remove_field_names.append(FieldName.FEAT_DYNAMIC_REAL)
    if config.num_static_categorical_features == 0:
        remove_field_names.append(FieldName.FEAT_STATIC_CAT)

    # 类似于torchvision.transforms.Compose
    return Chain(
        # 步骤1:如果未指定,则删除静态/动态字段
        [RemoveFields(field_names=remove_field_names)]
        # 步骤2:将数据转换为NumPy(可能不需要)
        + (
            [
                AsNumpyArray(
                    field=FieldName.FEAT_STATIC_CAT,
                    expected_ndim=1,
                    dtype=int,
                )
            ]
            if config.num_static_categorical_features > 0
            else []
        )
        + (
            [
                AsNumpyArray(
                    field=FieldName.FEAT_STATIC_REAL,
                    expected_ndim=1,
                )
            ]
            if config.num_static_real_features > 0
            else []
        )
        + [
            AsNumpyArray(
                field=FieldName.TARGET,
                # 对于多元时间序列,我们期望有一个额外的维度:
                expected_ndim=1 if config.input_size == 1 else 2,
            ),
            # 步骤3:通过用零填充目标来处理NaN,并返回掩码(观测值中的掩码)
            # 观测值为真,NaN为假
            # 解码器使用此掩码(未观察到的值不会产生损失)
            # 请参见xxxForPrediction模型中的loss_weights
            AddObservedValuesIndicator(
                target_field=FieldName.TARGET,
                output_field=FieldName.OBSERVED_VALUES,
            ),
            # 步骤4:基于数据集的频率添加时间特征
            # 如果频率为"M",则添加年份的月份
            # 这些作为位置编码
            AddTimeFeatures(
                start_field=FieldName.START,
                target_field=FieldName.TARGET,
                output_field=FieldName.FEAT_TIME,
                time_features=time_features_from_frequency_str(freq),
                pred_length=config.prediction_length,
            ),
            # 步骤5:添加另一个时间特征(仅为一个数字)
            # 告诉模型时间序列的值所在的位置,
            # 类似于一个运行计数器
            AddAgeFeature(
                target_field=FieldName.TARGET,
                output_field=FieldName.FEAT_AGE,
                pred_length=config.prediction_length,
                log_scale=True,
            ),
            # 步骤6:将所有时间特征在FEAT_TIME键中垂直堆叠
            VstackFeatures(
                output_field=FieldName.FEAT_TIME,
                input_fields=[FieldName.FEAT_TIME, FieldName.FEAT_AGE]
                + (
                    [FieldName.FEAT_DYNAMIC_REAL]
                    if config.num_dynamic_real_features > 0
                    else []
                ),
            ),
            # 步骤7:重命名以匹配HuggingFace的命名
            RenameFields(
                mapping={
                    FieldName.FEAT_STATIC_CAT: "static_categorical_features",
                    FieldName.FEAT_STATIC_REAL: "static_real_features",
                    FieldName.FEAT_TIME: "time_features",
                    FieldName.TARGET: "values",
                    FieldName.OBSERVED_VALUES: "observed_mask",
                }
            ),
        ]
    )

定义InstanceSplitter

为了训练/验证/测试,我们接下来创建一个InstanceSplitter,它用于从数据集中随机采样窗口(因为由于时间和内存限制,我们不能将整个历史值传递给Transformer)。

实例分割器从数据中采样随机大小为context_length的窗口,以及随后大小为prediction_length的窗口,并在相应的窗口的时间键中添加past_future_键。这确保了values将被拆分为past_values和随后的future_values键,分别用作编码器和解码器的输入。对于time_series_fields参数中的任何键,也会发生相同的情况:

from gluonts.transform.sampler import InstanceSampler
from typing import Optional

def create_instance_splitter(
    config: PretrainedConfig,
    mode: str,
    train_sampler: Optional[InstanceSampler] = None,
    validation_sampler: Optional[InstanceSampler] = None,
) -> Transformation:
    assert mode in ["train", "validation", "test"]

    instance_sampler = {
        "train": train_sampler
        or ExpectedNumInstanceSampler(
            num_instances=1.0, min_future=config.prediction_length
        ),
        "validation": validation_sampler
        or ValidationSplitSampler(min_future=config.prediction_length),
        "test": TestSplitSampler(),
    }[mode]

    return InstanceSplitter(
        target_field="values",
        is_pad_field=FieldName.IS_PAD,
        start_field=FieldName.START,
        forecast_start_field=FieldName.FORECAST_START,
        instance_sampler=instance_sampler,
        past_length=config.context_length + max(config.lags_sequence),
        future_length=config.prediction_length,
        time_series_fields=["time_features", "observed_mask"],
    )

创建DataLoaders

接下来,是时候创建DataLoaders了,这样我们就可以拥有批量的(输入,输出)对,换句话说就是(past_valuesfuture_values)。

from typing import Iterable

import torch
from gluonts.itertools import Cached, Cyclic
from gluonts.dataset.loader import as_stacked_batches


def create_train_dataloader(
    config: PretrainedConfig,
    freq,
    data,
    batch_size: int,
    num_batches_per_epoch: int,
    shuffle_buffer_length: Optional[int] = None,
    cache_data: bool = True,
    **kwargs,
) -> Iterable:
    PREDICTION_INPUT_NAMES = [
        "past_time_features",
        "past_values",
        "past_observed_mask",
        "future_time_features",
    ]
    if config.num_static_categorical_features > 0:
        PREDICTION_INPUT_NAMES.append("static_categorical_features")

    if config.num_static_real_features > 0:
        PREDICTION_INPUT_NAMES.append("static_real_features")

    TRAINING_INPUT_NAMES = PREDICTION_INPUT_NAMES + [
        "future_values",
        "future_observed_mask",
    ]

    transformation = create_transformation(freq, config)
    transformed_data = transformation.apply(data, is_train=True)
    if cache_data:
        transformed_data = Cached(transformed_data)

    # 我们初始化一个训练实例
    instance_splitter = create_instance_splitter(config, "train")

    # 实例分割器将从366个可能的转换时间序列中随机采样一个上下文长度+滞后+预测长度的窗口,并返回一个迭代器。
    stream = Cyclic(transformed_data).stream()
    training_instances = instance_splitter.apply(
        stream, is_train=True
    )
    
    return as_stacked_batches(
        training_instances,
        batch_size=batch_size,
        shuffle_buffer_length=shuffle_buffer_length,
        field_names=TRAINING_INPUT_NAMES,
        output_type=torch.tensor,
        num_batches_per_epoch=num_batches_per_epoch,
    )

def create_test_dataloader(
    config: PretrainedConfig,
    freq,
    data,
    batch_size: int,
    **kwargs,
):
    PREDICTION_INPUT_NAMES = [
        "past_time_features",
        "past_values",
        "past_observed_mask",
        "future_time_features",
    ]
    if config.num_static_categorical_features > 0:
        PREDICTION_INPUT_NAMES.append("static_categorical_features")

    if config.num_static_real_features > 0:
        PREDICTION_INPUT_NAMES.append("static_real_features")

    transformation = create_transformation(freq, config)
    transformed_data = transformation.apply(data, is_train=False)

    # 我们创建一个测试实例分割器,它仅为编码器采样训练期间看到的最后一个上下文窗口。
    instance_sampler = create_instance_splitter(config, "test")

    # 我们以测试模式应用转换
    testing_instances = instance_sampler.apply(transformed_data, is_train=False)
    
    return as_stacked_batches(
        testing_instances,
        batch_size=batch_size,
        output_type=torch.tensor,
        field_names=PREDICTION_INPUT_NAMES,
    )

train_dataloader = create_train_dataloader(
    config=config,
    freq=freq,
    data=train_dataset,
    batch_size=256,
    num_batches_per_epoch=100,
)

test_dataloader = create_test_dataloader(
    config=config,
    freq=freq,
    data=test_dataset,
    batch_size=64,
)

让我们检查第一个批次:

batch = next(iter(train_dataloader))
for k, v in batch.items():
    print(k, v.shape, v.type())

>>> past_time_features torch.Size([256, 85, 2]) torch.FloatTensor
    past_values torch.Size([256, 85]) torch.FloatTensor
    past_observed_mask torch.Size([256, 85]) torch.FloatTensor
    future_time_features torch.Size([256, 24, 2]) torch.FloatTensor
    static_categorical_features torch.Size([256, 1]) torch.LongTensor
    future_values torch.Size([256, 24]) torch.FloatTensor
    future_observed_mask torch.Size([256, 24]) torch.FloatTensor

可以看到,我们没有将input_idsattention_mask传递给编码器(如NLP模型的情况),而是传递了past_values,以及past_observed_maskpast_time_featuresstatic_categorical_features

解码器的输入包括future_valuesfuture_observed_maskfuture_time_features。可以将future_values看作是NLP中的decoder_input_ids的等价物。

我们可以查阅文档以了解每个参数的详细解释。

正向传播

让我们使用刚刚创建的批次进行一次正向传播:

# 执行正向传播
outputs = model(
    past_values=batch["past_values"],
    past_time_features=batch["past_time_features"],
    past_observed_mask=batch["past_observed_mask"],
    static_categorical_features=batch["static_categorical_features"]
    if config.num_static_categorical_features > 0
    else None,
    static_real_features=batch["static_real_features"]
    if config.num_static_real_features > 0
    else None,
    future_values=batch["future_values"],
    future_time_features=batch["future_time_features"],
    future_observed_mask=batch["future_observed_mask"],
    output_hidden_states=True,
)

print("损失:", outputs.loss.item())

>>> 损失: 9.069628715515137

请注意,模型返回了一个损失值。这是因为解码器自动将future_values向右移动一位,以获得标签。这样可以计算预测值与标签之间的损失。

另外,请注意解码器使用了一个因果掩码,以防止查看未来,因为它需要预测的值在future_values张量中。

训练模型

是时候训练模型了!我们将使用标准的PyTorch训练循环。

这里我们将使用🤗 Accelerate库,它会自动将模型、优化器和数据加载器放置在适当的device上。

from accelerate import Accelerator
from torch.optim import AdamW

accelerator = Accelerator()
device = accelerator.device

model.to(device)
optimizer = AdamW(model.parameters(), lr=6e-4, betas=(0.9, 0.95), weight_decay=1e-1)

model, optimizer, train_dataloader = accelerator.prepare(
    model,
    optimizer,
    train_dataloader,
)

model.train()
for epoch in range(40):
    for idx, batch in enumerate(train_dataloader):
        optimizer.zero_grad()
        outputs = model(
            static_categorical_features=batch["static_categorical_features"].to(device)
            if config.num_static_categorical_features > 0
            else None,
            static_real_features=batch["static_real_features"].to(device)
            if config.num_static_real_features > 0
            else None,
            past_time_features=batch["past_time_features"].to(device),
            past_values=batch["past_values"].to(device),
            future_time_features=batch["future_time_features"].to(device),
            future_values=batch["future_values"].to(device),
            past_observed_mask=batch["past_observed_mask"].to(device),
            future_observed_mask=batch["future_observed_mask"].to(device),
        )
        loss = outputs.loss

        # 反向传播
        accelerator.backward(loss)
        optimizer.step()

        if idx % 100 == 0:
            print(loss.item())

推理

在推断时,建议使用generate()方法进行自回归生成,类似于NLP模型。

预测涉及从测试实例采样器获取数据,该采样器将从数据集中的每个时间序列中抽取最后一个context_length大小的窗口值,并将其传递给模型。请注意,我们将已知的future_time_features提前传递给解码器。

模型将自回归地从预测分布中抽取一定数量的值,并将它们传递回解码器以返回预测输出:

model.eval()

forecasts = []

for batch in test_dataloader:
    outputs = model.generate(
        static_categorical_features=batch["static_categorical_features"].to(device)
        if config.num_static_categorical_features > 0
        else None,
        static_real_features=batch["static_real_features"].to(device)
        if config.num_static_real_features > 0
        else None,
        past_time_features=batch["past_time_features"].to(device),
        past_values=batch["past_values"].to(device),
        future_time_features=batch["future_time_features"].to(device),
        past_observed_mask=batch["past_observed_mask"].to(device),
    )
    forecasts.append(outputs.sequences.cpu().numpy())

模型输出的张量形状为(batch_sizenumber of samplesprediction length)。

在这种情况下,我们得到下一个24个月的100个可能值(对于批次中的每个示例,批次大小为64):

forecasts[0].shape

>>> (64, 100, 24)

我们将它们垂直堆叠,以获取测试数据集中所有时间序列的预测:

forecasts = np.vstack(forecasts)
print(forecasts.shape)

>>> (366, 100, 24)

我们可以根据测试集中存在的离样本值来评估生成的预测结果。我们将使用MASE和sMAPE指标计算数据集中每个时间序列的指标:

from evaluate import load
from gluonts.time_feature import get_seasonality

mase_metric = load("evaluate-metric/mase")
smape_metric = load("evaluate-metric/smape")

forecast_median = np.median(forecasts, 1)

mase_metrics = []
smape_metrics = []
for item_id, ts in enumerate(test_dataset):
    training_data = ts["target"][:-prediction_length]
    ground_truth = ts["target"][-prediction_length:]
    mase = mase_metric.compute(
        predictions=forecast_median[item_id], 
        references=np.array(ground_truth), 
        training=np.array(training_data), 
        periodicity=get_seasonality(freq))
    mase_metrics.append(mase["mase"])
    
    smape = smape_metric.compute(
        predictions=forecast_median[item_id], 
        references=np.array(ground_truth), 
    )
    smape_metrics.append(smape["smape"])

print(f"MASE: {np.mean(mase_metrics)}")

>>> MASE: 1.2564196892177717

print(f"sMAPE: {np.mean(smape_metrics)}")

>>> sMAPE: 0.1609541520852549

我们还可以绘制数据集中每个时间序列的各个指标,并观察少数时间序列对最终的测试指标有很大的贡献:

plt.scatter(mase_metrics, smape_metrics, alpha=0.3)
plt.xlabel("MASE")
plt.ylabel("sMAPE")
plt.show()

为了绘制任何时间序列相对于地面实况测试数据的预测结果,我们定义如下辅助函数:

import matplotlib.dates as mdates

def plot(ts_index):
    fig, ax = plt.subplots()

    index = pd.period_range(
        start=test_dataset[ts_index][FieldName.START],
        periods=len(test_dataset[ts_index][FieldName.TARGET]),
        freq=freq,
    ).to_timestamp()

    # Major ticks every half year, minor ticks every month,
    ax.xaxis.set_major_locator(mdates.MonthLocator(bymonth=(1, 7)))
    ax.xaxis.set_minor_locator(mdates.MonthLocator())

    ax.plot(
        index[-2*prediction_length:], 
        test_dataset[ts_index]["target"][-2*prediction_length:],
        label="actual",
    )

    plt.plot(
        index[-prediction_length:], 
        np.median(forecasts[ts_index], axis=0),
        label="median",
    )
    
    plt.fill_between(
        index[-prediction_length:],
        forecasts[ts_index].mean(0) - forecasts[ts_index].std(axis=0), 
        forecasts[ts_index].mean(0) + forecasts[ts_index].std(axis=0), 
        alpha=0.3, 
        interpolate=True,
        label="+/- 1-std",
    )
    plt.legend()
    plt.show()

例如:

plot(334)

我们如何与其他模型进行比较? Monash时间序列仓库有一个测试集MASE指标的比较表,我们可以添加进去:

请注意,我们的模型击败了所有其他报告的模型(也请参阅相应论文中的表2),而且我们没有进行任何超参数调整。我们只是训练了40个epochs的Transformer。

当然,我们需要小心地声称在时间序列上使用神经网络达到了最先进的结果,因为似乎“XGBoost通常就足够了”。我们只是非常好奇神经网络能带给我们多大的进步,以及Transformer在这个领域是否有用。这个特定的数据集似乎表明它绝对值得探索。

下一步

我们鼓励读者尝试使用Hub中的其他时间序列数据集来运行笔记本,并替换相应的频率和预测长度参数。对于您的数据集,您需要将其转换为GluonTS使用的约定,其在他们的文档中有很好的解释。我们还准备了一个示例笔记本,展示了如何将您的数据集转换为🤗 datasets格式。

对于时间序列研究人员来说,将基于Transformer的模型应用于时间序列问题一直受到很多关注。普通Transformer只是许多基于注意力的模型之一,因此需要向库中添加更多模型。

目前没有任何限制我们对多元时间序列进行建模,但是为此需要使用多元分布头来实例化模型。目前仅支持对角线独立分布,将会添加其他多元分布。请期待未来的博客文章,其中将包含一个教程。

路线图上的另一个任务是时间序列分类。这意味着在库中添加一个具有分类头的时间序列模型,例如用于异常检测任务。

当前模型假设时间序列值的存在与日期-时间相关,但这在每个实际时间序列中可能并不是情况。例如,像WOODS这样的神经科学数据集。因此,需要将当前模型进行泛化,使整个流程中的某些输入为可选项。

最后,自然语言处理/计算机视觉领域已经从大型预训练模型中受益匪浅,而就我们所知,时间序列领域并非如此。基于Transformer的模型似乎是在这个研究领域中追求的明显选择,我们迫不及待地想看到研究人员和从业者的成果!