from pathlib import Path
from typing import Literal, Tuple, List, Union
import pandas as pd
from sklearn.model_selection import KFold
from verona.data.download import DEFAULT_PATH
from verona.data.utils import XesFields, read_eventlog
[docs]
def make_temporal_split(dataset: Union[str, pd.DataFrame], dataset_name: str = 'Dataset', store_path: str = None,
                        test_offset: pd.Timedelta = pd.Timedelta(365, 'D'),
                        val_offset: pd.Timedelta = None,
                        timestamp_column: str = XesFields.TIMESTAMP_COLUMN,
                        case_column: str = XesFields.CASE_COLUMN) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Split a given dataset following a temporal scheme. Traces starting on a date equal to or greater than the date
    of the first trace plus **test_offset** form the test partition. Optionally, traces starting on a date equal to or
    greater than the date of the first trace plus **val_offset** but less than the date of the first trace plus
    **test_offset** form the validation partition. The remaining traces form the training partition.

    Every generated partition is also written to ``store_path`` as a csv file.

    Args:
        dataset (str | pd.DataFrame): If string, full path to the dataset to be split. Only csv, xes, and xes.gz
            datasets are allowed. If Pandas DataFrame, the DataFrame containing the dataset.
        dataset_name (str): Name of the dataset.
            Default is ``Dataset``.
        store_path (str, optional): Path where the splits will be stored. Defaults to the DEFAULT_PATH
        test_offset (pd.Timedelta, optional): Time difference with respect to the starting timestamp of the first
            trace, from which any trace with the same or a later starting timestamp is added to the test partition.
            Must be strictly positive.
        val_offset (pd.Timedelta, optional): Time difference with respect to the start timestamp of the first trace,
            from which any trace with a start timestamp equal to or later, but less than the start timestamp plus
            test_offset, is added to the validation partition. ``None`` (or a zero offset) disables the validation
            partition.
        timestamp_column (str, optional): Name of the timestamp column in the original dataset file.
            Default is ``XesFields.TIMESTAMP_COLUMN``.
        case_column (str, optional): Name of the case identifier in the original dataset file.
            Default is ``XesFields.CASE_COLUMN``.

    Returns:
        Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: Returns a tuple containing the DataFrames for the train,
        validation, and test splits. The validation entry is ``None`` when no validation partition is requested.

    Raises:
        ValueError: If an invalid value for **test_offset** or **val_offset** is provided.

    Examples:
        >>> train_df, _, test_df = make_temporal_split('path/to/dataset.csv', test_offset=pd.Timedelta(days=730))
    """
    # Validate the offsets on their own, before touching the data. A missing or
    # zero test_offset is rejected here with the documented ValueError instead of
    # surfacing later as a TypeError from comparing a timestamp with None.
    zero = pd.Timedelta(0)
    use_validation = bool(val_offset)
    offsets_ok = test_offset is not None and test_offset > zero
    if offsets_ok and use_validation:
        offsets_ok = zero < val_offset < test_offset
    if not offsets_ok:
        raise ValueError(f'Wrong offset values: val_offset={val_offset}, test_offset={test_offset}. '
                         f'Offset values should be positive and the validation offset (if provided) '
                         f'should be lower than the test offset.')

    df_log = read_eventlog(dataset, sort_events_in_trace=True, sort_traces=True,
                           timestamp_column=timestamp_column, case_column=case_column)

    # Positional lookup: the first row of the sorted log, regardless of which
    # index labels the rows carry after sorting.
    start_timestamp = df_log[timestamp_column].iloc[0]
    test_timestamp = start_timestamp + test_offset

    # Timestamp of the first event of every case, broadcast back to each event,
    # so the partitions can be selected with plain boolean masks.
    first_events = df_log.groupby(case_column).head(1)
    start_by_case = first_events.set_index(case_column)[timestamp_column]
    trace_start = df_log[case_column].map(start_by_case)

    test_cases = df_log[trace_start >= test_timestamp]
    if use_validation:
        val_timestamp = start_timestamp + val_offset
        train_cases = df_log[trace_start < val_timestamp]
        val_cases = df_log[(trace_start >= val_timestamp) & (trace_start < test_timestamp)]
    else:
        train_cases = df_log[trace_start < test_timestamp]
        val_cases = None

    if not store_path:
        store_path = DEFAULT_PATH

    train_df = __save_split_to_file(train_cases, store_path, dataset_name, 'train')
    if use_validation:
        val_df = __save_split_to_file(val_cases, store_path, dataset_name, 'val')
    else:
        val_df = None
    test_df = __save_split_to_file(test_cases, store_path, dataset_name, 'test')

    return train_df, val_df, test_df
[docs]
def make_holdout(dataset: Union[str, pd.DataFrame], dataset_name: str = 'Dataset', store_path: str = None,
                 test_size: float = 0.2, val_from_train: float = 0.2,
                 case_column: str = XesFields.CASE_COLUMN) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Split a given dataset following a holdout scheme (train-validation-test).

    Cases are taken in the order produced by grouping the log by **case_column**: the leading cases form the
    training partition, the following ones the validation partition, and the trailing ones the test partition.
    Every generated partition is also written to ``store_path`` as a csv file.

    Args:
        dataset (str | pd.DataFrame): If string, full path to the dataset to be split. Only csv, xes, and xes.gz
            datasets are allowed. If Pandas DataFrame, the DataFrame containing the dataset.
        dataset_name (str): Name of the dataset.
            Default is ``Dataset``.
        store_path (str, optional): Path where the splits will be stored. Defaults to the DEFAULT_PATH
        test_size (float, optional): Float value between 0 and 1 (both excluded), indicating the percentage of traces
            reserved for the test partition.
            Default is ``0.2``.
        val_from_train (float, optional): Float value between 0 and 1 (0 included, 1 excluded), indicating the
            percentage of traces reserved for the validation partition within the cases of the training partition.
            Default is ``0.2``.
        case_column (str, optional): Name of the case identifier in the original dataset file.
            Default is ``XesFields.CASE_COLUMN``.

    Note:
        The default values for **test_size** and **val_from_train** are based on the experimental setup from the first
        version of [1].

        [1] Rama-Maneiro, E., Vidal, J. C., & Lama, M. (2021). Deep Learning for Predictive Business Process Monitoring:
        Review and Benchmark. https://arxiv.org/abs/2009.13251v1.

    Returns:
        Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: Returns a tuple containing the DataFrames for the train,
        validation, and test splits. The validation entry is ``None`` when **val_from_train** is ``0``.

    Raises:
        ValueError: If an invalid value for test_size or val_from_train is provided.

    Examples:
        >>> train_df, val_df, test_df = make_holdout('path/to/dataset.csv', test_size=0.3, val_from_train=0.1)
    """
    # Fail fast on bad percentages, before the event log is read.
    if not (0 <= val_from_train < 1 and 0 < test_size < 1):
        raise ValueError(f'Wrong split percentages: val_from_train={val_from_train}, test_size={test_size}. '
                         f'val_from_train should be a number between 0 and 1 (0 included, 1 excluded) and '
                         f'test_size should be a number between 0 and 1 (both excluded).')

    df_log = read_eventlog(dataset)
    cases = [case for _, case in df_log.groupby(case_column)]

    # Everything from this position onwards belongs to the test partition.
    test_cut = round(len(cases) * (1 - test_size))
    test_cases = cases[test_cut:]

    use_validation = val_from_train != 0
    if use_validation:
        # The validation cases are carved out of the tail of the training portion.
        val_cut = round(len(cases) * (1 - test_size) * (1 - val_from_train))
        train_cases = cases[:val_cut]
        val_cases = cases[val_cut:test_cut]
    else:
        train_cases = cases[:test_cut]
        val_cases = None

    if not store_path:
        store_path = DEFAULT_PATH

    train_df = __save_split_to_file(train_cases, store_path, dataset_name, 'train')
    if use_validation:
        val_df = __save_split_to_file(val_cases, store_path, dataset_name, 'val')
    else:
        val_df = None
    test_df = __save_split_to_file(test_cases, store_path, dataset_name, 'test')

    return train_df, val_df, test_df
