Source code for verona.data.utils

from typing import Literal, Tuple, List, Union
import numpy as np
import pandas as pd
import pm4py


class XesFields:
    """
    Names of the attributes commonly found in an event log stored as xes.

    The class is only a namespace for string constants; it is not meant to be instantiated.
    """

    # Identifier of the case (trace) an event belongs to.
    CASE_COLUMN = "case:concept:name"
    # Name of the activity executed by the event.
    ACTIVITY_COLUMN = "concept:name"
    # Moment at which the event was recorded.
    TIMESTAMP_COLUMN = "time:timestamp"
    # Lifecycle transition associated with the event.
    LIFECYCLE_COLUMN = "lifecycle:transition"
    # Resource associated with the event.
    RESOURCE_COLUMN = "org:resource"
class DataFrameFields:
    """
    Column names commonly found in an event log stored as csv.

    The class is only a namespace for string constants; it is not meant to be instantiated.
    """

    # Identifier of the case (trace) an event belongs to.
    CASE_COLUMN = "CaseID"
    # Name of the activity executed by the event.
    ACTIVITY_COLUMN = "Activity"
    # Moment at which the event was recorded.
    TIMESTAMP_COLUMN = "Timestamp"
    # Resource associated with the event.
    RESOURCE_COLUMN = "Resource"
def read_eventlog(dataset: Union[str, pd.DataFrame],
                  sort_events_in_trace: bool = False,
                  sort_traces: bool = False,
                  timestamp_column: str = XesFields.TIMESTAMP_COLUMN,
                  case_column: str = XesFields.CASE_COLUMN) -> pd.DataFrame:
    """
    Reads the event log and returns it as a Pandas DataFrame. Optionally, temporally sorts
    the events within a case and the cases within the eventlog by their start timestamp.

    Args:
        dataset (str | pd.DataFrame): If string, full path to the dataset to be read.
            Only csv, xes, and xes.gz datasets are allowed; a file name without any
            extension is treated as csv. If Pandas DataFrame, the DataFrame containing
            the dataset, which is used as is (it is not copied).
        sort_events_in_trace (bool, optional): If True, sort the events within each case
            by their timestamp (delegates to ``sort_events``).
            Default is ``False``.
        sort_traces (bool, optional): If True, sort cases by their start timestamp
            (the timestamp of their first event; delegates to ``sort_dataset``).
            Default is ``False``.
        timestamp_column (str, optional): Name of the timestamp column in the eventlog.
            Default is ``XesFields.TIMESTAMP_COLUMN``.
        case_column (str, optional): Name of the case identifier in the eventlog.
            Default is ``XesFields.CASE_COLUMN``.

    Returns:
        pd.DataFrame: A Pandas DataFrame containing the eventlog.

    Raises:
        ValueError: If an invalid extension is provided when calling the function
            with **dataset** as a string.
        TypeError: If **dataset** is neither a string nor a Pandas DataFrame
    """
    # isinstance (rather than an exact type comparison) also accepts subclasses
    # of str and of pd.DataFrame.
    if isinstance(dataset, str):
        filename = dataset.split('/')[-1]
        name_parts = filename.split('.')

        if len(name_parts) == 1:
            # No extension at all: the file is assumed to be a csv.
            input_extension = 'csv'
        elif name_parts[-1] == 'gz':
            # Compressed file: keep the inner extension too (e.g. "xes.gz").
            input_extension = '.'.join(name_parts[-2:])
        else:
            input_extension = name_parts[-1]

        if input_extension in ("xes", "xes.gz"):
            df_log = pm4py.read_xes(dataset)
        elif input_extension == "csv":
            df_log = pd.read_csv(dataset)
        else:
            raise ValueError(f'Wrong dataset extension: {input_extension}. '
                             f'Only .csv, .xes and .xes.gz datasets are allowed.')
    elif isinstance(dataset, pd.DataFrame):
        df_log = dataset
    else:
        raise TypeError(f'Wrong type for parameter dataset: {type(dataset)}. '
                        f'Only str and pd.DataFrame types are allowed.')

    if sort_events_in_trace:
        df_log = sort_events(df_log, timestamp_column, case_column)
    if sort_traces:
        df_log = sort_dataset(df_log, timestamp_column, case_column)

    return df_log
