Preface
Original paper: "Chronos: Learning the Language of Time Series" (original link); the open-source code is on GitHub. Related posts: "Chronos: Learning the Language of Time Series (paper walkthrough)" on CSDN, "Chronos: Learning the Language of Time Series (code walkthrough)" on CSDN, and the GitHub project Some-Paper-CN. That project collects papers the translator has read closely while studying long-horizon time series forecasting, CV, NLP and machine learning, together with Chinese translations and a number of example tutorials. If it helps you, please give it a Star; that is a great encouragement to the translator, thank you! The code in this post has also been synced to the Some-Paper-CN project.
I had originally planned to write a LoRA fine-tuning tutorial myself, but the official pretraining (fine-tuning) code was released first, so let's first take a look at how the official fine-tuning code is written.
Data Conversion
Download the open-source code from GitHub to your local machine. The key file is train.py, located under chronos-forecasting/scripts/training. The chronos-forecasting/scripts/README.md file describes how the data should be organized and also covers KernelSynth, the data synthesis method mentioned in the paper.
from pathlib import Path
from typing import List, Optional, Union

import numpy as np
from gluonts.dataset.arrow import ArrowWriter


def convert_to_arrow(
    path: Union[str, Path],
    time_series: Union[List[np.ndarray], np.ndarray],
    start_times: Optional[Union[List[np.datetime64], np.ndarray]] = None,
    compression: str = "lz4",
):
    if start_times is None:
        start_times = [np.datetime64("2000-01-01 00:00", "s")] * len(time_series)

    assert len(time_series) == len(start_times)

    dataset = [
        {"start": start, "target": ts} for ts, start in zip(time_series, start_times)
    ]
    ArrowWriter(compression=compression).write_to_file(
        dataset,
        path=path,
    )


if __name__ == "__main__":
    time_series = [np.random.randn(1024) for i in range(20)]
    convert_to_arrow("./noise-data.arrow", time_series=time_series)
Before moving on to the steps below, your data must first be converted into the GluonTS arrow format. This is very important!
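For real data you will usually start from something like a CSV file or a pandas DataFrame rather than random noise. Here is a minimal sketch of feeding your own series into convert_to_arrow; the file name my_data.csv, its one-series-per-column layout and the start date are made-up assumptions for illustration.

import numpy as np
import pandas as pd

# Hypothetical input: a CSV file with one time series per column.
df = pd.read_csv("my_data.csv")

time_series = [df[col].to_numpy(dtype=np.float32) for col in df.columns]
# One start timestamp per series; here we pretend they all start on the same day.
start_times = [np.datetime64("2020-01-01 00:00", "s")] * len(time_series)

convert_to_arrow(
    "./my-data.arrow",
    time_series=time_series,
    start_times=start_times,
)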
Training and Fine-tuning Code Walkthrough
Open the train.py file in the chronos-forecasting/scripts/training folder; we start with the main() function.
The @app.command() decorator above the main function comes from the typer library (GitHub repo, official docs), whose main purpose is to build CLI programs. It does not affect understanding of the code much, but the features are worth a look if you are interested. The other decorator, @use_yaml_config(), is provided by typer-config (GitHub repo); its main purpose is to read a configuration file. I recommend looking at its official docs and usage, which helps with understanding the code.
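To see what these two decorators do in isolation, here is a minimal, self-contained sketch; the script name hello.py and its parameters are made up and are not part of train.py. typer turns the function parameters into CLI options, and typer-config's use_yaml_config adds a --config option that fills those parameters from a YAML file.

# hello.py -- a made-up example, not from the Chronos repo
import typer
from typer_config import use_yaml_config

app = typer.Typer()


@app.command()
@use_yaml_config(param_name="config")
def main(name: str = "world", repeat: int = 1):
    # Each parameter becomes a CLI option: --name, --repeat.
    for _ in range(repeat):
        print(f"hello {name}")


if __name__ == "__main__":
    app()

Running python hello.py --name chronos --repeat 2 sets the parameters on the command line, while python hello.py --config hello.yaml reads them from a YAML file with keys name and repeat. train.py uses exactly this mechanism, just with a much longer parameter list.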
@app.command()
@use_yaml_config(param_name="config")
def main(
    training_data_paths: str,
    probability: Optional[str] = None,
    context_length: int = 512,
    prediction_length: int = 64,
    min_past: int = 64,
    max_steps: int = 200_000,
    save_steps: int = 50_000,
    log_steps: int = 500,
    per_device_train_batch_size: int = 32,
    learning_rate: float = 1e-3,
    optim: str = "adamw_torch_fused",
    shuffle_buffer_length: int = 100,
    gradient_accumulation_steps: int = 2,
    model_id: str = "google/t5-efficient-tiny",
    model_type: str = "seq2seq",
    random_init: bool = False,
    tie_embeddings: bool = False,
    output_dir: Path = Path("./output/"),
    tf32: bool = True,
    torch_compile: bool = True,
    tokenizer_class: str = "MeanScaleUniformBins",
    tokenizer_kwargs: str = "{'low_limit': -15.0, 'high_limit': 15.0}",
    n_tokens: int = 4096,
    n_special_tokens: int = 2,
    pad_token_id: int = 0,
    eos_token_id: int = 1,
    use_eos_token: bool = True,
    lr_scheduler_type: str = "linear",
    warmup_ratio: float = 0.0,
    dataloader_num_workers: int = 1,
    max_missing_prop: float = 0.9,
    num_samples: int = 20,
    temperature: float = 1.0,
    top_k: int = 50,
    top_p: float = 1.0,
    seed: Optional[int] = None,
):
    training_data_paths = ast.literal_eval(training_data_paths)
    assert isinstance(training_data_paths, list)

    if isinstance(probability, str):
        probability = ast.literal_eval(probability)
    elif probability is None:
        probability = [1.0 / len(training_data_paths)] * len(training_data_paths)
    assert isinstance(probability, list)

    if isinstance(tokenizer_kwargs, str):
        tokenizer_kwargs = ast.literal_eval(tokenizer_kwargs)
    assert isinstance(tokenizer_kwargs, dict)

    assert model_type in ["seq2seq", "causal"]
    if not model_type == "seq2seq":
        raise NotImplementedError("Only seq2seq models are currently supported")

    if seed is None:
        seed = random.randint(0, 2**32)

    log_on_main(f"Using SEED: {seed}", logger)
    transformers.set_seed(seed=seed)

    output_dir = get_next_path("run", base_dir=output_dir, file_type="")

    log_on_main(f"Logging dir: {output_dir}", logger)
    log_on_main(
        f"Loading and filtering {len(training_data_paths)} datasets "
        f"for training: {training_data_paths}",
        logger,
    )

    log_on_main(
        f"Mixing probabilities: {probability}",
        logger,
    )

    train_datasets = [
        Filter(
            partial(
                has_enough_observations,
                min_length=min_past + prediction_length,
                max_missing_prop=max_missing_prop,
            ),
            FileDataset(path=Path(data_path), freq="h"),
        )
        for data_path in training_data_paths
    ]

    log_on_main("Initializing model", logger)

    model = load_model(
        model_id=model_id,
        model_type=model_type,
        vocab_size=n_tokens,
        random_init=random_init,
        tie_embeddings=tie_embeddings,
        pad_token_id=pad_token_id,
        eos_token_id=eos_token_id,
    )

    chronos_config = ChronosConfig(
        tokenizer_class=tokenizer_class,
        tokenizer_kwargs=tokenizer_kwargs,
        n_tokens=n_tokens,
        n_special_tokens=n_special_tokens,
        pad_token_id=pad_token_id,
        eos_token_id=eos_token_id,
        use_eos_token=use_eos_token,
        model_type=model_type,
        context_length=context_length,
        prediction_length=prediction_length,
        num_samples=num_samples,
        temperature=temperature,
        top_k=top_k,
        top_p=top_p,
    )

    model.config.chronos_config = chronos_config.__dict__

    shuffled_train_dataset = ChronosDataset(
        datasets=train_datasets,
        probabilities=probability,
        tokenizer=chronos_config.create_tokenizer(),
        context_length=context_length,
        prediction_length=prediction_length,
        min_past=min_past,
        mode="training",
    ).shuffle(shuffle_buffer_length=shuffle_buffer_length)

    training_args = TrainingArguments(
        output_dir=str(output_dir),
        per_device_train_batch_size=per_device_train_batch_size,
        learning_rate=learning_rate,
        lr_scheduler_type=lr_scheduler_type,
        warmup_ratio=warmup_ratio,
        optim=optim,
        logging_dir=str(output_dir / "logs"),
        logging_strategy="steps",
        logging_steps=log_steps,
        save_strategy="steps",
        save_steps=save_steps,
        report_to=["tensorboard"],
        max_steps=max_steps,
        gradient_accumulation_steps=gradient_accumulation_steps,
        dataloader_num_workers=dataloader_num_workers,
        tf32=tf32,
        torch_compile=torch_compile,
        ddp_find_unused_parameters=False,
        remove_unused_columns=False,
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=shuffled_train_dataset,
    )
    log_on_main("Training", logger)

    trainer.train()

    if is_main_process():
        model.save_pretrained(output_dir / "checkpoint-final")
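One detail worth noting at the top of main: training_data_paths, probability and tokenizer_kwargs are typed as str and parsed with ast.literal_eval, because they arrive from the command line (or from the config layer) as text. A tiny standalone illustration of that parsing step; the values are made up:

import ast

# Strings as they might arrive from the CLI / config layer (made-up values).
training_data_paths = ast.literal_eval("['/data/a.arrow', '/data/b.arrow']")
probability = ast.literal_eval("[0.9, 0.1]")
tokenizer_kwargs = ast.literal_eval("{'low_limit': -15.0, 'high_limit': 15.0}")

print(training_data_paths)  # ['/data/a.arrow', '/data/b.arrow']
print(probability)          # [0.9, 0.1]
print(tokenizer_kwargs)     # {'low_limit': -15.0, 'high_limit': 15.0}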
The main function above uses a helper function, has_enough_observations, which checks whether a time series meets the minimum length and missing-value requirements. The annotated code is as follows:
def has_enough_observations(
    entry: dict, min_length: int = 0, max_missing_prop: float = 1.0
) -> bool:
    """
    Check whether the ``"target"`` of an entry satisfies the minimum requirements.

    Parameters
    ----------
    entry
        The data entry to check.
    min_length
        The minimum length of ``"target"``.
    max_missing_prop
        The maximum allowed proportion of missing values in ``"target"``.
    """
    if (
        len(entry["target"]) >= min_length
        and np.isnan(entry["target"]).mean() <= max_missing_prop
    ):
        return True
    return False
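A quick way to see what this filter accepts and rejects; the example entries below are made up, and in main the actual thresholds are min_length = min_past + prediction_length and max_missing_prop = 0.9.

import numpy as np

# Made-up entries in the {"target": ...} format checked above.
good = {"target": np.random.randn(200)}
too_short = {"target": np.random.randn(50)}
too_sparse = {"target": np.array([1.0, np.nan, np.nan, np.nan] * 50)}

print(has_enough_observations(good, min_length=128, max_missing_prop=0.9))        # True
print(has_enough_observations(too_short, min_length=128, max_missing_prop=0.9))   # False: only 50 points
print(has_enough_observations(too_sparse, min_length=128, max_missing_prop=0.5))  # False: 75% missing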
The main function above also depends on a very important class, ChronosDataset, which organizes the data. The annotated code is as follows:
class ChronosDataset(IterableDataset, ShuffleMixin):
    """
    Dataset wrapper that uses a ``ChronosTokenizer`` to turn time series data into
    the HuggingFace format of ``input_ids``, ``attention_mask`` and ``labels``.

    Entries in the original dataset are assumed to have a ``"start"`` field
    (of type ``pd.Period``) and a ``"target"`` field (of type ``np.ndarray``).

    Parameters
    ----------
    datasets
        Datasets containing the original time series data.
    probabilities
        In training mode, data is sampled from each of the original datasets
        according to this list of probabilities.
    tokenizer
        Tokenizer used to turn sequences of real numbers into token IDs.
    context_length
        Length of the input token IDs.
    prediction_length
        The prediction length.
    drop_prob
        Observations in a sample are turned into ``np.nan``, i.e. missing values,
        with this probability.
    min_past
        A sample is only considered if it has at least ``min_past`` historical
        observations.
    mode
        One of ``"training"``, ``"validation"`` or ``"test"``.
    np_dtype
        Numpy float data type.
    """

    def __init__(
        self,
        datasets: list,
        probabilities: List[float],
        tokenizer: ChronosTokenizer,
        context_length: int = 512,
        prediction_length: int = 64,
        drop_prob: float = 0.2,
        min_past: Optional[int] = None,
        mode: str = "training",
        np_dtype=np.float32,
    ) -> None:
        super().__init__()

        assert len(probabilities) == len(datasets)
        assert mode in ("training", "validation", "test")

        self.datasets = datasets
        self.probabilities = probabilities
        self.tokenizer = tokenizer
        self.context_length = context_length
        self.prediction_length = prediction_length
        self.drop_prob = drop_prob
        self.min_past = min_past or prediction_length
        self.mode = mode
        self.np_dtype = np_dtype

    def preprocess_entry(self, entry: dict, mode: str) -> dict:
        entry = {f: entry[f] for f in ["start", "target"]}
        entry["target"] = np.asarray(entry["target"], dtype=self.np_dtype)
        assert entry["target"].ndim == 1, f"got {entry['target'].ndim=}, expected 1"

        if mode == "training" and self.drop_prob > 0:
            target = entry["target"].copy()
            drop_p = np.random.uniform(low=0.0, high=self.drop_prob)
            mask = np.random.choice(
                [True, False], size=len(target), p=[drop_p, 1 - drop_p]
            )
            target[mask] = np.nan
            entry["target"] = target

        return entry

    def _create_instance_splitter(self, mode: str):
        assert mode in ["training", "test", "validation"]

        instance_sampler = {
            "training": ExpectedNumInstanceSampler(
                num_instances=1.0,
                min_instances=1,
                min_past=self.min_past,
                min_future=self.prediction_length,
            ),
            "test": TestSplitSampler(),
            "validation": ValidationSplitSampler(min_future=self.prediction_length),
        }[mode]

        return InstanceSplitter(
            target_field="target",
            is_pad_field="is_pad",
            start_field="start",
            forecast_start_field="forecast_start",
            instance_sampler=instance_sampler,
            past_length=self.context_length,
            future_length=self.prediction_length,
            dummy_value=np.nan,
        )

    def create_training_data(self, data):
        data = Cyclic(data)
        split_transform = self._create_instance_splitter(
            "training"
        ) + FilterTransformation(
            condition=lambda entry: (~np.isnan(entry["past_target"])).sum() > 0
        )
        data = split_transform.apply(data, is_train=True)
        return data

    def create_test_data(self, data):
        data = self._create_instance_splitter("test").apply(data, is_train=False)
        return data

    def create_validation_data(self, data):
        data = self._create_instance_splitter("validation").apply(data, is_train=False)
        return data

    def to_hf_format(self, entry: dict) -> dict:
        past_target = torch.tensor(entry["past_target"]).unsqueeze(0)
        input_ids, attention_mask, scale = self.tokenizer.input_transform(past_target)
        future_target = torch.tensor(entry["future_target"]).unsqueeze(0)
        labels, labels_mask, _ = self.tokenizer.input_transform(future_target, scale)
        labels[labels_mask == 0] = -100
        return {
            "input_ids": input_ids.squeeze(0),
            "attention_mask": attention_mask.squeeze(0),
            "labels": labels.squeeze(0),
        }

    def __iter__(self) -> Iterator:
        preprocessed_datasets = [
            Map(
                partial(self.preprocess_entry, mode=self.mode),
                dataset,
            )
            for dataset in self.datasets
        ]

        if self.mode == "training":
            iterables = [
                self.create_training_data(dataset) for dataset in preprocessed_datasets
            ]
        elif self.mode == "test":
            iterables = [
                self.create_test_data(dataset) for dataset in preprocessed_datasets
            ]
        else:
            iterables = [
                self.create_validation_data(dataset)
                for dataset in preprocessed_datasets
            ]

        worker_info = get_worker_info()

        if worker_info is None:
            probs = list(self.probabilities)
        else:
            worker_id = worker_info.id
            num_workers = worker_info.num_workers
            iterables = list(itertools.islice(iterables, worker_id, None, num_workers))
            probs = list(
                itertools.islice(self.probabilities, worker_id, None, num_workers)
            )

        probs = [prob / sum(probs) for prob in probs]

        iterators = list(map(iter, iterables))
        if self.mode == "training":
            while True:
                idx = np.random.choice(range(len(iterators)), p=probs)
                try:
                    yield self.to_hf_format(next(iterators[idx]))
                except StopIteration:
                    probs[idx] = 0
                    if sum(probs) == 0:
                        return
                    probs = [prob / sum(probs) for prob in probs]
        else:
            for entry in itertools.chain(*iterators):
                yield self.to_hf_format(entry)
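To build an intuition for what ChronosDataset emits, the sketch below pulls a single training sample and prints the HuggingFace-style fields. It is an assumption-heavy example rather than repo code: it reuses the ./noise-data.arrow file created in the data-conversion step and assumes ChronosConfig (from the chronos package) and ChronosDataset are in scope, for example when run inside train.py.

from pathlib import Path

from gluonts.dataset.common import FileDataset

chronos_config = ChronosConfig(
    tokenizer_class="MeanScaleUniformBins",
    tokenizer_kwargs={"low_limit": -15.0, "high_limit": 15.0},
    n_tokens=4096,
    n_special_tokens=2,
    pad_token_id=0,
    eos_token_id=1,
    use_eos_token=True,
    model_type="seq2seq",
    context_length=512,
    prediction_length=64,
    num_samples=20,
    temperature=1.0,
    top_k=50,
    top_p=1.0,
)

dataset = ChronosDataset(
    datasets=[FileDataset(path=Path("./noise-data.arrow"), freq="h")],
    probabilities=[1.0],
    tokenizer=chronos_config.create_tokenizer(),
    context_length=512,
    prediction_length=64,
    min_past=64,
    mode="training",
)

sample = next(iter(dataset))
print(sample["input_ids"].shape)       # tokenized context
print(sample["attention_mask"].shape)  # 1 where the context is observed
print(sample["labels"].shape)          # tokenized future, padded positions set to -100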
The ChronosDataset class uses a lot of functionality from gluonts (GitHub repo); these methods can be searched for in the official documentation. The most important parts of train.py are the main function and the ChronosDataset class. Go through them line by line and they are actually quite simple, simpler than the algorithm code analyzed earlier; the problem is that gluonts abstracts too much away, which makes it harder to follow. Frankly, I think the code is over-encapsulated and many things are not easy to understand. The methods can indeed be found in the gluonts official documentation, but many of them have no description, so you end up reading the source code. Hopefully gluonts will improve its API documentation in the future.
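When the gluonts abstractions feel opaque, it often helps to print what they actually yield. Below is a small sketch, assuming gluonts is installed and the ./noise-data.arrow file from the data-conversion step exists, that inspects one raw FileDataset entry, i.e. the format that ChronosDataset.preprocess_entry receives.

from pathlib import Path

from gluonts.dataset.common import FileDataset

raw = FileDataset(path=Path("./noise-data.arrow"), freq="h")
entry = next(iter(raw))

print(list(entry.keys()))   # at least "start" and "target"
print(entry["start"])       # the start timestamp as a pd.Period
print(entry["target"][:5])  # the first few observations of the series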
Configuration File
Training parameters are set through a configuration file; the config files live in scripts/training/configs and are passed to train.py via the --config option added by @use_yaml_config. For a first attempt at getting the code to run, start by pretraining the chronos-t5-tiny model: it is small and does not demand much compute. Note that the official chronos code does not support fine-tuning on CPU; a GPU is required. The chronos-t5-tiny.yaml file and its parameters are as follows:
training_data_paths:
- "/home/ubuntu/tsmixup-data.arrow"
- "/home/ubuntu/kernelsynth-data.arrow"
probability:
- 0.9
- 0.1
context_length: 512
prediction_length: 64
min_past: 60
max_steps: 200_000
save_steps: 100_000
log_steps: 500
per_device_train_batch_size: 32
learning_rate: 0.001
optim: adamw_torch_fused
num_samples: 20
shuffle_buffer_length: 100_000
gradient_accumulation_steps: 1
model_id: google/t5-efficient-tiny
model_type: seq2seq
random_init: true
tie_embeddings: true
output_dir: ./output/
tf32: true
torch_compile: true
tokenizer_class: "MeanScaleUniformBins"
tokenizer_kwargs:
  low_limit: -15.0
  high_limit: 15.0
n_tokens: 4096
lr_scheduler_type: linear
warmup_ratio: 0.0
dataloader_num_workers: 1
max_missing_prop: 0.9
use_eos_token: true
If you want to fine-tune an already pretrained model, change model_id to amazon/chronos-t5-small, turn off random initialization (random_init: false, i.e. the --no-random-init flag on the command line), and adjust the learning_rate and step-related parameters (e.g. max_steps and save_steps).
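After training finishes, the final weights are written to <output_dir>/checkpoint-final (see the last lines of main). Below is a minimal sketch for sanity-checking the result with ChronosPipeline from the chronos package; the run directory ./output/run-0 is an assumption, use whatever path train.py printed as "Logging dir".

import torch
from chronos import ChronosPipeline

# The checkpoint path is an assumption; substitute the "Logging dir" printed by train.py.
pipeline = ChronosPipeline.from_pretrained(
    "./output/run-0/checkpoint-final",
    device_map="cuda",
    torch_dtype=torch.bfloat16,
)

context = torch.randn(512)  # replace with a real 1-D context series
forecast = pipeline.predict(context, prediction_length=64)
print(forecast.shape)  # (1, num_samples, prediction_length)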