[docs]
def make_crossvalidation(dataset: Union[str, pd.DataFrame], dataset_name: str = 'Dataset', store_path: str = None,
                         cv_folds: int = 5, val_from_train: float = 0.2, case_column: str = XesFields.CASE_COLUMN,
                         seed: int = 42) -> Tuple[List[pd.DataFrame], List[pd.DataFrame], List[pd.DataFrame]]:
    """
    Split a given dataset following a cross-validation scheme.

    The sorted unique case identifiers are distributed over **cv_folds** shuffled folds. For each fold, the
    partitions are written to ``store_path`` as csv files whose names are prefixed with ``fold<N>_``.

    Args:
        dataset (str | pd.DataFrame): If string, full path to the dataset to be split. Only csv, xes, and xes.gz
            datasets are allowed. If Pandas DataFrame, the DataFrame containing the dataset.
        dataset_name (str): Name of the dataset.
            Default is ``Dataset``.
        store_path (str, optional): Path where the splits will be stored. Defaults to the DEFAULT_PATH
        cv_folds (int, optional): Number of folds for the cross-validation split. Default is ``5``.
        val_from_train (float, optional): Float value between 0 and 1 (0 included, 1 excluded), indicating the
            percentage of traces reserved for the validation partition within the cases of the training partition.
            Default is ``0.2``.
        case_column (str, optional): Name of the case identifier in the original dataset file.
            Default is ``XesFields.CASE_COLUMN``.
        seed (int, optional): Set a seed for reproducibility.
            Default is ``42``.

    Returns:
        Tuple[List[pd.DataFrame], List[pd.DataFrame], List[pd.DataFrame]]: Returns a tuple containing the lists of
        DataFrames for the train, validation, and test splits. The validation list is empty when
        **val_from_train** is ``0``.

    Tip:
        Leaving the default values for **cv_folds**, **val_from_train** and **seed** reproduces the experimental
        setup of [1].

        [1] Rama-Maneiro, E., Vidal, J. C., & Lama, M. (2023). Deep Learning for Predictive Business Process Monitoring:
        Review and Benchmark. IEEE Transactions on Services Computing, 16(1), 739-756. doi:10.1109/TSC.2021.3139807

    Raises:
        ValueError: If an invalid value for cv_folds or val_from_train is provided.

    Examples:
        >>> splits_paths = make_crossvalidation('path/to/dataset.csv')
    """
    # Reject a bad percentage once, up front, instead of inside the fold loop.
    if not 0 <= val_from_train < 1:
        raise ValueError(f'Wrong split percentage: val_from_train={val_from_train}. '
                         f'val_from_train should be a number between 0 and 1 (0 included, 1 excluded).')

    # Same fallback as the other split functions; a None path cannot be turned into a directory.
    if not store_path:
        store_path = DEFAULT_PATH

    df_log = read_eventlog(dataset)
    indexes = sorted(df_log[case_column].unique())

    # Group the log once so that each case's events are a dictionary lookup away,
    # rather than scanning the whole log again for every single case.
    events_by_case = {case_id: events for case_id, events in df_log.groupby(case_column, sort=False)}
    no_events = df_log.iloc[0:0]

    def _collect(positions):
        # KFold yields positions into ``indexes``; translate them to case ids first.
        return [events_by_case.get(indexes[pos], no_events) for pos in positions]

    kfold = KFold(n_splits=cv_folds, random_state=seed, shuffle=True)
    use_validation = val_from_train != 0

    train_folds = []
    val_folds = []
    test_folds = []
    for fold, (train_index, test_index) in enumerate(kfold.split(indexes)):
        val_index = None
        if use_validation:
            # The validation cases are the tail of this fold's training cases.
            val_cut = round(len(train_index) * (1 - val_from_train))
            val_index = train_index[val_cut:]
            train_index = train_index[:val_cut]

        train_df = __save_split_to_file(_collect(train_index), store_path, dataset_name, 'train', fold)
        train_folds.append(train_df)

        if use_validation:
            val_df = __save_split_to_file(_collect(val_index), store_path, dataset_name, 'val', fold)
            val_folds.append(val_df)

        test_df = __save_split_to_file(_collect(test_index), store_path, dataset_name, 'test', fold)
        test_folds.append(test_df)

    return train_folds, val_folds, test_folds