def categorize_attribute(attr: pd.Series) -> Tuple[pd.Series, dict, dict]:
    """
    Convert the attribute column type in the Pandas DataFrame dataset to categorical
    (integer indexes).

    Indexes are assigned in order of first appearance of each value in ``attr``.

    Args:
        attr (pd.Series): Pandas Series of the attribute column in the dataset.

    Returns:
        pd.Series: Pandas Series representing the attribute column with the integer
        indexes instead of the original values. It carries the same index as ``attr``,
        so it can be assigned back to the DataFrame the column was taken from.

        dict: A dictionary with the conversions (key: categorical index, value: original value).

        dict: The reverse dictionary (key: original value, value: categorical index).
    """
    uniq_attr = attr.unique()

    attr_dict = dict(enumerate(uniq_attr))
    reverse_dict = {value: idx for idx, value in attr_dict.items()}

    # Reuse the input index: a fresh 0..n-1 index would misalign the encoded
    # values when they are assigned back to a frame that is not indexed 0..n-1.
    attr_cat = pd.Series([reverse_dict[value] for value in attr.values], index=attr.index)

    return attr_cat, attr_dict, reverse_dict
def unify_activity_and_lifecycle(dataset: pd.DataFrame,
                                 activity_id: str = XesFields.ACTIVITY_COLUMN,
                                 lifecycle_id: str = XesFields.LIFECYCLE_COLUMN,
                                 drop_lifecycle_column: bool = True) -> pd.DataFrame:
    """
    Gets real activities by unifying the values in the activity and lifecycle columns,
    like it's done in [1]. Each activity becomes ``"<activity>+<lifecycle>"``.

    Note:
        The activity column of the DataFrame passed as input is overwritten. Dropping
        the lifecycle column produces a new DataFrame, so always use the returned value.

    Args:
        dataset (pd.DataFrame): DataFrame containing the dataset.
        activity_id (str, optional): Name of the activity column in the DataFrame.
            Default is ``XesFields.ACTIVITY_COLUMN``.
        lifecycle_id (str, optional): Name of the lifecycle column in the DataFrame.
            Default is ``XesFields.LIFECYCLE_COLUMN``.
        drop_lifecycle_column (bool, optional): Delete the lifecycle column after the conversion.
            Default is ``True``.

    Returns:
        pd.DataFrame: The dataset, as Pandas DataFrame, updated.

    Raises:
        ValueError: If ``lifecycle_id`` is not a column of ``dataset``.

    References:
        [1] Rama-Maneiro, E., Vidal, J. C., & Lama, M. (2023). Deep Learning for Predictive
        Business Process Monitoring: Review and Benchmark. IEEE Transactions on Services
        Computing, 16(1), 739-756. doi:10.1109/TSC.2021.3139807
    """
    if lifecycle_id not in dataset:
        raise ValueError(f'Wrong lifecycle identifier: {lifecycle_id} is not a column in the dataframe.')

    # Plain column assignment replaces the column, so an activity column that
    # does not hold strings is not required to keep its previous dtype.
    dataset[activity_id] = dataset[activity_id].astype(str) + '+' + dataset[lifecycle_id].astype(str)

    if drop_lifecycle_column:
        # DataFrame.drop returns a new frame; its result has to be kept,
        # otherwise the lifecycle column is never removed.
        dataset = dataset.drop(columns=lifecycle_id)

    return dataset
