"""
Dataframe preprocessing module.
This module provides tools for cleaning and preparing dataframe data,
including missing value imputation, index restoration, and other preprocessing tasks.
"""
import warnings
from datetime import timedelta
from typing import (
TYPE_CHECKING,
Any,
Dict,
List,
Literal,
Optional,
Sequence,
Tuple,
Union,
)
import numpy as np
import pandas as pd
from joblib import Parallel, delayed
from pandas.tseries.offsets import DateOffset
from sklearn.experimental import enable_iterative_imputer # noqa: F401
from sklearn.impute import IterativeImputer, KNNImputer, SimpleImputer
from sklearn.linear_model import BayesianRidge
from sklearn.metrics import silhouette_score
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.utils import resample
from frameon.utils.miscellaneous import (
is_categorical_column,
is_text_column,
)
if TYPE_CHECKING: # pragma: no cover
from frameon.core.base import FrameOn
__all__ = ["FrameOnPreproc"]
class FrameOnPreproc:
"""
Class containing methods for Dataframe preprocessing.
"""
def __init__(self, df: "FrameOn"):
self._df = df
[docs]
def impute_missing(
self,
target_cols: Union[str, Sequence[str]],
auxiliary_cols: Union[Literal["all"], str, Sequence[str]],
method: Literal["simple", "knn", "iterative"] = "simple",
strategy: Literal["mean", "median", "most_frequent", "constant"] = "median",
n_neighbors: int = 5,
sample_size: Optional[int] = None,
random_state: int = 42,
standardize: bool = False,
imputer_params: Optional[Dict[str, Any]] = None,
inplace: bool = False,
) -> Union[None, pd.DataFrame]:
"""
Perform missing value imputation on specified numerical columns.
Parameters:
-----------
target_cols : str or list
Numerical columns to impute (must contain missing values)
auxiliary_cols : str or list, default='all'
Columns to use as features for imputation. Can include:
- Numerical columns (used directly)
- Categorical columns (one-hot encoded)
- Datetime columns (feature engineered)
Does not include text columns.
method : {'simple', 'knn', 'iterative'}, default='simple'
Imputation strategy:
- simple: Fast univariate imputation
- knn: Nearest neighbors-based imputation
- iterative: Multivariate imputation using chained equations
strategy : str, default='median'
Strategy for SimpleImputer: ['mean', 'median', 'most_frequent', 'constant']
n_neighbors : int, default=5
Number of neighbors for KNNImputer
sample_size : int, optional
Subsample size for large datasets optimization
random_state : int, default=42
Random seed for reproducibility
standardize : bool, default=False
Whether to standardize features before imputation.
Recommended for knn and iterative methods.
imputer_params : dict, optional
Additional parameters for IterativeImputer:
- estimator: sklearn estimator (default=BayesianRidge())
- max_iter: int (default=10)
- tol: float (default=1e-3)
inplace : bool, default=False
Whether to modify the original DataFrame
Returns:
--------
pd.DataFrame or None
DataFrame with imputed values or None if inplace=True
"""
# Validate not empty DataFrame
if self._df.empty:
raise ValueError("DataFrame is empty.")
# Validate method
valid_methods = {"simple", "knn", "iterative"}
if method not in valid_methods:
raise ValueError(
f"Invalid method '{method}'. Must be one of: {valid_methods}"
)
# Validate sample size
if sample_size and sample_size > len(self._df):
sample_size = len(self._df)
# Valid strategies
valid_strategies = {"mean", "median", "most_frequent", "constant"}
if strategy not in valid_strategies:
raise ValueError(
f"Invalid strategy '{strategy}'. Must be one of: {valid_strategies}"
)
# Validate and prepare input parameters
target_cols = self._validate_target_cols(target_cols)
auxiliary_cols = self._prepare_auxiliary_cols(auxiliary_cols, target_cols)
# Check at least one non-NA value exists in each target column
for col in target_cols:
if self._df[col].notna().sum() == 0:
raise ValueError(
f"Target column '{col}' contains only NA values - nothing to impute from"
)
# Prepare data matrix for imputation
full_data, target_idx = self._prepare_data(
target_cols, auxiliary_cols, method, standardize
)
# Prepare TRAINING data (with sampling if specified)
if sample_size and len(self._df) > sample_size:
train_data = resample(
full_data, n_samples=sample_size, random_state=random_state
)
else:
train_data = full_data.copy()
# Create and configure imputer
imputer = self._create_imputer(
method, strategy, n_neighbors, random_state, imputer_params or {}
)
# Perform imputation
imputer.fit(train_data)
imputed_values = imputer.transform(full_data)
# Merge results back into original DataFrame
result_df = self._merge_results(
imputed_values[:, target_idx], target_cols, full_data.index
)
return self._return_result(result_df, inplace)
[docs]
def restore_full_index(
self,
date_cols: Union[str, List[str]],
group_cols: Union[str, List[str]],
freq: Optional[
Union[
str, timedelta, DateOffset, Dict[str, Union[str, timedelta, DateOffset]]
]
] = None,
fill_value: Optional[Union[str, int, float]] = None,
inplace: bool = False,
) -> Union[None, pd.DataFrame]:
"""
Restores a full index for a DataFrame by filling in missing dates and categories.
This function takes a DataFrame, a date column, and a list of grouping columns.
It creates a full MultiIndex by generating all possible combinations of dates
(within the range of the date column) and unique values of the grouping columns.
Missing values are filled with the specified fill_value.
Parameters:
-----------
date_col : str
The name of the column in `df` that contains the dates.
group_cols : list of str
A list of column names in `df` that are used for grouping.
freq : str, optional
The frequency for the date range. Default is 'ME' (month end).
fill_value : str, int, float, optional
The value to fill missing entries with. Default is 0.
inplace : bool, default=False
Whether to modify the original DataFrame
Returns:
--------
pd.DataFrame or None
DataFrame with imputed values or None if inplace=True
"""
df = self._df
# Convert to lists if single values passed
date_cols = [date_cols] if isinstance(date_cols, str) else date_cols
group_cols = [group_cols] if isinstance(group_cols, str) else group_cols
# Check all columns exist in DataFrame
missing_cols = [col for col in date_cols + group_cols if col not in df.columns]
if missing_cols:
raise ValueError(f"Columns not found in DataFrame: {missing_cols}")
# Convert to lists if single values passed
date_cols = [date_cols] if isinstance(date_cols, str) else date_cols
group_cols = [group_cols] if isinstance(group_cols, str) else group_cols
index_cols = date_cols + group_cols
duplicates = df[index_cols].duplicated(keep=False)
if duplicates.any():
error_msg = (
f"Found {duplicates.sum()} duplicate rows in index columns:\n"
f"Index columns: {index_cols}\n\n"
"Solutions:\n"
"1. Use groupby().agg() to aggregate duplicates\n"
"2. Add unique identifier column\n"
"3. Use drop_duplicates() if appropriate"
)
raise ValueError(error_msg)
# Validate all columns exist
for col in date_cols + group_cols:
if col not in df.columns:
raise ValueError(
f'Date column "{col}" not found for time-based operations'
)
# Generate date ranges for each date column
date_ranges = []
for col in date_cols:
date_ranges.append(pd.date_range(df[col].min(), df[col].max(), freq=freq))
# Create full index
if group_cols:
# Case with grouping columns
full_index = pd.MultiIndex.from_product(
date_ranges + [df[col].unique() for col in group_cols],
names=date_cols + group_cols,
)
else:
# Case with only date columns
if len(date_ranges) == 1:
full_index = date_ranges[0]
else:
full_index = pd.MultiIndex.from_product(date_ranges, names=date_cols)
if len(date_cols) == 1:
full_index.name = date_cols[0]
# Reindex to the full index
result_df = (
df.set_index(date_cols + group_cols)
.reindex(full_index, fill_value=fill_value)
.reset_index()
)
return self._return_result(result_df, inplace)
def _validate_target_cols(
self, target_cols: Union[str, Sequence[str]]
) -> List[str]:
"""Validate and normalize target columns."""
# Convert to list if single column
if isinstance(target_cols, str):
target_cols = [target_cols]
# Check all target columns exist and are numerical
numeric_cols = self._df.select_dtypes(include=np.number).columns
missing = [col for col in target_cols if col not in self._df.columns]
if missing:
raise ValueError(f"Target columns not found in DataFrame: {missing}")
non_numeric = [col for col in target_cols if col not in numeric_cols]
if non_numeric:
raise ValueError(f"Target columns must be numerical: {non_numeric}")
# Check for missing values
if self._df[target_cols].notna().all().all():
warnings.warn("No missing values found in target columns!")
return target_cols
def _prepare_auxiliary_cols(
self, auxiliary_cols: Union[str, Sequence[str]], target_cols: Sequence[str]
) -> List[str]:
"""Prepare and validate auxiliary columns."""
# Handle 'all' case
if auxiliary_cols == "all":
auxiliary_cols = [
col
for col in self._df.columns
if col not in target_cols and not is_text_column(self._df[col])
]
# Convert to list if single column
elif isinstance(auxiliary_cols, str):
auxiliary_cols = [auxiliary_cols]
# Remove any target columns that were accidentally included
auxiliary_cols = [col for col in auxiliary_cols if col not in target_cols]
# Validate columns exist
missing = [col for col in auxiliary_cols if col not in self._df.columns]
if missing:
raise ValueError(f"Auxiliary columns not found in DataFrame: {missing}")
return auxiliary_cols
def _prepare_data(
self,
target_cols: Sequence[str],
auxiliary_cols: Sequence[str],
method: str,
standardize: bool,
) -> Tuple[pd.DataFrame, Union[slice, List[int]]]:
"""
Prepare data matrix for imputation.
Returns:
--------
tuple: (processed_data, target_indices)
"""
# Separate numerical and categorical auxiliary columns
num_cols = [
col
for col in auxiliary_cols
if pd.api.types.is_numeric_dtype(self._df[col])
]
cat_cols = [
col for col in auxiliary_cols if is_categorical_column(self._df[col])
]
# For simple method, only use target columns
if method == "simple":
processed = self._df[target_cols].copy()
target_idx = slice(None) # All columns are targets
else:
# Process numerical features
num_data = (
self._df[num_cols].values if num_cols else np.empty((len(self._df), 0))
)
# Process categorical features with one-hot encoding
if cat_cols:
encoder = OneHotEncoder(handle_unknown="ignore", sparse_output=False)
cat_data = encoder.fit_transform(self._df[cat_cols])
else:
cat_data = np.empty((len(self._df), 0))
# Combine all features
processed = np.hstack(
[
self._df[target_cols].values, # Target columns first
num_data,
cat_data,
]
)
# Target columns are always first in the array
target_idx = slice(0, len(target_cols))
# Standardize if requested
if standardize:
processed = StandardScaler().fit_transform(processed)
# Convert to DataFrame for sampling
processed = pd.DataFrame(processed, index=self._df.index)
return processed, target_idx
def _create_imputer(
self,
method: str,
strategy: str,
n_neighbors: int,
random_state: int,
imputer_params: Dict[str, Any],
):
"""Create and configure the appropriate imputer."""
if method == "simple":
return SimpleImputer(strategy=strategy, **imputer_params)
elif method == "knn":
return KNNImputer(n_neighbors=n_neighbors)
elif method == "iterative":
estimator = imputer_params.get("estimator", BayesianRidge())
return IterativeImputer(
estimator=estimator,
max_iter=imputer_params.get("max_iter", 10),
random_state=random_state,
**{
k: v
for k, v in imputer_params.items()
if k not in ["estimator", "max_iter"]
},
)
raise ValueError(
f"Invalid method: {method}. Choose from ['simple', 'knn', 'iterative']"
)
def _merge_results(
self, imputed_values: np.ndarray, target_cols: Sequence[str], index: pd.Index
) -> pd.DataFrame:
"""
Efficiently merge imputed values back into original DataFrame.
Only updates missing values in target columns while preserving:
- Original data types
- Non-target columns
- Index and column order
"""
# Create copy of original data (shallow copy for efficiency)
result_df = self._df.copy()
# Convert imputed values to DataFrame for alignment
imputed_df = pd.DataFrame(imputed_values, columns=target_cols, index=index)
# Vectorized update of only missing values
for col in target_cols:
mask = self._df[col].isna()
result_df.loc[mask, col] = imputed_df.loc[mask, col]
return result_df
def _return_result(
self, result_df: pd.DataFrame, inplace: bool
) -> Union[None, pd.DataFrame]:
"""Handle in-place modification and return results."""
if inplace:
self._df = result_df
return None
return result_df
def find_optimal_k_for_knn_imputer(
self,
target_cols: Union[str, List[str]],
auxiliary_cols: Union[str, List[str]] = "all",
max_k: int = 15,
n_jobs: int = -1,
metric: str = "nan_euclidean",
) -> Dict[str, Union[int, float]]:
"""
Find optimal number of neighbors for KNNImputer using advanced metrics.
Parameters:
-----------
target_cols : str or list
Numerical columns to impute (must contain missing values)
auxiliary_cols : str or list, default='all'
Columns to use as features for imputation. Can include:
- Numerical columns (used directly)
- Categorical columns (one-hot encoded)
- Datetime columns (feature engineered)
Does not include text columns.
max_k : int, default=15
Maximum number of neighbors to test
n_jobs : int, default=-1
Number of parallel jobs to run
metric : str, default='nan_euclidean'
Distance metric to use ('nan_euclidean', 'nan_manhattan')
Returns:
--------
dict
Contains:
- 'optimal_k': Best K value
- 'best_score': Corresponding score
- 'all_scores': Scores for all tested K values
- 'elbow_point': Alternative K suggestion from elbow method
"""
# Validate and prepare input parameters
target_cols = self._validate_target_cols(target_cols)
auxiliary_cols = self._prepare_auxiliary_cols(auxiliary_cols, target_cols)
# Prepare standardized data
processed, _ = self._prepare_data(
target_cols=target_cols,
auxiliary_cols=auxiliary_cols,
method="knn",
standardize=True,
)
# Initialize results storage
results = {"scores": [], "optimal_k": 2, "best_score": -1, "elbow_point": None}
# Parallel K evaluation
with Parallel(n_jobs=n_jobs) as parallel:
scores = parallel(
delayed(self._evaluate_k)(k, processed, metric)
for k in range(2, max_k + 1)
)
results["scores"] = scores
# Find best K by silhouette
results["best_score"] = max(results["scores"])
results["optimal_k"] = results["scores"].index(results["best_score"]) + 2
# Calculate elbow point
results["elbow_point"] = self._find_elbow_point(results["scores"])
return {
"optimal_k": results["optimal_k"],
"best_score": results["best_score"],
"all_scores": dict(zip(range(2, max_k + 1), results["scores"])),
"elbow_point": results["elbow_point"],
}
def _evaluate_k(
self, k: int, data: Union[pd.DataFrame, np.ndarray], metric: str
) -> Union[int, float]:
"""Evaluate single K value with multiple metrics"""
try:
# Impute with current K
imputer = KNNImputer(n_neighbors=k, metric=metric)
imputed = imputer.fit_transform(data)
# Calculate multiple quality metrics
sil_score = silhouette_score(imputed, metric=metric)
variance = np.nanvar(imputed, axis=0).mean()
# Combined score (adjust weights as needed)
return 0.7 * sil_score + 0.3 * variance
except Exception:
return -1
def _find_elbow_point(self, scores: List[float]) -> int:
"""
Find elbow point in the scores curve using the maximum distance method.
Parameters:
-----------
scores : List[float]
List of evaluation scores for different K values
Returns:
--------
int
The K value at which the elbow occurs (1-based index)
"""
if any(np.isnan(s) for s in scores):
warnings.warn("NaN values found in scores, using default K=2")
return 2
if not scores or len(scores) < 2:
return 2 # Default minimal K value
try:
# Convert to numpy arrays
x = np.arange(len(scores))
y = np.array(scores)
# Get coordinates of the line from first to last point
first_point = np.array([x[0], y[0]])
last_point = np.array([x[-1], y[-1]])
line_vec = last_point - first_point
# Normalize the line vector
line_vec_norm = line_vec / np.linalg.norm(line_vec)
# Vector from first point to each point on the curve
vec_from_first = np.column_stack((x - first_point[0], y - first_point[1]))
# Scalar projection of each point onto the line
scalar_prod = np.sum(vec_from_first * line_vec_norm, axis=1)
# Vector from each point to its projection on the line
vec_to_line = vec_from_first - np.outer(scalar_prod, line_vec_norm)
# Calculate distances and find maximum
distances = np.sqrt(np.sum(vec_to_line**2, axis=1))
elbow_index = np.argmax(distances)
# Return K value (add 2 because K starts at 2)
return elbow_index + 2
except Exception as e:
warnings.warn(f"Elbow point detection failed: {str(e)}. Using default K=2")
return 2