verona.data package

verona.data.download module

verona.data.download.get_available_datasets()[source]

Display the list of available datasets from the official repository and return their identifiers.

This function prints out the list of available datasets along with their names, as defined in the DATASETS_LIST dictionary.

Returns:

List of available dataset identifiers.

Return type:

list

Examples

>>> available_datasets = get_available_datasets()
verona.data.download.get_dataset(dataset_name: str, store_path: str = None, extension: Literal['xes', 'csv', 'both'] = 'xes') -> Tuple[str, DataFrame][source]

Download a specified dataset from the official repository and store it in a designated path.

This function downloads the dataset in ‘xes.gz’ format, ‘csv’ format, or both, depending on the ‘extension’ argument.

Parameters:
  • dataset_name (str) – Identifier of the dataset to download.

  • store_path (Optional[str], optional) – The directory path where the dataset will be stored. If not specified, the dataset will be stored in the folder ~/.verona_datasets/.

  • extension (Literal['xes', 'csv', 'both'], optional) – The format in which to save the dataset. Choose from ‘xes’ for ‘xes.gz’ format, ‘csv’ for ‘csv’ format, or ‘both’ to download both formats. Default is ‘xes’.

Returns:

A string indicating the full path where the dataset is stored and a Pandas DataFrame with the dataset.

Return type:

Tuple[str, pd.DataFrame]

Examples

>>> dataset_path, df_dataset = get_dataset('bpi2012a', store_path=None, extension='csv')

verona.data.extractor module

verona.data.extractor.get_prefixes_and_targets(dataset: DataFrame, prediction_task: Literal['next_activity', 'activity_suffix', 'next_timestamp', 'remaining_time', 'next_attribute', 'attribute_suffix'], prefix_size: int = None, case_id: str = 'CaseID', activity_id: str = None, timestamp_id: str = None, attribute_id: str = None) -> Tuple[Dict[int, DataFrame], Dict[int, array]][source]

Extract prefixes and corresponding targets from a given dataset based on the prediction task.

The function extracts prefixes of the specified or all possible sizes from the dataset, and returns targets corresponding to the selected prediction task.

Parameters:
  • dataset (pd.DataFrame) – DataFrame containing the event log.

  • prediction_task (Literal['next_activity', 'activity_suffix', 'next_timestamp', 'remaining_time', 'next_attribute', 'attribute_suffix']) –

    Specifies the type of prediction task.

    • 'next_activity': Predict the next activity.

    • 'activity_suffix': Predict the remaining sequence of activities.

    • 'next_timestamp': Predict the next event timestamp.

    • 'remaining_time': Predict the remaining time for the case to complete.

    • 'next_attribute': Predict the next attribute.

    • 'attribute_suffix': Predict the remaining sequence of attributes.

  • prefix_size (int, optional) – Length of the prefix to be used. If None, uses all possible sizes.

  • case_id (str, optional) – Column name for the case identifier. Default is DataFrameFields.CASE_COLUMN.

  • activity_id (str, optional) – Column name for the activity. Needed for ‘next_activity’ and ‘activity_suffix’.

  • timestamp_id (str, optional) – Column name for the timestamp. Needed for ‘next_timestamp’ and ‘remaining_time’.

  • attribute_id (str, optional) – Column name for the attribute. Needed for ‘next_attribute’ and ‘attribute_suffix’.

Tip

Leaving prefix_size at its default value reproduces the experimental setup of [1].

[1] Rama-Maneiro, E., Vidal, J. C., & Lama, M. (2023). Deep Learning for Predictive Business Process Monitoring: Review and Benchmark. IEEE Transactions on Services Computing, 16(1), 739-756. doi:10.1109/TSC.2021.3139807

Returns:

Returns two dictionaries:
  1. Mapping from prefix size to the DataFrame of prefixes.

  2. Mapping from prefix size to the corresponding targets in NumPy array format.

Return type:

Tuple[Dict[int, pd.DataFrame], Dict[int, np.array]]

Raises:

ValueError – If the required column for a prediction task is not specified.
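
The idea behind prefix extraction for the 'next_activity' task can be illustrated without the library. The following is a minimal plain-Python sketch, not VERONA's implementation: the helper name prefixes_next_activity, the list-of-tuples log layout, and the reading of a prefix of size k as the first k events of a case are all assumptions made for illustration.

```python
from collections import defaultdict


def prefixes_next_activity(events, prefix_size=None):
    """Return (case_id, prefix, next_activity) triples.

    `events` is a list of (case_id, activity) tuples, already ordered by
    time within each case. Hypothetical helper, for illustration only.
    """
    traces = defaultdict(list)
    for case_id, activity in events:
        traces[case_id].append(activity)

    triples = []
    for case_id, activities in traces.items():
        # With no size given, use every size that leaves one event as target.
        sizes = range(1, len(activities)) if prefix_size is None else [prefix_size]
        for k in sizes:
            if k < len(activities):
                triples.append((case_id, activities[:k], activities[k]))
    return triples


log = [(1, "A"), (1, "B"), (1, "C"), (2, "A"), (2, "D")]
print(prefixes_next_activity(log, prefix_size=2))
print(prefixes_next_activity(log))
```

In this sketch, cases that are too short to supply a target for the requested size are skipped; whether the library does the same is not stated above.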

Examples

>>> prefixes, targets = get_prefixes_and_targets(df_dataset, 'next_activity', prefix_size=5)

verona.data.results module

class verona.data.results.AvailableMetrics[source]

Bases: object

Container class that holds available metrics for various predictive tasks in process mining.

This class defines different metric types that can be calculated for different predictive tasks in process mining such as predicting the next activity, activity suffix, next timestamp, and remaining time.

class ActivitySuffix[source]

Bases: object

Metrics available for the task of predicting the suffix (sequence of remaining activities) in a process instance.

DAMERAU_LEVENSHTEIN = MetricValue(value='damerau_levenshtein', parent='suffix')
class NextActivity[source]

Bases: object

Metrics available for the task of predicting the next activity in a process instance.

ACCURACY = MetricValue(value='accuracy', parent='next_activity')
BRIER_SCORE = MetricValue(value='brier_score', parent='next_activity')
F1 = MetricValue(value='f1', parent='next_activity')
MCC = MetricValue(value='mcc', parent='next_activity')
PRECISION = MetricValue(value='precision', parent='next_activity')
RECALL = MetricValue(value='recall', parent='next_activity')
class NextTimestamp[source]

Bases: object

Metrics available for the task of predicting the next timestamp in a process instance.

MAE = MetricValue(value='mae', parent='next_timestamp')
class RemainingTime[source]

Bases: object

Metrics available for the task of predicting the remaining time for completion of a process instance.

MAE = MetricValue(value='mae', parent='remaining_time')
class verona.data.results.MetricValue(value: str, parent: str)[source]

Bases: object

parent: str
value: str
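
The values shown for the metric constants are consistent with MetricValue being a small value object with two string fields. A minimal stand-in, assuming a dataclass (the actual class definition is not shown here, and frozen=True is a guess), illustrates how a metric constant carries both its name and the task it belongs to:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class MetricValue:
    """Stand-in for illustration only; the real class may differ."""
    value: str   # metric name, e.g. 'accuracy'
    parent: str  # task the metric belongs to, e.g. 'next_activity'


ACCURACY = MetricValue(value="accuracy", parent="next_activity")
print(ACCURACY)         # MetricValue(value='accuracy', parent='next_activity')
print(ACCURACY.parent)  # next_activity
```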
class verona.data.results.MissingResultStrategy(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

Enum for specifying the strategy to use for handling missing data (NaNs) in the dataset when applying Bayesian models.

This enum provides options for how to deal with missing data (NaN values) in the dataset when preparing data for Bayesian models. Options include deleting the entire dataset associated with the missing data, deleting only the approach (algorithm/method) associated with the missing data, or taking no action.

DELETE_APPROACH = 'delete_approach'
DELETE_DATASET = 'delete_dataset'
NONE = 'none'
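
The difference between the two deletion strategies is easiest to see on a small results table. The sketch below uses plain Python dictionaries rather than the library's data structures; the function even_results, the dataset names, and the sample scores are hypothetical.

```python
import math


def even_results(table, strategy):
    """Drop incomplete rows or columns from {dataset: {approach: score}}.

    `strategy` is 'delete_dataset', 'delete_approach' or 'none'.
    Hypothetical helper, for illustration only.
    """
    def missing(score):
        return score is None or math.isnan(score)

    if strategy == "delete_dataset":
        # Remove every dataset (row) that has at least one missing score.
        return {d: dict(row) for d, row in table.items()
                if not any(missing(s) for s in row.values())}
    if strategy == "delete_approach":
        # Remove every approach (column) that has at least one missing score.
        bad = {a for row in table.values() for a, s in row.items() if missing(s)}
        return {d: {a: s for a, s in row.items() if a not in bad}
                for d, row in table.items()}
    if strategy == "none":
        return {d: dict(row) for d, row in table.items()}
    raise ValueError(f"Unsupported strategy: {strategy}")


results = {
    "log_1": {"Tax": 0.81, "TACO": 0.84},
    "log_2": {"Tax": 0.77, "TACO": float("nan")},
}
print(even_results(results, "delete_dataset"))   # keeps only log_1
print(even_results(results, "delete_approach"))  # keeps only the Tax column
```
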
verona.data.results.load_results_hierarchical(approach_1: str = 'Tax', approach_2: str = 'TACO', metric: MetricValue = MetricValue(value='accuracy', parent='next_activity'), even_strategy: MissingResultStrategy = MissingResultStrategy.DELETE_DATASET)[source]

Load and preprocess the results of two approaches for comparison using a hierarchical test.

This function fetches the raw results from CSV files based on the selected metric, filters the data for the two approaches specified, and handles missing data according to the provided even_strategy.

Parameters:
  • approach_1 (str, optional) – The name of the first approach for which to load results. Default is 'Tax'.

  • approach_2 (str, optional) – The name of the second approach for which to load results. Default is 'TACO'.

  • metric (MetricValue, optional) – The metric on which the approaches should be compared. Default is AvailableMetrics.NextActivity.ACCURACY.

  • even_strategy (MissingResultStrategy, optional) – Enum specifying the strategy to apply for handling missing data. Default is MissingResultStrategy.DELETE_DATASET.

Note

For metrics whose parent is “next_activity” or “suffix”, the values are multiplied by 100 so that they are consistent with the default rope values.

Returns:

Two DataFrames containing the preprocessed results of the two approaches, and a list of common dataset names (indices).

Return type:

Tuple[pd.DataFrame, pd.DataFrame, List[str]]

Raises:
  • ValueError – If an unsupported metric or even_strategy is passed.

  • AssertionError – If the specified approaches are not available in the data.

Examples

>>> approach_1_df, approach_2_df, common_datasets = load_results_hierarchical("Tax", "TACO", metric=AvailableMetrics.NextActivity.ACCURACY, even_strategy=MissingResultStrategy.DELETE_DATASET)
>>> print(approach_1_df.head())
>>> print(approach_2_df.head())
>>> print(common_datasets)
verona.data.results.load_results_non_hierarchical(approach_1: str = 'Tax', approach_2: str = 'TACO', metric: MetricValue = MetricValue(value='accuracy', parent='next_activity'), even_strategy: MissingResultStrategy = MissingResultStrategy.DELETE_DATASET)[source]

Load and preprocess results for non-hierarchical statistical comparison of two approaches.

This function first loads results for all available approaches using the load_results_plackett_luce function. It then filters the results to include only the specified approach_1 and approach_2, and applies even_strategy to handle any missing values.

Parameters:
  • approach_1 (str, optional) – The name of the first approach to compare. Default is 'Tax'.

  • approach_2 (str, optional) – The name of the second approach to compare. Default is 'TACO'.

  • metric (MetricValue, optional) – The metric to consider for loading results. Default is AvailableMetrics.NextActivity.ACCURACY.

  • even_strategy (MissingResultStrategy, optional) – Strategy to apply when missing values are encountered. Determines whether rows (datasets) or columns (approaches) should be dropped. Default is MissingResultStrategy.DELETE_DATASET.

Returns:

Two NumPy arrays: the filtered results for approach_1 and the filtered results for approach_2.

Return type:

Tuple[np.ndarray, np.ndarray]

Examples

>>> results_tax, results_taco = load_results_non_hierarchical("Tax", "TACO", AvailableMetrics.NextActivity.ACCURACY, MissingResultStrategy.DELETE_DATASET)
verona.data.results.load_results_plackett_luce(metric: MetricValue = MetricValue(value='accuracy', parent='next_activity'), even_strategy: MissingResultStrategy = MissingResultStrategy.DELETE_DATASET)[source]

Load and preprocess the results for applying the Plackett-Luce model.

This function loads a CSV file containing the raw results based on the given metric. It then computes the mean result for each (approach, dataset) pair, and finally applies even_strategy to handle missing data, if any.

Parameters:
  • metric (MetricValue, optional) – The metric for which results should be loaded. Default is AvailableMetrics.NextActivity.ACCURACY.

  • even_strategy (MissingResultStrategy, optional) – Strategy to apply when missing values are encountered. Determines whether rows (datasets) or columns (approaches) should be dropped. Default is MissingResultStrategy.DELETE_DATASET.

Returns:

A DataFrame containing the mean results, where each row represents a dataset and each column an approach, and a list of approach names.

Return type:

Tuple[pd.DataFrame, list]
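
The aggregation step described above (one mean per approach and dataset) can be sketched in plain Python. The raw-record layout (approach, dataset, score), the dataset name, and the helper mean_per_pair are assumptions; the actual CSV schema is not documented here.

```python
from collections import defaultdict
from statistics import mean


def mean_per_pair(records):
    """Average raw scores into a {dataset: {approach: mean_score}} table."""
    grouped = defaultdict(list)
    for approach, dataset, score in records:
        grouped[(dataset, approach)].append(score)

    table = defaultdict(dict)
    for (dataset, approach), scores in grouped.items():
        table[dataset][approach] = mean(scores)
    return dict(table)


# Two hypothetical runs per approach on one dataset.
raw = [
    ("Tax", "log_1", 0.5), ("Tax", "log_1", 1.0),
    ("TACO", "log_1", 0.75), ("TACO", "log_1", 0.25),
]
print(mean_per_pair(raw))  # {'log_1': {'Tax': 0.75, 'TACO': 0.5}}
```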

Examples

>>> mean_results, approaches = load_results_plackett_luce(AvailableMetrics.NextActivity.ACCURACY, MissingResultStrategy.DELETE_DATASET)

verona.data.split module

verona.data.split.make_crossvalidation(dataset: str | DataFrame, dataset_name: str = 'Dataset', store_path: str = None, cv_folds: int = 5, val_from_train: float = 0.2, case_column: str = 'case:concept:name', seed: int = 42) -> Tuple[List[DataFrame], List[DataFrame], List[DataFrame]][source]

Split a given dataset following a cross-validation scheme.

Parameters:
  • dataset (str | pd.DataFrame) – If string, full path to the dataset to be split. Only csv, xes, and xes.gz datasets are allowed. If Pandas DataFrame, the DataFrame containing the dataset.

  • dataset_name (str) – Name of the dataset. Default is 'Dataset'.

  • store_path (str, optional) – Path where the splits will be stored. Defaults to the current working directory.

  • cv_folds (int, optional) – Number of folds for the cross-validation split. Default is 5.

  • val_from_train (float, optional) – Float value between 0 and 1 (0 included, 1 excluded), indicating the percentage of traces reserved for the validation partition within the cases of the training partition. Default is 0.2.

  • case_column (str, optional) – Name of the case identifier in the original dataset file. Default is XesFields.CASE_COLUMN.

  • seed (int, optional) – Set a seed for reproducibility. Default is 42.

Returns:

Returns a tuple containing the lists of DataFrames for the train, validation, and test splits.

Return type:

Tuple[List[pd.DataFrame], List[pd.DataFrame], List[pd.DataFrame]]

Tip

Leaving the default values for cv_folds, val_from_train and seed reproduces the experimental setup of [1].

[1] Rama-Maneiro, E., Vidal, J. C., & Lama, M. (2023). Deep Learning for Predictive Business Process Monitoring: Review and Benchmark. IEEE Transactions on Services Computing, 16(1), 739-756. doi:10.1109/TSC.2021.3139807

Raises:

ValueError – If an invalid value for cv_folds or val_from_train is provided.
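
To make the partitioning scheme concrete, here is a minimal sketch of a case-level k-fold split in plain Python. It is not the library's code: the function kfold_case_ids, the shuffling procedure, and the rounding of the validation size are assumptions. The point it illustrates is that whole cases, not individual events, are assigned to partitions, and that the validation cases are taken from the training portion of each fold.

```python
import random


def kfold_case_ids(case_ids, cv_folds=5, val_from_train=0.2, seed=42):
    """Yield (train, val, test) lists of case ids, one triple per fold."""
    if cv_folds < 2:
        raise ValueError("cv_folds must be at least 2")
    if not 0 <= val_from_train < 1:
        raise ValueError("val_from_train must be in [0, 1)")

    ids = sorted(set(case_ids))
    random.Random(seed).shuffle(ids)  # seeded shuffle for reproducibility
    folds = [ids[i::cv_folds] for i in range(cv_folds)]

    for i, test in enumerate(folds):
        rest = [c for j, fold in enumerate(folds) if j != i for c in fold]
        n_val = int(len(rest) * val_from_train)
        # Validation cases are carved out of the non-test cases.
        yield rest[n_val:], rest[:n_val], test


for train, val, test in kfold_case_ids(range(10), cv_folds=5):
    print(len(train), len(val), len(test))
```

With ten cases and the defaults, every fold in this sketch holds 7 training, 1 validation, and 2 test cases.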

Examples

>>> train_dfs, val_dfs, test_dfs = make_crossvalidation('path/to/dataset.csv')
verona.data.split.make_holdout(dataset: str | DataFrame, dataset_name: str = 'Dataset', store_path: str = None, test_size: float = 0.2, val_from_train: float = 0.2, case_column: str = 'case:concept:name') -> Tuple[DataFrame, DataFrame, DataFrame][source]

Split a given dataset following a holdout scheme (train-validation-test).

Parameters:
  • dataset (str | pd.DataFrame) – If string, full path to the dataset to be split. Only csv, xes, and xes.gz datasets are allowed. If Pandas DataFrame, the DataFrame containing the dataset.

  • dataset_name (str) – Name of the dataset. Default is 'Dataset'.

  • store_path (str, optional) – Path where the splits will be stored. Defaults to the DEFAULT_PATH.

  • test_size (float, optional) – Float value between 0 and 1 (both excluded), indicating the percentage of traces reserved for the test partition. Default is 0.2.

  • val_from_train (float, optional) – Float value between 0 and 1 (0 included, 1 excluded), indicating the percentage of traces reserved for the validation partition within the cases of the training partition. Default is 0.2.

  • case_column (str, optional) – Name of the case identifier in the original dataset file. Default is XesFields.CASE_COLUMN.

Note

The default values for test_size and val_from_train are based on the experimental setup from the first version of [1].

[1] Rama-Maneiro, E., Vidal, J. C., & Lama, M. (2021). Deep Learning for Predictive Business Process Monitoring: Review and Benchmark. https://arxiv.org/abs/2009.13251v1.

Returns:

Returns a tuple containing the DataFrames for the train, validation, and test splits.

Return type:

Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]

Raises:

ValueError – If an invalid value for test_size or val_from_train is provided.
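
Because val_from_train is applied to the training portion rather than to the whole log, the effective partition sizes differ from the raw parameter values. The arithmetic can be sketched as follows; the helper holdout_fractions is hypothetical and only mirrors the parameter ranges stated above.

```python
def holdout_fractions(test_size=0.2, val_from_train=0.2):
    """Return the (train, val, test) fractions of all cases."""
    if not 0 < test_size < 1:
        raise ValueError("test_size must be in (0, 1)")
    if not 0 <= val_from_train < 1:
        raise ValueError("val_from_train must be in [0, 1)")
    remaining = 1 - test_size         # cases left after removing the test set
    val = remaining * val_from_train  # validation is carved out of the remainder
    return remaining - val, val, test_size


train, val, test = holdout_fractions()
print(f"train={train:.2f} val={val:.2f} test={test:.2f}")
# train=0.64 val=0.16 test=0.20
```

With test_size=0.3 and val_from_train=0.1, the same arithmetic gives roughly 63% training, 7% validation, and 30% test cases.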

Examples

>>> train_df, val_df, test_df = make_holdout('path/to/dataset.csv', test_size=0.3, val_from_train=0.1)
verona.data.split.make_temporal_split(dataset: str | DataFrame, dataset_name: str = 'Dataset', store_path: str = None, test_offset: Timedelta = Timedelta('365 days 00:00:00'), val_offset: Timedelta = None, timestamp_column: str = 'time:timestamp', case_column: str = 'case:concept:name') -> Tuple[DataFrame, DataFrame, DataFrame][source]

Split a given dataset following a temporal scheme. Traces starting on a date equal to or greater than the date of the first trace plus test_offset form the test partition. Optionally, traces starting on a date equal to or greater than the date of the first trace plus val_offset but less than the date of the first trace plus test_offset form the validation partition. The remaining traces form the training partition.

Parameters:
  • dataset (str | pd.DataFrame) – If string, full path to the dataset to be split. Only csv, xes, and xes.gz datasets are allowed. If Pandas DataFrame, the DataFrame containing the dataset.

  • dataset_name (str) – Name of the dataset. Default is 'Dataset'.

  • store_path (str, optional) – Path where the splits will be stored. Defaults to the DEFAULT_PATH.

  • test_offset (pd.Timedelta, optional) – Time difference with respect to the starting timestamp of the first trace, from which any trace with the same or a later starting timestamp is added to the test partition. Default is 365 days.

  • val_offset (pd.Timedelta, optional) – Time difference with respect to the starting timestamp of the first trace. Any trace whose starting timestamp is equal to or later than this point, but earlier than the first trace's start plus test_offset, is added to the validation partition. Default is None.

  • timestamp_column (str, optional) – Name of the timestamp column in the original dataset file. Default is XesFields.TIMESTAMP_COLUMN.

  • case_column (str, optional) – Name of the case identifier in the original dataset file. Default is XesFields.CASE_COLUMN.

Returns:

Returns a tuple containing the DataFrames for the train, validation, and test splits.

Return type:

Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]

Raises:

ValueError – If an invalid value for test_offset or val_offset is provided.
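
The assignment rule described above can be sketched with the standard library alone. This is an illustration under assumptions, not the library's implementation: the helper temporal_split, the mapping from case id to start time, and the sample dates are hypothetical.

```python
from datetime import datetime, timedelta


def temporal_split(case_starts, test_offset, val_offset=None):
    """Assign case ids to train/val/test by the start time of each case.

    `case_starts` maps case id -> datetime of the first event of the case.
    """
    first = min(case_starts.values())
    test_from = first + test_offset
    val_from = first + val_offset if val_offset is not None else None

    train, val, test = [], [], []
    for case_id, start in sorted(case_starts.items(), key=lambda kv: kv[1]):
        if start >= test_from:
            test.append(case_id)
        elif val_from is not None and start >= val_from:
            val.append(case_id)
        else:
            train.append(case_id)
    return train, val, test


starts = {
    "c1": datetime(2020, 1, 1),
    "c2": datetime(2020, 8, 1),
    "c3": datetime(2021, 3, 1),
}
print(temporal_split(starts, timedelta(days=365), timedelta(days=180)))
# (['c1'], ['c2'], ['c3'])
```

Without a val_offset, the sketch leaves the validation list empty and case c2 falls into the training partition.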

Examples

>>> train_df, _, test_df = make_temporal_split('path/to/dataset.csv', test_offset=pd.Timedelta(days=730))

verona.data.statistics module

verona.data.statistics.get_activities_list(dataset: DataFrame, activity_id: str = 'Activity') -> list[source]

Returns the list of unique activities in the dataset.

Parameters:
  • dataset (pd.DataFrame) – DataFrame containing the dataset to be analyzed.

  • activity_id (str, optional) – Name of the activity column in the DataFrame. Default is DataFrameFields.ACTIVITY_COLUMN.

Returns:

A list containing unique activities in the dataset.

Return type:

list

Raises:

ValueError – If the dataset is empty or the activity column does not exist.

Examples

>>> df = pd.DataFrame({'activity': ['A', 'B', 'A', 'C']})
>>> activities_list = get_activities_list(df, 'activity')
>>> print(activities_list)
['A', 'B', 'C']
verona.data.statistics.get_avg_duration_case(dataset: DataFrame, case_id: str = 'CaseID', timestamp_id: str = 'Timestamp') -> Timedelta[source]

Returns the average case temporal duration in the dataset.

Parameters:
  • dataset (pd.DataFrame) – DataFrame containing the dataset to be analyzed.

  • case_id (str, optional) – Name of the case identifier column in the DataFrame. Default is DataFrameFields.CASE_COLUMN.

  • timestamp_id (str, optional) – Name of the timestamp column in the DataFrame. Default is DataFrameFields.TIMESTAMP_COLUMN.

Returns:

The average temporal duration of a case in the dataset.

Return type:

pd.Timedelta

Raises:

ValueError – If the dataset is empty, or the case identifier or timestamp columns do not exist.

Examples

>>> df = pd.read_csv('../../BPI_Challenge_2012_A.csv')
>>> avg_dur_case = get_avg_duration_case(df, 'CaseID', 'Timestamp')
>>> print(avg_dur_case)
8 days 01:55:14.860649805
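
Assuming that the duration of a case is the time between its first and last event (the text above does not spell out the definition), the computation can be sketched with the standard library. The helper avg_case_duration and the sample timestamps are illustrative only.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def avg_case_duration(events):
    """Mean of (last timestamp - first timestamp) over all cases."""
    if not events:
        raise ValueError("The dataset is empty")
    stamps = defaultdict(list)
    for case_id, timestamp in events:
        stamps[case_id].append(timestamp)
    durations = [max(ts) - min(ts) for ts in stamps.values()]
    return sum(durations, timedelta()) / len(durations)


log = [
    (1, datetime(2024, 1, 1, 8)), (1, datetime(2024, 1, 1, 12)),  # 4 hours
    (2, datetime(2024, 1, 2, 9)), (2, datetime(2024, 1, 2, 11)),  # 2 hours
]
print(avg_case_duration(log))  # 3:00:00
```
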
verona.data.statistics.get_avg_duration_event(dataset: DataFrame, case_id: str = 'CaseID', timestamp_id: str = 'Timestamp') -> Timedelta[source]

Returns the average event temporal duration in the dataset.

Parameters:
  • dataset (pd.DataFrame) – DataFrame containing the dataset to be analyzed.

  • case_id (str, optional) – Name of the case identifier column in the DataFrame. Default is DataFrameFields.CASE_COLUMN.

  • timestamp_id (str, optional) – Name of the timestamp column in the DataFrame. Default is DataFrameFields.TIMESTAMP_COLUMN.

Returns:

The average temporal duration of an event in the dataset.

Return type:

pd.Timedelta

Raises:

ValueError – If the dataset is empty, or the case identifier or timestamp columns do not exist.

Examples

>>> df = pd.read_csv('../../BPI_Challenge_2012_A.csv')
>>> avg_dur_event = get_avg_duration_event(df, 'CaseID', 'Timestamp')
>>> print(avg_dur_event)
2 days 05:08:06.570523093
verona.data.statistics.get_avg_len_case(dataset: DataFrame, case_id: str = 'CaseID') -> float[source]

Returns the average case length in the dataset.

Parameters:
  • dataset (pd.DataFrame) – DataFrame containing the dataset to be analyzed.

  • case_id (str, optional) – Name of the case identifier column in the DataFrame. Default is DataFrameFields.CASE_COLUMN.

Returns:

The average case length in the dataset.

Return type:

float

Raises:

ValueError – If the dataset is empty or the case identifier column does not exist.

Examples

>>> df = pd.DataFrame({'case': [1, 1, 2, 2, 3], 'activity': ['A', 'B', 'A', 'C', 'D']})
>>> avg_len = get_avg_len_case(df, 'case')
>>> print(avg_len)
1.6666666666666667
verona.data.statistics.get_count_variants(dataset: DataFrame, case_id: str = 'CaseID', activity_id: str = 'Activity') -> dict[source]

Returns the number of times each variant appears in the dataset.

Parameters:
  • dataset (pd.DataFrame) – DataFrame containing the dataset to be analyzed.

  • case_id (str, optional) – Name of the case identifier column in the DataFrame. Default is DataFrameFields.CASE_COLUMN.

  • activity_id (str, optional) – Name of the activity column in the DataFrame. Default is DataFrameFields.ACTIVITY_COLUMN.

Returns:

Dictionary where the keys are the variants and the values are the count of occurrences of each variant in the dataset.

Return type:

dict

Raises:

ValueError – If the dataset is empty, or the case identifier or activity columns do not exist.

Examples

>>> df = pd.DataFrame({'case': [1, 1, 2, 2, 3], 'activity': ['A', 'B', 'A', 'C', 'D']})
>>> count_variants = get_count_variants(df, 'case', 'activity')
>>> print(count_variants)
{'A->B': 1, 'A->C': 1, 'D': 1}
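
A variant is the ordered sequence of activities of a case, so counting variants amounts to grouping events by case and tallying the resulting sequences. The sketch below does this in plain Python; the helper count_variants is hypothetical, and the '->' separator is taken from the example output above.

```python
from collections import Counter, defaultdict


def count_variants(events, sep="->"):
    """Count how many cases follow each activity sequence (variant)."""
    traces = defaultdict(list)
    for case_id, activity in events:
        traces[case_id].append(activity)
    return dict(Counter(sep.join(acts) for acts in traces.values()))


# Cases 1 and 4 share the same variant.
log = [(1, "A"), (1, "B"), (2, "A"), (2, "C"), (3, "D"), (4, "A"), (4, "B")]
print(count_variants(log))  # {'A->B': 2, 'A->C': 1, 'D': 1}
```
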
verona.data.statistics.get_max_duration_case(dataset: DataFrame, case_id: str = 'CaseID', timestamp_id: str = 'Timestamp') -> Timedelta[source]

Returns the maximum case temporal duration in the dataset.

Parameters:
  • dataset (pd.DataFrame) – DataFrame containing the dataset to be analyzed.

  • case_id (str, optional) – Name of the case identifier column in the DataFrame. Default is DataFrameFields.CASE_COLUMN.

  • timestamp_id (str, optional) – Name of the timestamp column in the DataFrame. Default is DataFrameFields.TIMESTAMP_COLUMN.

Returns:

The maximum temporal duration of a case in the dataset.

Return type:

pd.Timedelta

Raises:

ValueError – If the dataset is empty, or the case identifier or timestamp columns do not exist.

Examples

>>> df = pd.read_csv('../../BPI_Challenge_2012_A.csv')
>>> max_dur_case = get_max_duration_case(df, 'CaseID', 'Timestamp')
>>> print(max_dur_case)
91 days 10:55:36.161000
verona.data.statistics.get_max_duration_event(dataset: DataFrame, case_id: str = 'CaseID', timestamp_id: str = 'Timestamp') -> Timedelta[source]

Returns the maximum event temporal duration in the dataset.

Parameters:
  • dataset (pd.DataFrame) – DataFrame containing the dataset to be analyzed.

  • case_id (str, optional) – Name of the case identifier column in the DataFrame. Default is DataFrameFields.CASE_COLUMN.

  • timestamp_id (str, optional) – Name of the timestamp column in the DataFrame. Default is DataFrameFields.TIMESTAMP_COLUMN.

Returns:

The maximum temporal duration of an event in the dataset.

Return type:

pd.Timedelta

Raises:

ValueError – If the dataset is empty, or the case identifier or timestamp columns do not exist.

Examples

>>> df = pd.read_csv('../../BPI_Challenge_2012_A.csv')
>>> max_dur_event = get_max_duration_event(df, 'CaseID', 'Timestamp')
>>> print(max_dur_event)
89 days 13:10:06.164000
verona.data.statistics.get_max_len_case(dataset: DataFrame, case_id: str = 'CaseID') -> int[source]

Returns the maximum case length in the dataset.

Parameters:
  • dataset (pd.DataFrame) – DataFrame containing the dataset to be analyzed.

  • case_id (str, optional) – Name of the case identifier column in the DataFrame. Default is DataFrameFields.CASE_COLUMN.

Returns:

The maximum case length in the dataset.

Return type:

int

Raises:

ValueError – If the dataset is empty or the case identifier column does not exist.

Examples

>>> df = pd.DataFrame({'case': [1, 1, 2, 2, 3], 'activity': ['A', 'B', 'A', 'C', 'D']})
>>> max_len = get_max_len_case(df, 'case')
>>> print(max_len)
2
verona.data.statistics.get_min_duration_case(dataset: DataFrame, case_id: str = 'CaseID', timestamp_id: str = 'Timestamp') -> Timedelta[source]

Returns the minimum case temporal duration in the dataset.

Parameters:
  • dataset (pd.DataFrame) – DataFrame containing the dataset to be analyzed.

  • case_id (str, optional) – Name of the case identifier column in the DataFrame. Default is DataFrameFields.CASE_COLUMN.

  • timestamp_id (str, optional) – Name of the timestamp column in the DataFrame. Default is DataFrameFields.TIMESTAMP_COLUMN.

Returns:

The minimum temporal duration of a case in the dataset.

Return type:

pd.Timedelta

Raises:

ValueError – If the dataset is empty, or the case identifier or timestamp columns do not exist.

Examples

>>> df = pd.read_csv('../../BPI_Challenge_2012_A.csv')
>>> min_dur_case = get_min_duration_case(df, 'CaseID', 'Timestamp')
>>> print(min_dur_case)
0 days 00:00:01.855000
verona.data.statistics.get_min_duration_event(dataset: DataFrame, case_id: str = 'CaseID', timestamp_id: str = 'Timestamp') -> Timedelta[source]

Returns the minimum event temporal duration in the dataset.

Parameters:
  • dataset (pd.DataFrame) – DataFrame containing the dataset to be analyzed.

  • case_id (str, optional) – Name of the case identifier column in the DataFrame. Default is DataFrameFields.CASE_COLUMN.

  • timestamp_id (str, optional) – Name of the timestamp column in the DataFrame. Default is DataFrameFields.TIMESTAMP_COLUMN.

Returns:

The minimum temporal duration of an event in the dataset.

Return type:

pd.Timedelta

Raises:

ValueError – If the dataset is empty, or the case identifier or timestamp columns do not exist.

Examples

>>> df = pd.read_csv('../../BPI_Challenge_2012_A.csv')
>>> min_dur_event = get_min_duration_event(df, 'CaseID', 'Timestamp')
>>> print(min_dur_event)
0 days 00:00:00
verona.data.statistics.get_min_len_case(dataset: DataFrame, case_id: str = 'CaseID') -> int[source]

Returns the minimum case length in the dataset.

Parameters:
  • dataset (pd.DataFrame) – DataFrame containing the dataset to be analyzed.

  • case_id (str, optional) – Name of the case identifier column in the DataFrame. Default is DataFrameFields.CASE_COLUMN.

Returns:

The minimum case length in the dataset.

Return type:

int

Raises:

ValueError – If the dataset is empty or the case identifier column does not exist.

Examples

>>> df = pd.DataFrame({'case': [1, 1, 2, 2, 3], 'activity': ['A', 'B', 'A', 'C', 'D']})
>>> min_len = get_min_len_case(df, 'case')
>>> print(min_len)
1
verona.data.statistics.get_num_activities(dataset: DataFrame, activity_id: str = 'Activity') -> int[source]

Returns the number of unique activities in the dataset.

Parameters:
  • dataset (pd.DataFrame) – DataFrame containing the dataset to be analyzed.

  • activity_id (str, optional) – Name of the activity column in the DataFrame. Default is DataFrameFields.ACTIVITY_COLUMN.

Returns:

The number of unique activities in the dataset.

Return type:

int

Raises:

ValueError – If the dataset is empty or the activity column does not exist.

Examples

>>> df = pd.DataFrame({'activity': ['A', 'B', 'A', 'C']})
>>> num_activities = get_num_activities(df, 'activity')
>>> print(num_activities)
3
verona.data.statistics.get_num_cases(dataset: DataFrame, case_id: str = 'CaseID') -> int[source]

Returns the number of unique cases in the dataset.

Parameters:
  • dataset (pd.DataFrame) – DataFrame containing the dataset to be analyzed.

  • case_id (str, optional) – Name of the case identifier column in the DataFrame. Default is DataFrameFields.CASE_COLUMN.

Returns:

The number of unique cases in the dataset.

Return type:

int

Raises:

ValueError – If the dataset is empty or the case identifier column does not exist.

Examples

>>> df = pd.DataFrame({'case': [1, 1, 2, 2, 3], 'activity': ['A', 'B', 'A', 'C', 'D']})
>>> num_cases = get_num_cases(df, 'case')
>>> print(num_cases)
3
verona.data.statistics.get_num_values(dataset: DataFrame, attribute_id: str) -> int[source]

Returns the number of unique values for the specified attribute in the dataset.

Parameters:
  • dataset (pd.DataFrame) – DataFrame containing the dataset to be analyzed.

  • attribute_id (str) – Name of the attribute column in the DataFrame.

Returns:

The number of unique values for the specified attribute in the dataset.

Return type:

int

Raises:

ValueError – If the dataset is empty or the attribute column does not exist.

Examples

>>> df = pd.DataFrame({'attribute': [1, 2, 2, 3]})
>>> num_values = get_num_values(df, 'attribute')
>>> print(num_values)
3
verona.data.statistics.get_num_variants(dataset: DataFrame, case_id: str = 'CaseID', activity_id: str = 'Activity') -> int[source]

Returns the number of variants (distinct sequences of activities) in the dataset.

Parameters:
  • dataset (pd.DataFrame) – DataFrame containing the dataset to be analyzed.

  • case_id (str, optional) – Name of the case identifier column in the DataFrame. Default is DataFrameFields.CASE_COLUMN.

  • activity_id (str, optional) – Name of the activity column in the DataFrame. Default is DataFrameFields.ACTIVITY_COLUMN.

Returns:

The number of variants (cases with different sequences of activities).

Return type:

int

Raises:

ValueError – If the dataset is empty, or the case identifier or activity columns do not exist.

Examples

>>> df = pd.DataFrame({'case': [1, 1, 2, 2, 3], 'activity': ['A', 'B', 'A', 'C', 'D']})
>>> num_variants = get_num_variants(df, 'case', 'activity')
>>> print(num_variants)
3
verona.data.statistics.get_values_list(dataset: DataFrame, attribute_id: str = 'Activity') -> list[source]

Returns the list of unique values for the specified attribute in the dataset.

Parameters:
  • dataset (pd.DataFrame) – DataFrame containing the dataset to be analyzed.

  • attribute_id (str, optional) – Name of the attribute column in the DataFrame. Default is DataFrameFields.ACTIVITY_COLUMN.

Returns:

The list of unique values for the specified attribute in the dataset.

Return type:

list

Raises:

ValueError – If the dataset is empty or the attribute column does not exist.

Examples

>>> df = pd.DataFrame({'attribute': [1, 2, 2, 3]})
>>> values_list = get_values_list(df, 'attribute')
>>> print(values_list)
[1, 2, 3]

verona.data.utils module

class verona.data.utils.DataFrameFields[source]

Bases: object

Common column names that may be present in a csv log.

ACTIVITY_COLUMN = 'Activity'
CASE_COLUMN = 'CaseID'
RESOURCE_COLUMN = 'Resource'
TIMESTAMP_COLUMN = 'Timestamp'
class verona.data.utils.XesFields[source]

Bases: object

Common XES fields that may be present in an XES log.

ACTIVITY_COLUMN = 'concept:name'
CASE_COLUMN = 'case:concept:name'
LIFECYCLE_COLUMN = 'lifecycle:transition'
RESOURCE_COLUMN = 'org:resource'
TIMESTAMP_COLUMN = 'time:timestamp'
verona.data.utils.categorize_attribute(attr: Series) -> Tuple[Series, dict, dict][source]

Convert the attribute column type in the Pandas DataFrame dataset to categorical (integer indexes).

Parameters:

attr (pd.Series) – Pandas Series of the attribute column in the dataset.

Returns:

Pandas Series representing the attribute column with the integer indexes instead of the original values, a dictionary with the conversions (key: categorical index, value: original value), and the reverse dictionary (key: original value, value: categorical index).

Return type:

Tuple[pd.Series, dict, dict]
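
The three returned objects fit together as an encoding plus its two lookup tables. A minimal plain-Python sketch of the same idea follows; the helper categorize is hypothetical, and assigning indexes in order of first appearance is an assumption, not documented behaviour.

```python
def categorize(values):
    """Replace each value by an integer index and return both mappings."""
    value_to_index = {}
    for v in values:
        # Indexes are assigned in order of first appearance (an assumption).
        value_to_index.setdefault(v, len(value_to_index))
    index_to_value = {i: v for v, i in value_to_index.items()}
    encoded = [value_to_index[v] for v in values]
    return encoded, index_to_value, value_to_index


encoded, index_to_value, value_to_index = categorize(["A", "B", "A", "C"])
print(encoded)         # [0, 1, 0, 2]
print(index_to_value)  # {0: 'A', 1: 'B', 2: 'C'}
```

Decoding is then a lookup in the first dictionary: [index_to_value[i] for i in encoded] restores the original values.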

verona.data.utils.get_aggregation_representation(prefix: DataFrame, unique_activities: array, numeric_columns: array = None, numeric_aggr_func: Literal['max', 'min', 'avg', 'sum'] = 'avg', activity_column: str = 'concept:name', relative_freq: bool = False) -> array[source]

Gets the aggregation sequence encoding described in [1]. Activities are represented by their frequency (absolute or relative) of occurrence in the prefix. Numerical variables are represented by general statistics such as maximum, minimum, mean or sum.

Parameters:
  • prefix (pd.DataFrame) – DataFrame containing the events of the prefix.

  • unique_activities (np.array) – NumPy Array of unique activities labels.

  • numeric_columns (np.array, optional) – NumPy Array of names of the numerical columns to be represented. If any columns with time data are included, make sure they have been converted to numeric values.

  • numeric_aggr_func (Literal['max', 'min', 'avg', 'sum']) –

    Statistical function to be used to obtain the representative value of the numerical variables.

    • 'max': Uses the maximum value of the numerical attribute in the prefix.

    • 'min': Uses the minimum value of the numerical attribute in the prefix.

    • 'avg': Uses the mean value of the numerical attribute in the prefix.

    • 'sum': Uses the sum of the values of the numerical attribute in the prefix.

    Default is 'avg'.

  • activity_column (str, optional) – Name of the activity column. Only numeric labels are allowed. Default is XesFields.ACTIVITY_COLUMN.

  • relative_freq (bool, optional) – Whether to represent activities by their absolute frequency (False) or by their frequency relative to the prefix length (True). Default is False.

Returns:

NumPy Array containing the aggregation representation of the input prefix.

Raises:

ValueError – If an invalid value of numeric_aggr_func is provided.

References

[1] Teinemaa, I., Dumas, M., Rosa, M. L., & Maggi, F. M. (2019). Outcome-oriented predictive process monitoring: Review and benchmark. ACM Transactions on Knowledge Discovery from Data (TKDD), 13(2), 1-57.
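
The encoding described above can be sketched with NumPy and pandas as follows. The sample prefix, the 'amount' column, and the order in which activity frequencies and numeric aggregates are concatenated are assumptions made for illustration; the library's actual output layout may differ.

```python
import numpy as np
import pandas as pd

# Hypothetical prefix with numeric activity labels and one numeric column.
prefix = pd.DataFrame(
    {"concept:name": [0, 2, 0], "amount": [10.0, 20.0, 30.0]}
)
unique_activities = np.array([0, 1, 2])

# Count how often each known activity occurs in the prefix.
activities = prefix["concept:name"].to_numpy()
absolute_freq = (activities[:, None] == unique_activities).sum(axis=0).astype(float)

# Relative frequency divides the counts by the prefix length.
relative_freq = absolute_freq / len(prefix)

# Aggregate the numeric column with the mean, which corresponds to 'avg'.
numeric_part = prefix[["amount"]].mean().to_numpy()

# Assumed layout: activity frequencies first, numeric aggregates last.
encoding = np.concatenate([absolute_freq, numeric_part])
```
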

verona.data.utils.get_labels_from_onehot(onehots: array) array[source]

Gets the labels represented in the one-hot vectors passed as input.

Parameters:

onehots (np.array) – NumPy Array containing the one-hot vectors.

Returns:

NumPy Array containing the labels extracted from the one-hot vectors.

Return type:

np.array
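
A minimal sketch of the idea, assuming each one-hot vector occupies the last axis of the array: the label is the position of the single 1, which np.argmax recovers. This illustrates the technique and is not necessarily how the function is implemented.

```python
import numpy as np

# Three one-hot vectors over three possible labels.
onehots = np.array([[0, 1, 0], [1, 0, 0], [0, 0, 1]])

# The label of each vector is the index of its maximum entry.
labels = np.argmax(onehots, axis=-1)
```
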

verona.data.utils.get_onehot_representation(attribute: array, num_elements: int) array[source]

Gets attribute values as labels and converts them to their one-hot representation.

Parameters:
  • attribute (np.array) – NumPy Array containing the values of the categorical attribute. Only numeric labels are allowed.

  • num_elements (int) – Integer indicating the number of unique values of the attribute, which is the size of the one-hot vector. If not specified, the vector size is calculated from the number of unique elements in ‘attribute’.

Returns:

NumPy Array containing the one-hot vectors.

Return type:

np.array
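
One common way to build such vectors, shown here as a sketch rather than as the library's implementation, is to index the rows of an identity matrix with the numeric labels. The integer dtype is an assumption.

```python
import numpy as np

# Numeric labels of a categorical attribute with three possible values.
attribute = np.array([1, 0, 2])
num_elements = 3

# Row i of the identity matrix is the one-hot vector for label i.
onehots = np.eye(num_elements, dtype=int)[attribute]
```
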

verona.data.utils.read_eventlog(dataset: str | DataFrame, sort_events_in_trace: bool = False, sort_traces: bool = False, timestamp_column: str = 'time:timestamp', case_column: str = 'case:concept:name') DataFrame[source]

Reads the event log and returns it as a Pandas DataFrame. Optionally, temporally sorts the events within a case and the cases within the eventlog by their start timestamp.

Parameters:
  • dataset (str | pd.DataFrame) – If a string, the full path to the dataset to be read. Only csv, xes, and xes.gz datasets are allowed. If a Pandas DataFrame, the DataFrame containing the dataset.

  • sort_events_in_trace (bool, optional) – If True, sort the events within each case by their timestamp. Default is False.

  • sort_traces (bool, optional) – If True, sort cases by their start timestamp (the timestamp of their first event). Default is False.

  • timestamp_column (str, optional) – Name of the timestamp column in the eventlog. Default is XesFields.TIMESTAMP_COLUMN.

  • case_column (str, optional) – Name of the case identifier in the eventlog. Default is XesFields.CASE_COLUMN.

Returns:

A Pandas DataFrame containing the eventlog.

Return type:

pd.DataFrame

Raises:
  • ValueError – If an invalid extension is provided when calling the function with dataset as a string.

  • TypeError – If dataset is neither a string nor a Pandas DataFrame.
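
The input contract described above (accepted types, accepted extensions, and the two exceptions) can be sketched as follows. The helper classify_eventlog_input is hypothetical and exists only for illustration; it does not read any file and is not part of verona.

```python
import pandas as pd

# Extensions taken from the description of the 'dataset' parameter.
ALLOWED_EXTENSIONS = (".csv", ".xes", ".xes.gz")


def classify_eventlog_input(dataset):
    """Hypothetical helper: report how an input would be handled."""
    if isinstance(dataset, pd.DataFrame):
        return "dataframe"
    if isinstance(dataset, str):
        for extension in ALLOWED_EXTENSIONS:
            if dataset.endswith(extension):
                return extension.lstrip(".")
        # A string path with an extension outside the allowed set.
        raise ValueError(f"Unsupported extension: {dataset}")
    # Anything that is neither a string nor a DataFrame.
    raise TypeError("dataset must be a str or a pandas DataFrame")
```
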

verona.data.utils.sort_dataset(dataset: DataFrame, timestamp_column: str = 'time:timestamp', case_column: str = 'case:concept:name') DataFrame[source]

Sort the cases of the dataset by their first timestamp.

Parameters:
  • dataset (pd.DataFrame) – DataFrame containing all the events.

  • timestamp_column (str, optional) – Name of the timestamp column in the DataFrame. Default is XesFields.TIMESTAMP_COLUMN.

  • case_column (str, optional) – Name of the case identifier column in the DataFrame. Default is XesFields.CASE_COLUMN.

Returns:

A Pandas DataFrame with the cases sorted by their first timestamp.

Return type:

pd.DataFrame
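
A sketch of this ordering in plain pandas, under the assumption that "first timestamp" means the timestamp of the first event listed for each case. The sample log is illustrative and the code is not the library's implementation.

```python
import pandas as pd

# Illustrative log: case c2 appears first but starts later than case c1.
log = pd.DataFrame(
    {
        "case:concept:name": ["c2", "c2", "c1", "c1"],
        "time:timestamp": pd.to_datetime(
            ["2024-01-03", "2024-01-04", "2024-01-01", "2024-01-02"]
        ),
        "concept:name": ["a", "b", "a", "b"],
    }
)

# Timestamp of the first listed event of each case, repeated on every event.
case_start = log.groupby("case:concept:name")["time:timestamp"].transform("first")

# A stable sort keeps the original relative order of events that share a case start.
order = case_start.sort_values(kind="mergesort").index
sorted_log = log.loc[order].reset_index(drop=True)
```
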

verona.data.utils.sort_events(dataset: DataFrame, timestamp_column: str = 'time:timestamp', case_column: str = 'case:concept:name') DataFrame[source]

Sort events within each case by timestamp.

Parameters:
  • dataset (pd.DataFrame) – DataFrame containing all the events.

  • timestamp_column (str, optional) – Name of the timestamp column in the DataFrame. Default is XesFields.TIMESTAMP_COLUMN.

  • case_column (str, optional) – Name of the case identifier column in the DataFrame. Default is XesFields.CASE_COLUMN.

Returns:

A Pandas DataFrame with the events of each case sorted by timestamp.

Return type:

pd.DataFrame
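
A minimal sketch of sorting events within each case, using plain pandas. Sorting on the case identifier first also groups the cases in identifier order, which is an assumption of this sketch; the library may keep the cases in a different order.

```python
import pandas as pd

# Illustrative log with interleaved cases and out-of-order events.
log = pd.DataFrame(
    {
        "case:concept:name": ["c1", "c2", "c1", "c2"],
        "time:timestamp": pd.to_datetime(
            ["2024-01-02", "2024-01-04", "2024-01-01", "2024-01-03"]
        ),
        "concept:name": ["b", "d", "a", "c"],
    }
)

# Sort by case first, then by timestamp inside each case.
sorted_events = log.sort_values(
    ["case:concept:name", "time:timestamp"]
).reset_index(drop=True)
```
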

verona.data.utils.unify_activity_and_lifecycle(dataset: DataFrame, activity_id: str = 'concept:name', lifecycle_id: str = 'lifecycle:transition', drop_lifecycle_column: bool = True) DataFrame[source]

Builds the actual activity labels by combining the values of the activity and lifecycle columns, as done in [1].

Parameters:
  • dataset (pd.DataFrame) – DataFrame containing the dataset.

  • activity_id (str, optional) – Name of the activity column in the DataFrame. Default is XesFields.ACTIVITY_COLUMN.

  • lifecycle_id (str, optional) – Name of the lifecycle column in the DataFrame. Default is XesFields.LIFECYCLE_COLUMN.

  • drop_lifecycle_column (bool, optional) – Delete the lifecycle column after the conversion. Default is True.

Returns:

The updated dataset as a Pandas DataFrame.

Return type:

pd.DataFrame

References

[1] Rama-Maneiro, E., Vidal, J. C., & Lama, M. (2023). Deep Learning for Predictive Business Process Monitoring: Review and Benchmark. IEEE Transactions on Services Computing, 16(1), 739-756. doi:10.1109/TSC.2021.3139807
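
The unification can be sketched in plain pandas as a string concatenation of the two columns. The "+" separator, the sample values, and the exact label format are assumptions made for illustration; the documentation above does not state how the library joins the two values.

```python
import pandas as pd

# Illustrative log with activity and lifecycle columns.
log = pd.DataFrame(
    {
        "concept:name": ["W_Call", "W_Call", "A_Submitted"],
        "lifecycle:transition": ["START", "COMPLETE", "COMPLETE"],
    }
)

SEPARATOR = "+"  # assumption: the separator verona uses is not documented here

# Join activity and lifecycle into a single activity label.
unified = log.copy()
unified["concept:name"] = (
    unified["concept:name"].astype(str)
    + SEPARATOR
    + unified["lifecycle:transition"].astype(str)
)

# Mirror drop_lifecycle_column=True by removing the lifecycle column.
unified = unified.drop(columns="lifecycle:transition")
```
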