def sort_events(dataset: pd.DataFrame,
                timestamp_column: str = XesFields.TIMESTAMP_COLUMN,
                case_column: str = XesFields.CASE_COLUMN) -> pd.DataFrame:
    """
    Sort events within each case by timestamp.

    The result is ordered by case identifier first and, inside every case, by timestamp.
    Its index is reset to ``0..n-1``.

    Note:
        The timestamp column of the DataFrame passed as input is converted to datetime
        in place (``pd.to_datetime``).

    Args:
        dataset (pd.DataFrame): DataFrame containing all the events.
        timestamp_column (str, optional): Name of the timestamp column in the DataFrame.
            Default is ``XesFields.TIMESTAMP_COLUMN``.
        case_column (str, optional): Name of the case identifier column in the DataFrame.
            Default is ``XesFields.CASE_COLUMN``.

    Returns:
        pd.DataFrame: The events of each case, as Pandas DataFrame, sorted by timestamp.
    """
    dataset[timestamp_column] = pd.to_datetime(dataset[timestamp_column])

    # Rows without a case identifier are left out, matching what grouping by
    # the case column yields (missing group keys are dropped by default).
    events_with_case = dataset[dataset[case_column].notna()]

    # A single two-key sort gives the "cases in key order, events by timestamp"
    # layout without running a Python-level sort per case, and without relying
    # on groupby.apply handing the case column to the applied function.
    sorted_events = (events_with_case
                     .sort_values(by=[case_column, timestamp_column])
                     .reset_index(drop=True))

    return sorted_events
def sort_dataset(dataset: pd.DataFrame,
                 timestamp_column: str = XesFields.TIMESTAMP_COLUMN,
                 case_column: str = XesFields.CASE_COLUMN) -> pd.DataFrame:
    """
    Sort the cases of the dataset by their first timestamp.

    Rows are ordered by the earliest timestamp of their case, then by case identifier
    (to keep apart cases starting at the same time) and finally by their own timestamp.
    The index of the result is not reset: every row keeps its original index label.

    Note:
        The timestamp column of the DataFrame passed as input is converted to datetime
        in place (``pd.to_datetime``). No other column of the input is added or changed.

    Args:
        dataset (pd.DataFrame): DataFrame containing all the events.
        timestamp_column (str, optional): Name of the timestamp column in the DataFrame.
            Default is ``XesFields.TIMESTAMP_COLUMN``.
        case_column (str, optional): Name of the case identifier column in the DataFrame.
            Default is ``XesFields.CASE_COLUMN``.

    Returns:
        pd.DataFrame: The cases, as Pandas DataFrame, sorted by their first timestamp.
    """
    dataset[timestamp_column] = pd.to_datetime(dataset[timestamp_column])

    # Name of the temporary sort key; prefixed until it cannot clash with a
    # column that already exists in the event log.
    helper_column = 'min_timestamp'
    while helper_column in dataset.columns:
        helper_column = f'_{helper_column}'

    case_start = dataset.groupby(case_column)[timestamp_column].transform('min')

    # assign() works on a copy, so the temporary key never shows up in the
    # DataFrame owned by the caller.
    sorted_dataset = (dataset
                      .assign(**{helper_column: case_start})
                      .sort_values(by=[helper_column, case_column, timestamp_column])
                      .drop(columns=helper_column))

    return sorted_dataset
def get_onehot_representation(attribute: np.ndarray,
                              num_elements: Union[int, None] = None) -> np.ndarray:
    """
    Gets attribute values as labels and converts them to their one-hot representation.

    Args:
        attribute (np.ndarray): NumPy Array containing the values of the categorical
            attribute. Only numeric (integer) labels are allowed. Arrays with more than
            one dimension are flattened first.
        num_elements (int, optional): Integer indicating the number of unique values of
            the attribute, which is the size of the one-hot vector. If not specified
            (``None`` or ``0``), the vector size is calculated from 'attribute': the
            number of unique elements, enlarged if needed so that the largest label fits.

    Returns:
        np.ndarray: NumPy Array of shape ``(attribute.size, num_elements)`` containing
        the one-hot vectors.
    """
    attribute = np.asarray(attribute)
    if attribute.ndim > 1:
        attribute = attribute.flatten()

    if not num_elements:
        num_elements = np.unique(attribute).size
        # With non-contiguous labels (e.g. [1, 3]) the unique count is smaller
        # than the largest label, which would index past the end of the vector.
        if attribute.size and np.issubdtype(attribute.dtype, np.integer):
            num_elements = max(num_elements, int(attribute.max()) + 1)

    onehot_attr = np.zeros((attribute.size, num_elements))
    # Row i gets a 1 in the column given by the i-th label.
    onehot_attr[np.arange(attribute.size), attribute] = 1

    return onehot_attr
