"""
Series preprocessing and transformation module.
This module provides tools for preprocessing series data including:
- Categorization and binning
- Smoothing and imputation
- Numeric transformations
- String normalization
- Missing value handling
"""
import itertools
import re
import warnings
from typing import (
TYPE_CHECKING,
Callable,
Dict,
List,
Literal,
Optional,
Union,
)
import numpy as np
import pandas as pd
import plotly.express as px
from IPython.display import display
from plotly.subplots import make_subplots
from frameon.utils.plotting import CustomFigure
if TYPE_CHECKING: # pragma: no cover
from frameon.core.base import SeriesOn
__all__ = ["SeriesOnPreproc"]
class SeriesOnPreproc:
"""
Class containing methods for Series preprocessing.
"""
def __init__(self, series: "SeriesOn"):
    """Store the series that the preprocessing methods operate on."""
    # Every method of this class reads its input from self._series.
    self._series = series
[docs]
def to_categorical(
    self,
    method: Literal[
        "equal_intervals", "quantiles", "custom_bins", "clustering", "rules"
    ] = "rules",
    labels: Optional[List[str]] = None,
    n_categories: Optional[int] = None,
    bins: Optional[List[Union[float, int]]] = None,
    right: bool = True,
    fill_na_value: Optional[str] = None,
    quantiles: Optional[List[float]] = [0, 0.25, 0.75, 1],  # never mutated below
    rules: Optional[
        Dict[str, Union[Callable[[pd.Series], pd.Series], Literal["default"]]]
    ] = None,
    ordered: bool = False,
    as_category: bool = True,
    show_value_counts: bool = True,
    default_label: Optional[str] = None,
) -> pd.Series:
    """
    Convert the wrapped series to categories using the specified method.

    Parameters:
    -----------
    method : str, optional (default="rules")
        Method for categorization. Options:
        - "equal_intervals": equal width intervals (pd.cut on a linspace grid)
        - "quantiles": equal frequency intervals (pd.qcut on the value ranks)
        - "custom_bins": use custom bin edges
        - "clustering": k-means clustering (requires scikit-learn)
        - "rules": use custom rules defined by functions / boolean Series
    labels : list of str, optional
        Labels for categories. For "custom_bins" the length must be
        len(bins) - 1. For "clustering" it must be n_categories, plus one
        extra trailing label for missing values when the series contains NA.
    n_categories : int, optional
        Number of categories (equal_intervals / quantiles / clustering).
        Defaults to the number of labels when labels are given, otherwise 5.
    bins : list of float/int, optional
        Bin edges for the "custom_bins" method.
    right : bool, optional (default=True)
        Whether bins include the right edge (pd.cut based methods).
    fill_na_value : str, optional
        Label assigned to missing results. When None, the label marked
        "default" in rules (if any) is used.
    quantiles : list of float, optional
        Quantile edges for the "quantiles" method; must start at 0 and end
        at 1. Pass None to use n_categories equal-frequency bins instead.
    rules : dict, optional
        For the "rules" method. Keys are category labels; values are either
        a callable taking the series and returning a boolean Series, a
        boolean Series, or the string "default" marking the fallback label.
        The first matching rule wins (np.select semantics).
    ordered : bool, optional (default=False)
        Create an ordered categorical. The order follows labels when given,
        otherwise (rules method) the order of the rule keys, followed by any
        other label present in the result.
    as_category : bool, optional (default=True)
        Convert the result to a categorical dtype.
    show_value_counts : bool, optional (default=True)
        Display the value counts of the result.
    default_label : str, optional
        Label for values matching no rule. Takes priority over a "default"
        entry in rules; "Unknown" is used when neither is given.

    Returns:
    --------
    pd.Series
        Categorized series with the same index as the input.

    Raises:
    -------
    ValueError
        Empty series, missing/invalid method arguments, or unknown method.
    """
    series = self._series
    if series.empty:
        raise ValueError("Series is empty")

    # Label flagged with the special "default" marker in the rules, if any.
    auto_default = None
    if method == "rules":
        if not rules:
            raise ValueError(
                "For 'rules' method, not empty'rules' dictionary must be provided"
            )
        auto_default = next(
            (
                label
                for label, rule in rules.items()
                if isinstance(rule, str) and rule == "default"
            ),
            None,
        )

    if method == "equal_intervals":
        if n_categories is None:
            n_categories = len(labels) if labels is not None else 5
        if bins is not None:
            warnings.warn(
                "'bins' parameter is ignored for 'equal_intervals' method"
            )
        min_val = series.min()
        max_val = series.max()
        # Widen the outer edges so the extreme values fall inside a bin.
        epsilon = np.finfo(float).eps * 10
        if pd.api.types.is_integer_dtype(series):
            epsilon = 1
        bins = np.linspace(min_val - epsilon, max_val + epsilon, n_categories + 1)
        result = pd.cut(series, bins=bins, labels=labels, right=right)
    elif method == "quantiles":
        # Ranking with method="first" breaks ties so qcut edges stay unique.
        ranks = series.rank(method="first", na_option="keep")
        if quantiles is not None:
            quantiles = sorted(set(quantiles))
            if quantiles[0] != 0 or quantiles[-1] != 1:
                raise ValueError("Quantiles must start with 0 and end with 1")
            result = pd.qcut(ranks, q=quantiles, labels=labels, duplicates="drop")
        else:
            if n_categories is None:
                n_categories = len(labels) if labels is not None else 5
            if series.nunique() < n_categories:
                warnings.warn(
                    f"Number of unique values ({series.nunique()}) is less than n_categories ({n_categories})"
                )
            result = pd.qcut(
                ranks, q=n_categories, labels=labels, duplicates="drop"
            )
    elif method == "custom_bins":
        if bins is None:
            raise ValueError(
                "For 'custom_bins' method, 'bins' parameter must be provided"
            )
        if labels and len(labels) != len(bins) - 1:
            raise ValueError("Length of labels must be equal to len(bins) - 1")
        result = pd.cut(series, bins=bins, labels=labels, right=right)
    elif method == "clustering":
        na_mask = series.isna()
        has_na = bool(na_mask.any())
        if n_categories is None:
            if labels is not None:
                # With NA present the last label is reserved for missing values.
                n_categories = len(labels) - (1 if has_na else 0)
            else:
                n_categories = 5
        if has_na:
            warnings.warn(f"Dropping {na_mask.sum()} NA values for clustering")
            clean_series = series.dropna()
            if labels and len(labels) != n_categories + 1:
                raise ValueError(
                    f"With NA values, number of labels ({len(labels)}) "
                    f"must match n_categories + 1 ({n_categories + 1})"
                )
        else:
            clean_series = series.copy()
            if labels and len(labels) != n_categories:
                raise ValueError(
                    f"Number of labels ({len(labels)}) must match n_categories ({n_categories})"
                )
        from sklearn.cluster import KMeans

        kmeans = KMeans(n_clusters=n_categories, random_state=42)
        clusters = kmeans.fit_predict(clean_series.values.reshape(-1, 1))
        result = pd.Series(index=series.index, dtype="object")
        # Positional (mask based) assignment is safe with duplicated index labels.
        result[~na_mask] = clusters
        if labels:
            label_map = {i: labels[i] for i in range(n_categories)}
            result = result.map(label_map)
            if has_na:
                # NA rows were never clustered, so they are still NaN here:
                # give them the reserved trailing label explicitly.
                result[na_mask] = labels[-1]
    elif method == "rules":
        effective_default = (
            default_label if default_label is not None else auto_default
        )
        filtered_rules = {
            label: rule
            for label, rule in rules.items()
            if not (isinstance(rule, str) and rule == "default")
        }
        if not filtered_rules:
            raise ValueError("Must provide at least one non-default rule")
        processed_rules = {}
        for label, rule in filtered_rules.items():
            if isinstance(rule, pd.Series):
                if not pd.api.types.is_bool_dtype(rule):
                    raise ValueError(f"Rule '{label}' should be boolean Series")
                processed_rules[label] = rule
            elif callable(rule):
                processed_rules[label] = rule(series)
            else:
                raise ValueError(
                    f"Rule for {label} should be a function, boolean Series or default"
                )
        conditions = list(processed_rules.values())
        choices = list(processed_rules.keys())
        if fill_na_value is not None:
            # Missing inputs that matched no rule get the explicit NA label.
            conditions.append(series.isna())
            choices.append(fill_na_value)
        result = pd.Series(
            np.select(
                condlist=conditions,
                choicelist=choices,
                default=(
                    effective_default
                    if effective_default is not None
                    else "Unknown"
                ),
            ),
            index=series.index,
        )
    else:
        raise ValueError(
            f"Unknown method: {method}. Available methods: 'equal_intervals', 'quantiles', "
            "'custom_bins', 'clustering', 'rules'"
        )

    # Convert to categorical if requested
    if as_category:
        result = result.astype("category")

    # pd.cut / pd.qcut results are categorical even when as_category=False,
    # so decide on the actual dtype rather than on the flag.
    if ordered and isinstance(result.dtype, pd.CategoricalDtype):
        if labels is not None:
            result = result.cat.set_categories(labels, ordered=True)
        elif method == "rules" and rules is not None:
            ordered_labels = list(rules.keys())
            # Keep labels that are not rule keys ("Unknown", an explicit
            # default_label, fill_na_value); dropping them would turn the
            # corresponding values into NaN.
            ordered_labels += [
                cat for cat in result.cat.categories if cat not in ordered_labels
            ]
            result = result.cat.set_categories(ordered_labels, ordered=True)

    # Handle NA values
    effective_fill_na = fill_na_value if fill_na_value is not None else auto_default
    if effective_fill_na is not None and result.isna().any():
        if isinstance(result.dtype, pd.CategoricalDtype):
            # A categorical can only be filled with a declared category.
            if effective_fill_na not in result.cat.categories:
                result = result.cat.add_categories([effective_fill_na])
        result = result.fillna(effective_fill_na)

    # Show value counts if requested
    if show_value_counts:
        display(
            result.value_counts(dropna=False).to_frame("Count").rename_axis(None)
        )
    return result
