Utility Functions (Structured Data Processing)
(Last updated: Jan 29, 2025)
import pandas as pd
import numpy as np
import pytz
import matplotlib.pyplot as plt
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.inspection import permutation_importance
from sklearn.metrics import (
confusion_matrix,
precision_recall_fscore_support,
accuracy_score
)
from sklearn.model_selection import cross_validate
import seaborn as sns
from sklearn.decomposition import PCA
from copy import deepcopy
def check_answer_df(df_result, df_answer, n=1):
"""
    Check whether two output dataframes are identical.
Parameters
----------
df_result : pandas.DataFrame
The result from the output of a function.
df_answer: pandas.DataFrame
The expected output of the function.
n : int
        The test case number.
"""
try:
assert df_answer.equals(df_result)
print("Test case %d passed." % n)
    except AssertionError:
print("Test case %d failed." % n)
print("")
print("Your output is:")
print(df_result)
print("")
print("Expected output is:")
print(df_answer)
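A minimal usage sketch of check_answer_df, with two small made-up dataframes (one matching the expected answer, one not):
df_answer_demo = pd.DataFrame({"a": [1, 2], "b": [3, 4]})
df_result_good = df_answer_demo.copy(deep=True)
df_result_bad = pd.DataFrame({"a": [1, 2], "b": [3, 5]})
check_answer_df(df_result_good, df_answer_demo, n=1)  # prints "Test case 1 passed."
check_answer_df(df_result_bad, df_answer_demo, n=2)  # prints "Test case 2 failed." plus both outputs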
def compute_feature_importance(model, df_x, df_y, scoring="f1"):
"""
Compute feature importance of a model.
Parameters
----------
model : a sklearn model object
The classifier model.
df_x : pandas.DataFrame
The dataframe with features.
df_y : pandas.DataFrame
The dataframe with labels.
scoring : str
        The name of a scoring function, as documented at
        https://scikit-learn.org/stable/modules/model_evaluation.html#scoring-parameter
"""
if model is None:
model = RandomForestClassifier()
print("Computer feature importance using", model)
model.fit(df_x, df_y.squeeze())
result = permutation_importance(model, df_x, df_y, n_repeats=10, random_state=0, scoring=scoring)
feat_names = df_x.columns.copy()
feat_ims = np.array(result.importances_mean)
sorted_ims_idx = np.argsort(feat_ims)[::-1]
feat_names = feat_names[sorted_ims_idx]
feat_ims = np.round(feat_ims[sorted_ims_idx], 5)
df = pd.DataFrame()
df["feature_importance"] = feat_ims
df["feature_name"] = feat_names
print("=====================================================================")
print(df)
print("=====================================================================")
def scorer(model, x, y):
"""
A customized scoring function to evaluate a classifier.
Parameters
----------
model : a sklearn model object
The classifier model.
x : pandas.DataFrame
The feature matrix.
y : pandas.Series
The label vector.
Returns
-------
dict of int or float
A dictionary of evaluation metrics.
"""
y_pred = model.predict(x)
c = confusion_matrix(y, y_pred, labels=[0,1])
p = precision_recall_fscore_support(y, y_pred, average="binary", zero_division=0)
a = accuracy_score(y, y_pred)
return {"tn": c[0,0], "fp": c[0,1], "fn": c[1,0], "tp": c[1,1],
"precision": p[0], "recall": p[1], "f1": p[2], "accuracy": a}
def train_and_evaluate(model, df_x, df_y, train_size=336, test_size=168):
"""
Parameters
----------
model : a sklearn model object
The classifier model.
df_x : pandas.DataFrame
The dataframe with features.
df_y : pandas.DataFrame
The dataframe with labels.
train_size : int
Number of samples for training.
test_size : int
Number of samples for testing.
"""
print("Use model", model)
print("Perform cross-validation, please wait...")
# Create time series splits for cross-validation.
splits = []
dataset_size = df_x.shape[0]
for i in range(train_size, dataset_size, test_size):
start = i - train_size
end = i + test_size
        if end >= dataset_size:
            break
train_index = range(start, i)
test_index = range(i, end)
splits.append((list(train_index), list(test_index)))
# Perform cross-validation.
cv_res = cross_validate(model, df_x, df_y.squeeze(), cv=splits, scoring=scorer)
# Print evaluation metrics.
print("================================================")
print("average f1-score:", round(np.mean(cv_res["test_f1"]), 2))
print("average precision:", round(np.mean(cv_res["test_precision"]), 2))
print("average recall:", round(np.mean(cv_res["test_recall"]), 2))
print("average accuracy:", round(np.mean(cv_res["test_accuracy"]), 2))
print("================================================")
def is_datetime_obj_tz_aware(dt):
"""
    Check whether the datetime index is timezone-aware.
Parameters
----------
dt : pandas.DatetimeIndex
        A datetime index object.

    Returns
    -------
    bool
        True if the index is timezone-aware, False otherwise.
    """
return dt.tz is not None
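A quick check with a naive and a timezone-aware index:
idx_naive = pd.DatetimeIndex(["2025-01-01 00:00", "2025-01-01 01:00"])
print(is_datetime_obj_tz_aware(idx_naive))  # False
print(is_datetime_obj_tz_aware(idx_naive.tz_localize("UTC")))  # True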
def convert_wind_direction(df):
"""
Convert wind directions to sine and cosine components.
Parameters
----------
df : pandas.DataFrame
The data frame that contains the wind direction data.
Returns
-------
pandas.DataFrame
The transformed data frame.
"""
# Copy data frame to prevent editing the original one.
df_cp = df.copy(deep=True)
# Convert columns with wind directions.
for c in df.columns:
if "SONICWD_DEG" in c:
df_c = df[c]
df_c_cos = np.cos(np.deg2rad(df_c))
df_c_sin = np.sin(np.deg2rad(df_c))
df_c_cos.name += "_cosine"
df_c_sin.name += "_sine"
df_cp.drop([c], axis=1, inplace=True)
df_cp[df_c_cos.name] = df_c_cos
df_cp[df_c_sin.name] = df_c_sin
return df_cp
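A minimal sketch of convert_wind_direction; the column names below are invented, and only the SONICWD_DEG substring in a name matters:
df_wind_demo = pd.DataFrame({
    "feed_1.SONICWD_DEG": [0.0, 90.0, 180.0],
    "feed_1.SONICWS_MPH": [2.0, 3.0, 4.0],
})
# The SONICWD_DEG column is replaced by its cosine and sine components.
print(convert_wind_direction(df_wind_demo))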
def plot_smell_by_day_and_hour(df):
"""
Plot the average number of smell reports by the day of week and the hour of day.
Parameters
----------
df : pandas.DataFrame
The preprocessed smell data.
"""
# Copy the data frame to prevent editing the original one.
df = df.copy(deep=True)
# Convert timestamps to the local time in Pittsburgh.
if is_datetime_obj_tz_aware(df.index):
df.index = df.index.tz_convert(pytz.timezone("US/Eastern"))
else:
df.index = df.index.tz_localize(pytz.utc, ambiguous="infer").tz_convert(pytz.timezone("US/Eastern"))
# Compute the day of week and the hour of day.
df["day_of_week"] = df.index.dayofweek
df["hour_of_day"] = df.index.hour
# Plot the graph.
y_l = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"]
df_pivot = pd.pivot_table(df, values="smell_value", index=["day_of_week"], columns=["hour_of_day"], aggfunc="mean")
f, ax = plt.subplots(figsize=(14, 6))
sns.heatmap(df_pivot, annot=False, cmap="Blues", fmt="g", linewidths=0.1, yticklabels=y_l, ax=ax)
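A sketch of plotting two weeks of made-up smell reports on an hourly, timezone-aware index:
rng = np.random.default_rng(0)
idx_demo = pd.date_range("2025-01-01", periods=24*14, freq="h", tz="UTC")
df_smell_demo = pd.DataFrame({"smell_value": rng.integers(0, 5, size=len(idx_demo))}, index=idx_demo)
plot_smell_by_day_and_hour(df_smell_demo)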
def insert_previous_data_to_cols(df, n_hr=0):
"""
Insert columns to indicate the data from the previous hours.
Parameters
----------
df : pandas.DataFrame
The preprocessed sensor data.
n_hr : int
Number of hours that we want to insert the previous sensor data.
Returns
-------
pandas.DataFrame
The transformed sensor data.
"""
# Copy data frame to prevent editing the original one.
df = df.copy(deep=True)
# Add the data from the previous hours.
df_all = []
for h in range(1, n_hr + 1):
# Shift the data frame to get previous data.
df_pre = df.shift(h)
# Edit the name to indicate it is previous data.
        # The original data frame already has data from the previous 1 hour.
# (as indicated in the preprocessing phase of sensor data)
# So we need to add 1 here.
df_pre.columns += "_pre_" + str(h+1) + "h"
# Add the data to an array for merging.
df_all.append(df_pre)
# Rename the columns in the original data frame.
    # The original data frame already has data from the previous 1 hour.
# (as indicated in the preprocessing phase of sensor data)
df.columns += "_pre_1h"
# Merge all data.
df_merge = df
for d in df_all:
# The join function merges dataframes by index.
df_merge = df_merge.join(d)
# Delete the first n_hr rows.
# These n_hr rows have no data due to data shifting.
df_merge = df_merge.iloc[n_hr:]
return df_merge
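For example, with a tiny made-up sensor frame and n_hr=2, each column gains _pre_2h and _pre_3h copies while the original columns become _pre_1h:
df_sensor_demo = pd.DataFrame({"SO2": [1.0, 2.0, 3.0, 4.0], "NO2": [5.0, 6.0, 7.0, 8.0]})
# The first two rows are dropped because the shifted data is undefined there.
print(insert_previous_data_to_cols(df_sensor_demo, n_hr=2))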
def get_pca_result(x, y, n=3):
"""
Get the result of Principal Component Analysis.
Parameters
----------
x : pandas.DataFrame
The features.
y : pandas.DataFrame
The labels.
n : int
Number of principal components.
Returns
-------
df_pc : pandas.DataFrame
A data frame with information about principal components.
df_r : pandas.DataFrame
A data frame with information about ratios of explained variances.
"""
# Copy the data to prevent editing it.
x, y = deepcopy(x), deepcopy(y)
# Run the principal component analysis.
pca = PCA(n_components=n)
    # Fit the PCA model and project the data onto the principal components.
pc = pca.fit_transform(x)
columns = ["PC" + str(i) for i in range(1,1+n)]
df_pc = pd.DataFrame(data=pc, columns=columns)
# Set the label for plotting.
df_pc["y"] = y.astype(str)
# Set the marker size for plotting.
df_pc["size"] = 15
    # Get the ratio of variance explained by each principal component.
r = np.round(pca.explained_variance_ratio_, 3)
df_r = pd.DataFrame({"var":r, "pc":columns})
return df_pc, df_r
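A minimal sketch of get_pca_result on random features with binary labels (all names hypothetical):
rng = np.random.default_rng(0)
x_demo = pd.DataFrame(rng.random((100, 5)), columns=["f" + str(i) for i in range(5)])
y_demo = pd.Series(rng.integers(0, 2, size=100))
df_pc_demo, df_r_demo = get_pca_result(x_demo, y_demo, n=3)
print(df_pc_demo.head())
print(df_r_demo)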