def get_aggregation_representation(prefix: pd.DataFrame,
                                   unique_activities: np.ndarray,
                                   numeric_columns: np.ndarray = None,
                                   numeric_aggr_func: Literal['max', 'min', 'avg', 'sum'] = 'avg',
                                   activity_column: str = XesFields.ACTIVITY_COLUMN,
                                   relative_freq: bool = False) -> np.ndarray:
    """
    Gets the aggregation sequence encoding described in [1]. Activities are represented by
    their frequency (absolute or relative) of occurrence in the prefix. Numerical variables
    are represented by general statistics such as maximum, minimum, mean or sum.

    Args:
        prefix (pd.DataFrame): DataFrame containing the events of the prefix.
        unique_activities (np.ndarray): NumPy Array of unique activities labels. The
            frequencies in the result follow the order of this array.
        numeric_columns (np.ndarray, optional): NumPy Array (or any sized sequence) of names
            of the numerical columns to be represented. If any columns with time data are
            included, make sure they are correctly converted to numeric value.
            ``None`` or an empty sequence means that no numerical column is represented.
        numeric_aggr_func (Literal['max', 'min', 'avg', 'sum']): Statistical function to be
            used to obtain the representative value of the numerical variables.

            - ``'max'``: Uses the maximum value of the numerical attribute in the prefix.
            - ``'min'``: Uses the minimum value of the numerical attribute in the prefix.
            - ``'avg'``: Uses the mean value of the numerical attribute in the prefix.
            - ``'sum'``: Uses the sum of the values of the numerical attribute in the prefix.

            Default is ``'avg'``.
        activity_column (str, optional): Name of the activity column. Only numeric labels
            are allowed.
            Default is ``XesFields.ACTIVITY_COLUMN``.
        relative_freq (bool, optional): Whether to use absolute frequency (``False``) or
            relative (``True``) to prefix length to represent activities. For an empty
            prefix the frequencies are left at zero.
            Default is ``False``.

    Returns:
        NumPy Array containing the aggregation representation of the input prefix: one
        frequency per activity, followed by one aggregated value per numerical column.

    Raises:
        ValueError: If an invalid value of ``numeric_aggr_func`` is provided (only checked
            when there are numerical columns to aggregate).

    References:
        [1] Teinemaa, I., Dumas, M., Rosa, M. L., & Maggi, F. M. (2019). Outcome-oriented
        predictive process monitoring: Review and benchmark.
        ACM Transactions on Knowledge Discovery from Data (TKDD), 13(2), 1-57.
    """
    activity_counts = prefix[activity_column].value_counts()
    act_freq_array = np.array(
        [activity_counts[activity] if activity in activity_counts else 0
         for activity in unique_activities],
        dtype=float,
    )

    # Guard against dividing by zero when the prefix has no events.
    if relative_freq and len(prefix) > 0:
        act_freq_array = act_freq_array / len(prefix)

    # Explicit None/length test: the truth value of a NumPy array with more
    # than one element is ambiguous and raises.
    if numeric_columns is None or len(numeric_columns) == 0:
        return act_freq_array

    # Maps each accepted option to the pandas Series method implementing it.
    aggregators = {'max': 'max', 'min': 'min', 'avg': 'mean', 'sum': 'sum'}
    if numeric_aggr_func not in aggregators:
        raise ValueError(f'Wrong numeric aggregation function: {numeric_aggr_func}. '
                         f'Only max, min, avg and sum are allowed.')
    series_method = aggregators[numeric_aggr_func]

    numeric_array = np.array(
        [getattr(prefix[numeric_column], series_method)() for numeric_column in numeric_columns],
        dtype=float,
    )

    return np.concatenate([act_freq_array, numeric_array])
def get_labels_from_onehot(onehots: np.array) -> np.array:
    """
    Recovers the labels encoded by the one-hot vectors passed as input.

    Args:
        onehots (np.array): NumPy Array containing the one-hot vectors.

    Returns:
        np.array: NumPy Array containing the labels extracted from the one-hot vectors,
        i.e. the position of the largest value along the last axis.
    """
    labels = onehots.argmax(axis=-1)
    return labels