def __save_split_to_file(cases: Union[list, pd.DataFrame], store_path: str, dataset_name: str,
                         split: Literal['train', 'val', 'test'], fold: int = None) -> pd.DataFrame:
    """
    Write one partition to a csv file and return it as a single DataFrame.

    The file is named ``<split>_<dataset_name>.csv``, or ``fold<fold>_<split>_<dataset_name>.csv`` when a fold
    number is given, and is created inside ``store_path``.

    Args:
        cases (list | pd.DataFrame): Either a list of DataFrames, which are concatenated in order, or a
            DataFrame that already holds the whole partition.
        store_path (str): Directory where the csv file is written. It is created if it does not exist.
        dataset_name (str): Name of the dataset, used as part of the file name.
        split (Literal['train', 'val', 'test']): Name of the partition, used as part of the file name.
        fold (int, optional): Fold number for cross-validation splits. Default is ``None``.

    Returns:
        pd.DataFrame: The DataFrame that was written to disk.
    """
    if isinstance(cases, list):
        df_split = pd.concat(cases)
    else:
        df_split = cases

    if fold is not None:
        filename = f'fold{int(fold)}_{split}_{dataset_name}'
    else:
        filename = f'{split}_{dataset_name}'

    directory = Path(store_path)
    directory.mkdir(parents=True, exist_ok=True)

    # Join with pathlib so the file lands inside the directory whether or not
    # store_path ends with a path separator.
    full_path = directory / f'{filename}.csv'
    df_split.to_csv(full_path, index=False)

    return df_split