def smooth_time_series(
    self,
    alpha: float = 0.3,
    method: Literal[
        "exponential", "moving_avg", "double", "triple", "median"
    ] = "exponential",
    window: Optional[int] = None,
    iterations: int = 1,
    inplace: bool = False,
    adjust_for_seasonality: bool = False,
    seasonality_period: Optional[int] = None,
    robust: bool = False,
    min_periods: int = 1,
) -> Union[pd.Series, None]:
    """
    Smooth the wrapped series with one of several methods.

    Parameters:
    -----------
    alpha : float, optional (default=0.3)
        Weight of the current value in the exponential step
        (x[t] = alpha*x[t] + (1-alpha)*x[t-1]); higher values smooth less.
    method : str, optional (default='exponential')
        Smoothing algorithm:
        - 'exponential': the weighted blend of each value with its predecessor
        - 'moving_avg': centered rolling mean (median when robust=True)
          followed by the exponential step
        - 'double': second-order variant, 2*s1 - s2 of two exponential passes
        - 'triple': Holt-Winters inspired level/trend/seasonal combination
        - 'median': centered rolling median
    window : int, optional
        Rolling window size for 'moving_avg' and 'median'. When omitted,
        'moving_avg' uses max(3, len // 20) and 'median' uses
        max(3, len // 10).
    iterations : int, optional (default=1)
        Number of smoothing passes (not used by 'median').
    inplace : bool, optional (default=False)
        Replace the wrapped series (and the matching column of its parent
        DataFrame, when one is attached) instead of returning the result.
    adjust_for_seasonality : bool, optional (default=False)
        Auto-detect the seasonal period when seasonality_period is not given.
    seasonality_period : int, optional
        Seasonal cycle length passed to the 'triple' method.
    robust : bool, optional (default=False)
        Use a rolling median instead of a mean in 'moving_avg'.
    min_periods : int, optional (default=1)
        Minimum observations per window for 'moving_avg'.

    Returns:
    --------
    Union[pd.Series, None]
        Smoothed series with the same index as input, or None if inplace=True

    Raises:
    -------
    ValueError
        If method is not one of the supported names.
    """
    original = self._series
    series = original.copy()

    # Auto-detect seasonality if requested
    if adjust_for_seasonality and seasonality_period is None:
        seasonality_period = self._detect_seasonality(series)

    # Choose smoothing method
    if method == "exponential":
        result = self._exponential_smoothing(series, alpha, iterations)
    elif method == "moving_avg":
        result = self._moving_avg_smoothing(
            series, alpha, window, iterations, robust, min_periods
        )
    elif method == "double":
        result = self._double_exponential_smoothing(series, alpha, iterations)
    elif method == "triple":
        result = self._triple_exponential_smoothing(
            series, alpha, seasonality_period, iterations
        )
    elif method == "median":
        # len // 10 is 0 for short series, which rolling() rejects; use the
        # same floor of 3 as the moving-average default.
        result = self._median_smoothing(
            series, window or max(3, len(series) // 10)
        )
    else:
        raise ValueError(
            "Invalid method. Choose: 'exponential', 'moving_avg', 'double', 'triple', 'median'"
        )

    if not inplace:
        return result

    # Resolve the parent from the series that actually belongs to the
    # DataFrame; the freshly computed result may not carry the attribute.
    parent = getattr(original, "_parent_df", None)
    if parent is None:
        parent = getattr(result, "_parent_df", None)
    self._series = result
    if parent is not None:
        parent[original.name] = result
    return None
def _exponential_smoothing(
self, s: pd.Series, alpha: float, iterations: int
) -> pd.Series:
"""Enhanced exponential smoothing with edge handling"""
smoothed = s.copy()
for _ in range(iterations):
smoothed = alpha * smoothed + (1 - alpha) * smoothed.shift(
1, fill_value=smoothed.median()
)
return smoothed
def _double_exponential_smoothing(
self, s: pd.Series, alpha: float, iterations: int
) -> pd.Series:
"""Second-order smoothing for trended data"""
s1 = self._exponential_smoothing(s, alpha, iterations)
s2 = self._exponential_smoothing(s1, alpha, iterations)
return 2 * s1 - s2
def _moving_avg_smoothing(
self,
s: pd.Series,
alpha: float,
window: int,
iterations: int,
robust: bool,
min_periods: int,
) -> pd.Series:
"""Improved moving average with robustness options"""
if not window:
window = max(3, len(s) // 20)
if robust:
ma = s.rolling(window=window, center=True, min_periods=min_periods).median()
else:
ma = s.rolling(window=window, center=True, min_periods=min_periods).mean()
return self._exponential_smoothing(ma, alpha, iterations)
def _triple_exponential_smoothing(
self, s: pd.Series, alpha: float, seasonality: int, iterations: int
) -> pd.Series:
"""Holt-Winters inspired seasonal smoothing"""
if seasonality is None:
seasonality = 1
result = s.copy()
for _ in range(iterations):
# Level
level = alpha * (result - result.shift(seasonality)) + (1 - alpha) * result
# Trend
trend = alpha * (level - level.shift(1)) + (1 - alpha) * level.diff().mean()
# Seasonality
seasonal = (s - level).rolling(seasonality, center=True).mean()
result = level + trend + seasonal
return result
def _median_smoothing(self, s: pd.Series, window: int) -> pd.Series:
"""Robust median-based smoothing"""
return s.rolling(window=window, center=True, min_periods=1).median()
def _detect_seasonality(self, s: pd.Series, max_lag: int = 100) -> Optional[int]:
"""Auto-detect seasonality period using ACF"""
from statsmodels.tsa.stattools import acf
if len(s) < 10:
return None
max_lag = min(max_lag, len(s) // 2)
acf_values = acf(s.dropna(), nlags=max_lag)
# Find peaks by comparing with neighbors
peaks = []
for i in range(1, len(acf_values) - 1):
if acf_values[i] > acf_values[i - 1] and acf_values[i] > acf_values[i + 1]:
peaks.append(i)
return peaks[0] if len(peaks) > 0 else None
def _apply_transform(self, series: pd.Series, method: str, **kwargs) -> pd.Series:
"""Core transformation logic"""
eps = kwargs.get("eps", 1e-6)
if method in ["boxcox", "yeojohnson"] and series.isna().any():
raise ValueError(
f"The series contains NaN values which are not allowed for the {method} transformation."
)
if method == "log":
shift = kwargs.get("shift", 1)
if (series + shift <= 0).any():
shift = abs(series.min()) + eps
return np.log(series + shift)
elif method == "boxcox":
from scipy.stats import boxcox
shift = abs(series.min()) + eps if (series <= 0).any() else 0
display(series + shift)
transformed, _ = boxcox(series + shift, lmbda=kwargs.get("lmbda"))
return pd.Series(transformed, index=series.index)
elif method == "yeojohnson":
from scipy.stats import yeojohnson
transformed, _ = yeojohnson(series)
return pd.Series(transformed, index=series.index)
elif method == "sqrt":
shift = kwargs.get("shift", 0)
return np.sqrt(series + shift)
elif method == "reciprocal":
return 1 / (series + eps)
elif method == "zscore":
return (series - series.mean()) / series.std()
elif method == "robust":
iqr = series.quantile(0.75) - series.quantile(0.25)
return (series - series.median()) / (iqr + eps)
elif method == "quantile":
from scipy.stats import norm, uniform
ranks = series.rank(pct=True)
dist = kwargs.get("dist", "uniform")
return pd.Series(
norm.ppf(ranks) if dist == "normal" else uniform.ppf(ranks),
index=series.index,
)
elif method == "custom":
if "func" not in kwargs:
raise ValueError("Must provide 'func' parameter for custom transform")
return series.apply(kwargs["func"])
else:
raise ValueError(f"Unknown method: {method}")
def _plot_transform_comparison(
    self, original: pd.Series, transformed: pd.Series, method: str
):
    """Show a before/after comparison of a transformation with Plotly.

    Builds a 2x2 grid: box plots on the top row, histograms on the bottom
    row; the original data goes in the left column and the transformed data
    in the right column. Skewness and kurtosis of both series are added as
    annotations, and the figure is shown through CustomFigure.
    """
    axis_labels = {"x": "Value"}
    # One histogram-with-marginal-box figure per series; their traces are
    # re-homed into the shared grid below.
    source_figures = [
        px.histogram(x=data, marginal="box", nbins=50, labels=axis_labels)
        for data in (original, transformed)
    ]

    grid = make_subplots(
        rows=2,
        cols=2,
        row_heights=[0.1, 0.9],
        vertical_spacing=0.05,
        horizontal_spacing=0.07,
        subplot_titles=(
            "Original Boxplot",
            "Transformed Boxplot",
            "Original Histogram",
            "Transformed Histogram",
        ),
    )

    for column, source in enumerate(source_figures, start=1):
        for trace in source.data:
            if trace.type != "box":
                # Detach the histogram from its bin group before adding it.
                trace.bingroup = None
                grid.add_trace(trace, row=2, col=column)
                continue
            grid.add_trace(trace, row=1, col=column)
            grid.update_xaxes(
                showticklabels=False,
                showline=False,
                ticks="",
                showgrid=True,
                row=1,
                col=column,
            )
            grid.update_yaxes(visible=False, row=1, col=column)

    # Thin white outline around the histogram bars.
    grid.update_traces(
        marker_line_color="white",
        marker_line_width=0.3,
        selector={"type": "histogram"},
    )
    grid.update_layout(
        title_text=f"Transformation: {method}",
        margin={"l": 50, "r": 50, "b": 50, "t": 70},
        width=800,
        height=400,
        showlegend=False,
    )

    for column in (1, 2):
        grid.update_xaxes(title_text="Value", row=2, col=column)
    grid.update_yaxes(title_text="Count", row=2, col=1)

    # Shape statistics above each column.
    stat_notes = [
        {
            "x": x_position,
            "y": 1.01,
            "xref": "paper",
            "yref": "paper",
            "text": f"{title}: Skew = {data.skew():.2f}, Kurtosis = {data.kurtosis():.2f}",
            "showarrow": False,
            "font": {"size": 12},
        }
        for x_position, title, data in (
            (0.25, "Original", original),
            (0.75, "Transformed", transformed),
        )
    ]
    grid.update_layout(annotations=stat_notes)
    CustomFigure(grid).show()
[docs]
def normalize_string_series(
    self,
    symbols: Optional[List[str]] = None,
    case_format: Literal["title", "lower", "upper", "sentence", "none"] = "title",
    remove_accents: bool = True,
    replace_symbols_with: str = " ",
    custom_replacements: Optional[Dict[str, str]] = None,
    inplace: bool = False,
) -> Union[pd.Series, None]:
    """
    Normalize the wrapped Series of strings.

    Steps, in order: custom replacements, symbol replacement, whitespace
    trimming/collapsing, case conversion, accent removal.

    Parameters:
    -----------
    symbols : list of str, optional
        Symbols to replace. Defaults to common punctuation; pass an empty
        list to skip this step.
    case_format : str, optional (default='title')
        Case conversion mode (case-insensitive):
        - 'title': Title Case
        - 'lower': lowercase
        - 'upper': UPPERCASE
        - 'sentence': Sentence case
        - 'none': No case conversion
    remove_accents : bool, optional (default=True)
        NFKD-normalize and drop every character that is not ASCII.
    replace_symbols_with : str, optional (default=' ')
        Literal text that replaces each symbol.
    custom_replacements : dict, optional
        Literal substring replacements (old -> new).
    inplace : bool, optional (default=False)
        Replace the wrapped series (and the matching column of its parent
        DataFrame, when one is attached) instead of returning the result.

    Returns:
    --------
    pd.Series or None
        Normalized Series with the same index as input, or None if
        inplace=True.

    Raises:
    -------
    ValueError
        If the series does not contain strings, or case_format is invalid.
    """
    original = self._series
    column = original
    # Input validation
    if not pd.api.types.is_string_dtype(column.dropna()):
        raise ValueError("Series must contain strings")
    # Default symbols if not provided
    if symbols is None:
        symbols = [
            "_",
            ".",
            ",",
            "«",
            "»",
            "(",
            ")",
            '"',
            "'",
            "`",
            "!",
            "?",
            "-",
            "—",
            "–",
        ]
    # Preserve original categorical dtype if present
    is_column_category = isinstance(column.dtype, pd.CategoricalDtype)
    # Custom replacements: plain substring replacement, so neither side is
    # interpreted as a regular expression or replacement template.
    if custom_replacements:
        for old, new in custom_replacements.items():
            column = column.str.replace(old, new, regex=False)
    # Symbol replacement
    if symbols:
        symbols_pattern = "|".join(map(re.escape, symbols))
        # A callable replacement keeps the text literal (backslashes in a
        # string replacement would be read as regex escapes).
        column = column.str.replace(
            symbols_pattern, lambda _match: replace_symbols_with, regex=True
        )
    # Whitespace normalization: trim, then collapse runs of whitespace.
    res = column.str.strip().str.replace(r"\s+", " ", regex=True)
    # Case conversion
    case_format = case_format.lower()
    if case_format == "title":
        res = res.str.title()
    elif case_format == "lower":
        res = res.str.lower()
    elif case_format == "upper":
        res = res.str.upper()
    elif case_format == "sentence":
        res = res.str.capitalize()
    elif case_format != "none":
        raise ValueError(
            f"Invalid case_format: {case_format}. Must be 'title', 'lower', 'upper', 'sentence', or 'none'"
        )
    # Remove accents/diacritics
    if remove_accents:
        res = (
            res.str.normalize("NFKD")
            .str.encode("ascii", errors="ignore")
            .str.decode("utf-8")
        )
    # Restore categorical dtype if original was categorical
    if is_column_category:
        res = res.astype("category")
    if not inplace:
        return res

    # Resolve the parent from the series that actually belongs to the
    # DataFrame; the derived result may not carry the attribute.
    parent = getattr(original, "_parent_df", None)
    if parent is None:
        parent = getattr(res, "_parent_df", None)
    self._series = res
    if parent is not None:
        parent[original.name] = res
    return None
[docs]
def fill_missing_by_category(
    self,
    category_columns: Union[str, List[str]],
    strategy: Literal["simple", "hierarchical"] = "simple",
    func: Union[str, Callable] = "median",
    minimal_group_size: int = 5,
    fill_unfilled: Union[str, float, None] = "global",
    inplace: bool = False,
) -> Optional[pd.Series]:
    """
    Fill missing values of the wrapped series from category-level statistics.

    Parameters:
    -----------
    category_columns : str or list
        Column name(s) of the parent DataFrame to group by.
    strategy : {'simple', 'hierarchical'}, optional (default='simple')
        - 'simple': fill from the exact category groups
        - 'hierarchical': fall back to smaller combinations of the columns
    func : str or callable, optional (default="median")
        Group statistic: "median", "mean", "max", "min", "mode", or a
        callable reducing a Series to a scalar.
    minimal_group_size : int, optional (default=5)
        Minimum number of non-NA values a group needs to be used.
    fill_unfilled : str, float or None, optional (default="global")
        What to do with values still missing afterwards:
        - "global": use the statistic of the whole column
        - numeric: use that constant
        - None: leave them as NA
    inplace : bool, optional (default=False)
        Store the result instead of returning it.

    Returns:
    --------
    pd.Series or None
        Filled series unless inplace=True
    """
    self._validate_inputs(category_columns, strategy, fill_unfilled)

    df, target = self._get_data_objects(inplace)
    category_columns = self._normalize_categories(category_columns)
    # Grouping keys must be complete before they can drive the fill.
    self._check_missing_categories(df, category_columns)
    agg_func = self._resolve_agg_func(func)

    # Strategy was validated above, so anything but "simple" is hierarchical.
    fill_by_groups = (
        self._simple_strategy if strategy == "simple" else self._hierarchical_strategy
    )
    filled = fill_by_groups(
        df, target, category_columns, agg_func, minimal_group_size
    )

    # Whatever the groups could not supply is handled by fill_unfilled.
    filled = self._handle_remaining_nas(filled, df[target], fill_unfilled, agg_func)
    return self._return_result(filled, inplace)
def _validate_inputs(self, categories, strategy, fill_unfilled):
"""Validate all input parameters"""
valid_strategies = ["simple", "hierarchical"]
if strategy not in valid_strategies:
raise ValueError(f"Invalid strategy. Choose from {valid_strategies}")
if fill_unfilled not in ["global", None] and not isinstance(
fill_unfilled, (int, float)
):
raise TypeError("fill_unfilled must be 'global', None, or numeric value")
def _get_data_objects(self, inplace: bool):
"""Get DataFrame and series name"""
if self._series.parent_df is None:
raise ValueError("Series must belong to a DataFrame")
df = self._series.parent_df
target = self._series.name
return df, target
def _normalize_categories(self, categories):
"""Convert to list if single column name"""
return [categories] if isinstance(categories, str) else categories
def _check_missing_categories(self, df, categories):
"""Check for NaN in categorical columns"""
missing = df[categories].isna()
if missing.any().any():
bad_cols = missing.any()[missing.any()].index.tolist()
raise ValueError(
f"Missing values in categorical columns: {bad_cols}. "
"Handle missing categories before filling."
)
def _resolve_agg_func(self, func):
"""Get appropriate aggregation function"""
func_map = {
"median": pd.Series.median,
"mean": pd.Series.mean,
"max": pd.Series.max,
"min": pd.Series.min,
"mode": lambda x: x.mode()[0] if not x.mode().empty else np.nan,
}
if isinstance(func, str):
if func not in func_map:
raise ValueError(
f"Invalid function: {func}. Choose from {list(func_map)}"
)
return func_map[func]
if callable(func):
return func
raise TypeError("func must be string or callable")
def _simple_strategy(self, df, target, categories, agg_func, min_size):
"""Simple grouping strategy"""
groups = df.groupby(categories, observed=True)[target]
group_values = groups.transform(
lambda x: agg_func(x) if x.count() >= min_size else np.nan
)
return df[target].fillna(group_values)
def _hierarchical_strategy(self, df, target, categories, agg_func, min_size):
"""Hierarchical filling strategy"""
filled = df[target].copy()
remaining_na = filled.isna()
display(filled)
# Try different category combinations from specific to general
for level in range(len(categories), 0, -1):
for cols in itertools.combinations(categories, level):
if not remaining_na.any():
return filled
# Fill with current combination
temp_filled = self._simple_strategy(
df, target, list(cols), agg_func, min_size
)
filled.update(temp_filled[remaining_na])
print(level, cols)
display(filled)
remaining_na = filled.isna()
return filled
def _handle_remaining_nas(self, filled, original, fill_unfilled, agg_func):
"""Apply fill_unfilled strategy"""
na_mask = filled.isna()
if not na_mask.any() or fill_unfilled is None:
return filled
if fill_unfilled == "global":
fill_value = agg_func(original.dropna())
else:
fill_value = fill_unfilled
filled[na_mask] = fill_value
return filled
def _return_result(self, filled, inplace):
"""Handle in-place modification"""
if inplace:
self._series = filled
# Update parent DataFrame if this series came from one
if (
hasattr(self._series, "_parent_df")
and self._series._parent_df is not None
):
parent = self._series._parent_df
col_name = self._series.name
parent[col_name] = filled
return None
return filled
[docs]
def impute_missing(
    self,
    auxiliary_cols: Union[str, List[str]] = "all",
    method: Literal["simple", "knn", "iterative"] = "simple",
    strategy: Literal["mean", "median", "most_frequent", "constant"] = "median",
    n_neighbors: int = 5,
    sample_size: Optional[int] = None,
    random_state: int = 42,
    standardize: bool = False,
    imputer_params: Optional[Dict] = None,
    inplace: bool = False,
) -> Optional[pd.Series]:
    """
    Impute missing values of the wrapped series.

    The work is delegated to ``parent_df.preproc.impute_missing`` with this
    series' name as the only target column; all other arguments are passed
    through unchanged.

    Parameters:
    -----------
    auxiliary_cols : str or list, default='all'
        Columns to use as features for imputation.
    method : {'simple', 'knn', 'iterative'}, default='simple'
        Imputation strategy:
        - simple: Fast univariate imputation
        - knn: Nearest neighbors-based imputation
        - iterative: Multivariate imputation using chained equations
    strategy : str, default='median'
        Strategy for the simple method:
        ['mean', 'median', 'most_frequent', 'constant']
    n_neighbors : int, default=5
        Number of neighbors for the knn method
    sample_size : int, optional
        Subsample size for large datasets optimization
    random_state : int, default=42
        Random seed for reproducibility
    standardize : bool, default=False
        Whether to standardize features before imputation.
    imputer_params : dict, optional
        Additional parameters forwarded to the imputer.
    inplace : bool, default=False
        Update the parent DataFrame and the wrapped series instead of
        returning the imputed column.

    Returns:
    --------
    pd.Series or None
        The imputed column, or None if inplace=True

    Raises:
    -------
    ValueError
        If the series has no parent DataFrame.
    """
    series_name = self._series.name
    parent = self._series.parent_df
    if parent is None:
        raise ValueError("Series must have a parent DataFrame")
    filled = parent.preproc.impute_missing(
        target_cols=series_name,
        auxiliary_cols=auxiliary_cols,
        method=method,
        strategy=strategy,
        n_neighbors=n_neighbors,
        sample_size=sample_size,
        random_state=random_state,
        standardize=standardize,
        imputer_params=imputer_params,
        inplace=inplace,
    )
    if inplace:
        # NOTE(review): the DataFrame-level call is expected to return None
        # when inplace=True (confirm against its implementation); in that
        # case the updated column is read back from the parent.
        source = parent if filled is None else filled
        self._series = source[series_name]
        return None
    return filled[series_name]
[docs]
def calc_target_category_share(
    self,
    target_category: Union[str, int, float],
    group_columns: List[str],
    resample_freq: str = "ME",
    fill_missing_periods: bool = True,
    min_group_size: int = 1,
) -> pd.DataFrame:
    """
    Calculate the share of a target category within grouped data.

    The current series is compared against ``target_category`` and the mean
    of the resulting 0/1 indicator is computed per group. If exactly one of
    ``group_columns`` is a datetime column, it is bucketed with
    ``pd.Grouper(freq=resample_freq)``.

    Parameters:
    -----------
    target_category : str, int, or float
        The category value whose share is calculated. Must occur in the series.
    group_columns : str or List[str]
        Column(s) of the parent DataFrame to group by. A single string is
        accepted and treated as a one-element list. At most one datetime
        column is allowed.
    resample_freq : str, optional
        Pandas frequency string used for the datetime column (default 'ME').
        Only used if a datetime column is present in group_columns.
    fill_missing_periods : bool, optional
        Whether to add rows for periods in which a group has no observations
        (default True). Added rows get NaN in 'target_share'. Applied only
        when a datetime column and at least one non-datetime grouping column
        are present; every observed combination of the non-datetime grouping
        columns is expanded over the full period range.
    min_group_size : int, optional
        Minimum number of observations required per group (default 1).
        Groups with fewer observations get NaN in 'target_share'.

    Returns:
    --------
    pd.DataFrame
        One row per group with the grouping columns (datetime column first)
        followed by 'target_share', the fraction (0-1) of rows in the group
        equal to ``target_category``.

    Raises:
    -------
    ValueError
        If group_columns is empty, contains the current column, references
        columns missing from the parent DataFrame, or contains more than one
        datetime column; if the series is empty, has no parent DataFrame, has
        no unique values, or does not contain ``target_category``; or if any
        involved column contains missing values.
    """
    # ======================
    # Input Validation
    # ======================
    series = self._series
    series_name = series.name
    df = series.parent_df
    if not group_columns:
        raise ValueError("group_columns must be defined")
    group_columns = (
        [group_columns] if isinstance(group_columns, str) else list(group_columns)
    )
    if series_name in group_columns:
        raise ValueError("Current column should not be in group_columns")
    if len(series) == 0:
        raise ValueError("Series is empty")
    if df is None:
        raise ValueError("Series must belong to a DataFrame")
    if series.nunique() == 0:
        raise ValueError("Current column has no unique values")
    if target_category not in series.unique():
        raise ValueError(
            f"Target category '{target_category}' not found in current column"
        )
    missing_group_cols = [col for col in group_columns if col not in df.columns]
    if missing_group_cols:
        raise ValueError(
            f"Group columns not found in DataFrame: {missing_group_cols}"
        )
    datetime_cols = [
        col
        for col in group_columns
        if pd.api.types.is_datetime64_any_dtype(df[col])
    ]
    if len(datetime_cols) > 1:
        raise ValueError("Only one datetime column allowed in group_columns")
    time_column = datetime_cols[0] if datetime_cols else None
    cols_to_check = [series_name] + group_columns
    for col in cols_to_check:
        if df[col].isna().any():
            raise ValueError(f"Missing values found in column: '{col}'")

    # ======================
    # Data Preparation
    # ======================
    df_work = df[cols_to_check].copy()
    # The mean of a 0/1 indicator is the share of the target category.
    df_work["is_target"] = (df_work[series_name] == target_category).astype(int)

    # ======================
    # Grouping Logic
    # ======================
    regular_group_cols = [col for col in group_columns if col != time_column]
    if time_column is not None:
        grouper = [
            pd.Grouper(key=time_column, freq=resample_freq)
        ] + regular_group_cols
    else:
        grouper = regular_group_cols
    result = df_work.groupby(grouper, observed=True, as_index=False).agg(
        target_share=("is_target", "mean"), total_count=("is_target", "count")
    )

    # Mask groups that are too small to give a meaningful share.
    if min_group_size > 1:
        result.loc[result["total_count"] < min_group_size, "target_share"] = np.nan

    # Expand every observed group combination over the complete period range.
    if time_column is not None and fill_missing_periods and regular_group_cols:
        date_range = pd.date_range(
            start=result[time_column].min(),
            end=result[time_column].max(),
            freq=resample_freq,
        )
        # Align the dtype with the grouped column so the merge keys match.
        periods = pd.DataFrame({time_column: date_range}).astype(
            {time_column: result[time_column].dtype}
        )
        # Use ALL non-datetime grouping columns; indexing on only the first
        # one yields duplicate labels as soon as there are two or more.
        observed_groups = result[regular_group_cols].drop_duplicates()
        full_grid = periods.merge(observed_groups, how="cross")
        result = full_grid.merge(
            result, on=[time_column] + regular_group_cols, how="left"
        )
    return result.drop("total_count", axis=1)
[docs]
def check_group_counts(
    self,
    category_columns: Union[str, List[str]],
    threshold_counts: List[int] = [5, 10, 20, 30, 40, 50],
    return_report: bool = False,
) -> Union[dict, None]:
    """
    Analyze group statistics to assess viability for missing value imputation.

    Groups the parent DataFrame by ``category_columns`` and, for the current
    series, computes per group the number of non-missing values and the
    number of missing values. Summary metrics derived from these are printed
    as a report and optionally returned.

    Throughout the metrics, "group size" / ``count`` means the number of
    NON-missing values in a group. Grouping uses ``observed=False``, so for
    categorical grouping columns unobserved categories are included as groups
    with a count of 0.

    Parameters:
    -----------
    category_columns : str or list
        Column name(s) used for grouping
    threshold_counts : list of int, optional (default=[5, 10, 20, 30, 40, 50])
        Thresholds to evaluate. For each one, the share of groups that have
        at least one missing value AND at least ``threshold`` non-missing
        values is reported (relative to all groups with missing values).
    return_report : bool, optional (default=False)
        Whether to return the metrics as a dictionary. The report is printed
        to stdout in either case.

    Returns:
    --------
    Union[dict, None]
        Dictionary with metrics if return_report=True, otherwise None

    Raises:
    -------
    ValueError
        If the series has no parent DataFrame or any referenced column is
        not present in it.
    """
    series = self._series
    value_column = series.name
    df = series.parent_df
    if df is None:
        raise ValueError("Series must belong to a DataFrame")

    # Normalise to a list so any sequence of names (e.g. a tuple) works.
    if isinstance(category_columns, str):
        category_columns = [category_columns]
    else:
        category_columns = list(category_columns)

    missing_cols = [
        col for col in [*category_columns, value_column] if col not in df.columns
    ]
    if missing_cols:
        raise ValueError(f"Columns not found in DataFrame: {missing_cols}")

    # Per-group counts. Missing values are derived as (rows - non-missing),
    # which stays vectorized instead of calling a Python lambda per group.
    grouped = df.groupby(category_columns, observed=False)[value_column]
    non_missing = grouped.count()
    group_stats = pd.DataFrame(
        {"count": non_missing, "missing_count": grouped.size() - non_missing}
    )
    counts = group_stats["count"]
    missing = group_stats["missing_count"]

    metrics = {
        "groups_total": len(group_stats),
        "groups_with_missing": (missing > 0).mean(),
        "missing_values_total": missing.sum(),
        "groups_all_missing": (counts == 0).mean(),
        # Missing values located in groups that have at least one real value.
        "missing_in_complete_groups": group_stats.loc[
            counts > 0, "missing_count"
        ].sum(),
        "group_size_stats": {
            "mean": counts.mean(),
            "median": counts.median(),
            "min": counts.min(),
            "max": counts.max(),
            "std": counts.std(),
        },
        "missing_distribution": {
            "groups_with_1_missing": (missing == 1).mean(),
            "groups_with_2-5_missing": ((missing >= 2) & (missing <= 5)).mean(),
            "groups_with_5+_missing": (missing > 5).mean(),
        },
        "threshold_stats": {},
    }

    # Only groups that actually need imputation matter for the thresholds;
    # this subset does not depend on the threshold, so compute it once.
    # sorted() copies, so the shared default list is never mutated.
    counts_in_groups_with_missing = group_stats.loc[missing > 0, "count"]
    for threshold in sorted(threshold_counts):
        metrics["threshold_stats"][str(threshold)] = (
            counts_in_groups_with_missing >= threshold
        ).mean()

    self._print_group_report(metrics, category_columns, value_column)
    if return_report:
        return metrics
    return None
def _print_group_report(
self, metrics: dict, category_columns: List[str], value_column: str
) -> None:
"""Print formatted group analysis report"""
print(f"\n{' Group Analysis Report ':=^80}")
print(f"Grouping columns: {', '.join(category_columns)}")
print(f"Value column: {value_column}\n")
print(f"{'Total groups:':<40} {metrics['groups_total']:,}")
print(
f"{'Groups with missing values:':<40} {metrics['groups_with_missing']:.1%}"
)
print(
f"{'Groups with ALL values missing:':<40} {metrics['groups_all_missing']:.1%}"
)
print(f"{'Total missing values:':<40} {metrics['missing_values_total']:,}")
print(
f"{'Missing in non-empty groups:':<40} {metrics['missing_in_complete_groups']:,}\n"
)
print(f"{' Group Size Statistics ':-^80}")
stats = metrics["group_size_stats"]
print(f"{'Mean group size:':<30} {stats['mean']:.1f}")
print(f"{'Median group size:':<30} {stats['median']:.1f}")
print(f"{'Minimum group size:':<30} {stats['min']:,}")
print(f"{'Maximum group size:':<30} {stats['max']:,}")
print(f"{'Standard deviation:':<30} {stats['std']:.1f}\n")
print(f"{' Missing Value Distribution ':-^80}")
dist = metrics["missing_distribution"]
print(
f"{'Groups with 1 missing value:':<30} {dist['groups_with_1_missing']:.1%}"
)
print(
f"{'Groups with 2-5 missing values:':<30} {dist['groups_with_2-5_missing']:.1%}"
)
print(
f"{'Groups with 5+ missing values:':<30} {dist['groups_with_5+_missing']:.1%}\n"
)
print(f"{' Threshold Analysis (ontly groups with missings)':-^80}")
for threshold, pct in metrics["threshold_stats"].items():
print(f"{f'Groups with {threshold}+ elements:':<30} {pct:.1%}")
print("=" * 80)