"""
The statistics module has several functionalities that calculate kinematic
statistics of the trajectory, split trajectories, pivot dataframes etc.
The main purpose of this module is to get the dataframe ready for Machine
Learning tasks such as clustering, classification etc.
| Author: Yaksh J Haranwala
"""
import itertools
from typing import Optional
import pandas as pd
from ptrail.core.TrajectoryDF import PTRAILDataFrame
from ptrail.features.kinematic_features import KinematicFeatures
from ptrail.preprocessing.helpers import Helpers as helpers
import ptrail.utilities.constants as const
from math import ceil
import multiprocessing
import os
# os.cpu_count() returns None when the number of CPUs cannot be determined;
# fall back to 1 so that the arithmetic below never fails at import time.
num = os.cpu_count() or 1
# Number of worker processes used by the multiprocessing pools in this module:
# two thirds of the detected CPUs, rounded up (so it is always at least 1).
NUM_CPU = ceil((num * 2) / 3)
class Statistics:
    """
    Static helpers that segment trajectories, compute per-trajectory
    statistics of kinematic features and pivot the resulting stats
    dataframe into one row per trajectory (or per trajectory segment).
    """

    @staticmethod
    def _starmap_in_pool(func, arg_tuples):
        """
        Run ``func`` over ``arg_tuples`` with ``Pool.starmap`` on a pool of
        NUM_CPU worker processes and return the list of results.

        The pool is always closed and joined, even when a worker raises,
        so that worker processes are not leaked.
        """
        # NUM_CPU is 2/3rds of the CPUs in the system. Some CPUs are kept free
        # at all times in order to not block up the system.
        # (Note: The blocking of system is mostly prevalent in Windows and does
        # not happen very often in Linux. However, out of caution some CPUs are
        # kept free regardless of the system.)
        pool = multiprocessing.Pool(NUM_CPU)
        try:
            return pool.starmap(func, arg_tuples)
        finally:
            pool.close()
            pool.join()

    @staticmethod
    def _pivot_chunk(chunk, target_col_name, index_cols):
        """
        Pivot the stats rows of a single trajectory (or trajectory segment)
        into one wide row and re-attach the target column.

        ``index_cols`` is passed through unchanged as the ``index`` argument
        of ``pivot_table``; the 'Columns' column supplies the new column level.
        """
        flat = chunk.reset_index()

        # Get the target value out (first row of the chunk) and drop the
        # target column so that it does not take part in the pivot.
        target = flat[target_col_name].iloc[0]
        flat = flat.drop(columns=[target_col_name])

        # Pivot the table now and flatten the two-level column names
        # into single strings joined by '_'.
        pivoted = flat.pivot_table(index=index_cols, columns='Columns')
        pivoted.columns = pivoted.columns.map('_'.join).str.strip('|')

        # Assign the target column again.
        pivoted[target_col_name] = target
        return pivoted

    @staticmethod
    def segment_traj_by_days(dataframe: PTRAILDataFrame, num_days):
        """
        Given a dataframe containing trajectory data, segment all
        the trajectories into pieces of ``num_days`` days each.

        Parameters
        ----------
            dataframe: PTRAILDataFrame
                The dataframe containing trajectory data.
            num_days: int
                The number of days that each segment is supposed to have.

        Returns
        -------
            pandas.core.dataframe.DataFrame:
                The dataframe containing segmented trajectories, indexed by
                traj_id, seg_id and DateTime (seg_id being the new segment
                identifier).
        """
        # Split the dataframe into chunks (according to trajectory ids, per
        # the helper's intent) so that each chunk can be processed separately.
        df_chunks = helpers._df_split_helper(dataframe=dataframe.reset_index())

        results = Statistics._starmap_in_pool(
            helpers.split_traj_helper,
            zip(df_chunks, itertools.repeat(num_days)),
        )

        to_return = pd.concat(results).reset_index().set_index(['traj_id', 'seg_id', 'DateTime'])
        # The reset_index() above leaves the previous index behind as an
        # 'index' column, which is not needed in the result.
        return to_return.drop(columns=['index'])

    @staticmethod
    def generate_kinematic_stats(dataframe: PTRAILDataFrame, target_col_name: str, segmented: Optional[bool] = False):
        """
        Generate the statistics of kinematic features for each unique
        trajectory (or each trajectory segment) in the dataframe.

        Parameters
        ----------
            dataframe: PTRAILDataFrame
                The dataframe containing the trajectory data.
            target_col_name: str
                This is the 'y' value that is used for ML tasks, this is
                asked to append the target_col back at the end.
            segmented: Optional[bool]
                Indicate whether the trajectory has segments or not.

        Returns
        -------
            pandas.core.dataframe.DataFrame:
                The concatenation of the per-chunk results produced by the
                stats helper, one chunk per trajectory (or per trajectory
                segment when ``segmented`` is truthy).
        """
        # Generate kinematic features on the entire dataframe.
        ptdf = KinematicFeatures.generate_kinematic_features(dataframe)

        # Then, lets break down the entire dataframe into pieces containing
        # data of 1 trajectory (or 1 trajectory segment) in each piece.
        ids_ = list(dataframe.reset_index().traj_id.value_counts().keys())

        # Flatten the index once instead of on every loop iteration.
        flat = ptdf.reset_index()

        df_chunks = []
        if segmented:
            segments = flat['seg_id'].unique()
            for traj_id in ids_:
                traj_mask = flat['traj_id'] == traj_id
                for seg in segments:
                    chunk = flat.loc[traj_mask & (flat['seg_id'] == seg)]
                    # Not every trajectory has every segment id; skip the
                    # combinations that have no rows.
                    if len(chunk) > 0:
                        df_chunks.append(chunk)
        else:
            for traj_id in ids_:
                df_chunks.append(flat.loc[flat[const.TRAJECTORY_ID] == traj_id])

        results = Statistics._starmap_in_pool(
            helpers.stats_helper,
            zip(df_chunks,
                itertools.repeat(target_col_name),
                itertools.repeat(segmented)),
        )
        return pd.concat(results)

    @staticmethod
    def pivot_stats_df(dataframe, target_col_name: str, segmented: Optional[bool] = False):
        """
        Given a dataframe with stats present in it, melt the dataframe to make it
        ready for ML tasks. This is specifically for melting the type of dataframe
        generated by the generate_kinematic_stats() function.

        The dataframe is expected to have a 'traj_id' index level (and a 'seg_id'
        index level when ``segmented`` is truthy) and to expose a 'Columns' field
        after its index is reset.

        Parameters
        ----------
            dataframe: pd.core.dataframe.DataFrame
                The dataframe containing stats.
            target_col_name: str
                This is the 'y' value that is used for ML tasks, this is
                asked to append the target_col back at the end.
            segmented: Optional[bool]
                Indicate whether the trajectory has segments or not.

        Returns
        -------
            pd.core.dataframe.DataFrame:
                The dataframe above which is pivoted and has rows converted to columns.
        """
        # Get all the unique trajectory IDs.
        traj_level = dataframe.index.get_level_values('traj_id')
        ids_ = list(traj_level.unique())

        final_chunks = []
        if not segmented:
            for val in ids_:
                # Separate the data for each trajectory id and pivot it.
                small = dataframe.loc[traj_level == val]
                final_chunks.append(
                    Statistics._pivot_chunk(small, target_col_name, 'traj_id')
                )
        else:
            # The segment ids do not depend on the trajectory, compute them once.
            seg_level = dataframe.index.get_level_values('seg_id')
            segments = seg_level.unique()
            for val in ids_:
                traj_mask = traj_level == val
                for seg in segments:
                    small = dataframe.loc[traj_mask & (seg_level == seg)]
                    # Skip (trajectory, segment) combinations with no rows.
                    if len(small) == 0:
                        continue
                    final_chunks.append(
                        Statistics._pivot_chunk(small, target_col_name, ['traj_id', 'seg_id'])
                    )

        # Concatenate the smaller chunks and reorder the columns.
        to_return = pd.concat(final_chunks)

        # Build the column order on a fresh list: appending directly to
        # const.ORDERED_COLS would mutate the shared module-level constant,
        # growing it on every call and breaking later calls that use a
        # different target column.
        cols = [*const.ORDERED_COLS, target_col_name]

        # Reorder the final DF, drop duplicated columns and return it.
        to_return = to_return[cols]
        to_return = to_return.loc[:, ~to_return.columns.duplicated()]
        return to_return