Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions examples/huggingface/text_embedding_monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
from ml3_drift.huggingface.drift_detection_pipeline import (
HuggingFaceDriftDetectionPipeline,
)
from ml3_drift.monitoring.multivariate.bonferroni import BonferroniCorrectionAlgorithm
from ml3_drift.monitoring.univariate.continuous.ks import KSAlgorithm
from ml3_drift.monitoring.algorithms.batch.bonferroni import (
BonferroniCorrectionAlgorithm,
)
from ml3_drift.monitoring.algorithms.batch.ks import KSAlgorithm
from ml3_drift.callbacks.base import logger_callback


Expand Down
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ dependencies = [

sklearn = ["scikit-learn>=1.6.1"]

huggingface = ["scipy>=1.15.2", "transformers[torch]>=4.52.3"]
huggingface = ["transformers[torch]>=4.52.3"]

polars = ["polars>=1.31.0"]

pandas = ["pandas>=2.2.3"]

river = ["river>=0.22.0"]


# -------------------------------------------------

Expand Down
14 changes: 10 additions & 4 deletions src/ml3_drift/analysis/analyzer/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
from typing_extensions import TypeIs

from ml3_drift.analysis.report import Report
from ml3_drift.monitoring.base import MonitoringAlgorithm
from ml3_drift.monitoring.multivariate.bonferroni import BonferroniCorrectionAlgorithm
from ml3_drift.monitoring.univariate.continuous.ks import KSAlgorithm
from ml3_drift.monitoring.univariate.discrete.chi_square import (
from ml3_drift.monitoring.algorithms.batch.bonferroni import (
BonferroniCorrectionAlgorithm,
)
from ml3_drift.monitoring.algorithms.batch.ks import KSAlgorithm
from ml3_drift.monitoring.algorithms.batch.chi_square import (
ChiSquareAlgorithm,
)
from ml3_drift.monitoring.base.base import MonitoringAlgorithm

if TYPE_CHECKING:
import pandas as pd
Expand Down Expand Up @@ -158,6 +160,10 @@ def analyze(
else:
categorical_columns_ids = []

if not continuous_columns_ids and not categorical_columns_ids:
raise ValueError(
"At least one of continuous_columns or categorical_columns must be provided."
Comment thread
GiovanniGiacometti marked this conversation as resolved.
)
# Input and target in canonical form
array_X = self._to_numpy(X)

Expand Down
37 changes: 23 additions & 14 deletions src/ml3_drift/analysis/analyzer/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from ml3_drift.analysis.analyzer.base import DataDriftAnalyzer
from ml3_drift.analysis.report import Report
from ml3_drift.monitoring.base import MonitoringAlgorithm
from ml3_drift.monitoring.base.base import MonitoringAlgorithm

from ml3_drift.models.monitoring import (
MonitoringOutput,
Expand Down Expand Up @@ -74,7 +74,7 @@ def _single_scan_data(
y_categorical: bool,
first_batch_indexes: tuple[int, int],
second_batch_indexes: tuple[int, int],
) -> tuple[MonitoringOutput, MonitoringOutput]:
) -> tuple[MonitoringOutput | None, MonitoringOutput | None]:
"""
Inner helper method that performs a single scan of two batches
"""
Expand All @@ -95,16 +95,20 @@ def _single_scan_data(
y_categorical,
second_batch_indexes,
)

cont_algorithm = deepcopy(self.continuous_monitoring_algorithm).fit(
first_batch_cont
)
cat_algorithm = deepcopy(self.categorical_monitoring_algorithm).fit(
first_batch_cat
)

cont_output = cont_algorithm.detect(second_batch_cont)[0]
cat_output = cat_algorithm.detect(second_batch_cat)[0]
if len(continuous_columns_ids) > 0:
cont_algorithm = deepcopy(self.continuous_monitoring_algorithm).fit(
first_batch_cont
)
cont_output = cont_algorithm.detect(second_batch_cont)[0]
else:
cont_output = None
if len(categorical_columns_ids) > 0:
cat_algorithm = deepcopy(self.categorical_monitoring_algorithm).fit(
first_batch_cat
)
cat_output = cat_algorithm.detect(second_batch_cat)[0]
else:
cat_output = None
Comment thread
GiovanniGiacometti marked this conversation as resolved.

return cont_output, cat_output

Expand Down Expand Up @@ -149,7 +153,9 @@ def _scan_data(
next_batch_indexes,
)

if cont_output.drift_detected | cat_output.drift_detected:
if (cont_output is not None and cont_output.drift_detected) | (
cat_output is not None and cat_output.drift_detected
):
# if a drift is detected then, we close the current batch and open a new one
merged_batches.append(
(current_batch_start, current_batch_indexes[1] - 1)
Expand Down Expand Up @@ -181,7 +187,10 @@ def _scan_data(

# if no drift is detected the two batches are considered to belong to the same distribution
# and are added to the same distribution list
if not (cont_output.drift_detected | cat_output.drift_detected):
if not (
(cont_output is not None and cont_output.drift_detected)
| (cat_output is not None and cat_output.drift_detected)
):
same_distributions[pair[0]].append(pair[1])

return Report(concepts=merged_batches, same_distributions=same_distributions)
2 changes: 1 addition & 1 deletion src/ml3_drift/analysis/analyzer/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from ml3_drift.analysis.analyzer.base import DataDriftAnalyzer
from ml3_drift.analysis.report import Report
from ml3_drift.monitoring.base import MonitoringAlgorithm
from ml3_drift.monitoring.base.base import MonitoringAlgorithm


class StreamDataDriftAnalyzer(DataDriftAnalyzer):
Expand Down
2 changes: 1 addition & 1 deletion src/ml3_drift/huggingface/drift_detection_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import torch
from transformers import Pipeline, pipeline

from ml3_drift.monitoring.base import MonitoringAlgorithm
from ml3_drift.monitoring.base.base import MonitoringAlgorithm


class HuggingFaceDriftDetectionPipeline:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,18 @@
MonitoringAlgorithmSpecs,
MonitoringOutput,
)
from ml3_drift.monitoring.base import MonitoringAlgorithm
from ml3_drift.monitoring.univariate.base import UnivariateMonitoringAlgorithm
from ml3_drift.monitoring.base.base_multivariate import MultivariateMonitoringAlgorithm
from ml3_drift.monitoring.base.base_univariate import UnivariateMonitoringAlgorithm
from ml3_drift.monitoring.base.batch_monitoring_algorithm import (
BatchMonitoringAlgorithm,
)

T = TypeVar("T", bound=UnivariateMonitoringAlgorithm)


class BonferroniCorrectionAlgorithm(MonitoringAlgorithm):
class BonferroniCorrectionAlgorithm(
BatchMonitoringAlgorithm, MultivariateMonitoringAlgorithm
):
"""
Extension of p-value based univariate algorithms with Bonferroni correction
to handle multivariate data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@
MonitoringAlgorithmSpecs,
MonitoringOutput,
)
from ml3_drift.monitoring.univariate.base import UnivariateMonitoringAlgorithm
from ml3_drift.monitoring.base.base_univariate import UnivariateMonitoringAlgorithm
from ml3_drift.monitoring.base.batch_monitoring_algorithm import (
BatchMonitoringAlgorithm,
)


class ChiSquareAlgorithm(UnivariateMonitoringAlgorithm):
class ChiSquareAlgorithm(BatchMonitoringAlgorithm, UnivariateMonitoringAlgorithm):
"""Monitoring algorithm based on the Chi Square statistic test.

Parameters
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@
MonitoringAlgorithmSpecs,
MonitoringOutput,
)
from ml3_drift.monitoring.univariate.base import UnivariateMonitoringAlgorithm
from ml3_drift.monitoring.base.base_univariate import UnivariateMonitoringAlgorithm
from ml3_drift.monitoring.base.batch_monitoring_algorithm import (
BatchMonitoringAlgorithm,
)


class KSAlgorithm(UnivariateMonitoringAlgorithm):
class KSAlgorithm(BatchMonitoringAlgorithm, UnivariateMonitoringAlgorithm):
"""Monitoring algorithm based on the Kolmogorov-Smirnov statistic test.

Parameters
Expand Down
77 changes: 77 additions & 0 deletions src/ml3_drift/monitoring/algorithms/online/adwin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
from typing import Callable

import numpy as np
from ml3_drift.enums.monitoring import DataDimension, DataType, MonitoringType
from ml3_drift.models.monitoring import (
DriftInfo,
MonitoringAlgorithmSpecs,
MonitoringOutput,
)
from ml3_drift.monitoring.base.base_univariate import UnivariateMonitoringAlgorithm
from ml3_drift.monitoring.base.online_monitorning_algorithm import (
OnlineMonitorningAlgorithm,
)

RIVER = True
try:
from river.drift.adwin import ADWIN as RiverADWIN
except ModuleNotFoundError:
RIVER = False


class ADWIN(OnlineMonitorningAlgorithm, UnivariateMonitoringAlgorithm):
@classmethod
def specs(cls) -> MonitoringAlgorithmSpecs:
return MonitoringAlgorithmSpecs(
data_dimension=DataDimension.MULTIVARIATE,
data_type=DataType.MIX,
monitoring_type=MonitoringType.ONLINE,
)

def __init__(
self,
callbacks: list[Callable[[DriftInfo | None], None]] | None = None,
p_value: float = 0.002,
clock: float = 32,
max_buckets: int = 5,
min_window_length: int = 5,
grace_period: int = 10,
*args,
**kwargs,
) -> None:
if not RIVER:
raise ModuleNotFoundError(
"River library is required for ADWIN algorithm. Please install it using pip install/ uv add ml3-drift[river]"
)
self.p_value = p_value
self.clock = clock
self.max_buckets = max_buckets
self.min_window_length = min_window_length
self.grace_period = grace_period
self._args = args
self._kwargs = kwargs
super().__init__(
comparison_size=1, callbacks=callbacks
) # since we add only one sample per step and river handles building the window internally we set comparison_size to 1

def _reset_internal_parameters(self):
self.drift_agent = RiverADWIN(
delta=self.p_value,
clock=self.clock,
max_buckets=self.max_buckets,
min_window_length=self.min_window_length,
grace_period=self.grace_period,
*self._args,
**self._kwargs,
)

def _fit(self, X: np.ndarray):
"""Fit the KSWIN algorithm to the data."""
self._validate(X)
self.reset_internal_parameters()
self.is_fitted = True

def _detect(self):
self.drift_agent.update(self.comparison_data)
drift_detected = self.drift_agent.drift_detected
return MonitoringOutput(drift_detected=drift_detected, drift_info=None)
74 changes: 74 additions & 0 deletions src/ml3_drift/monitoring/algorithms/online/kswin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
from typing import Callable

import numpy as np
from ml3_drift.enums.monitoring import DataDimension, DataType, MonitoringType
from ml3_drift.models.monitoring import (
DriftInfo,
MonitoringAlgorithmSpecs,
MonitoringOutput,
)
from ml3_drift.monitoring.base.base_univariate import UnivariateMonitoringAlgorithm
from ml3_drift.monitoring.base.online_monitorning_algorithm import (
OnlineMonitorningAlgorithm,
)

RIVER = True
try:
from river.drift.kswin import KSWIN as RiverKSWIN
except ModuleNotFoundError:
RIVER = False


class KSWIN(OnlineMonitorningAlgorithm, UnivariateMonitoringAlgorithm):
@classmethod
def specs(cls) -> MonitoringAlgorithmSpecs:
return MonitoringAlgorithmSpecs(
data_dimension=DataDimension.MULTIVARIATE,
data_type=DataType.MIX,
monitoring_type=MonitoringType.ONLINE,
)

def __init__(
self,
callbacks: list[Callable[[DriftInfo | None], None]] | None = None,
p_value: float = 0.00,
window_size: int = 100,
stat_size: int = 30,
seed: int | None = None,
*args,
**kwargs,
) -> None:
if not RIVER:
raise ModuleNotFoundError(
"River library is required for KSWIN algorithm. Please install it using pip install/ uv add ml3-drift[river]"
)
self.p_value = p_value
self.window_size = window_size
self.stat_size = stat_size
self.seed = seed
self._args = args
self._kwargs = kwargs
super().__init__(
comparison_size=1, callbacks=callbacks
) # since we add only one sample per step and river handles building the window internally we set comparison_size to 1

def _reset_internal_parameters(self):
self.drift_agent = RiverKSWIN(
alpha=self.p_value,
window_size=self.window_size,
stat_size=self.stat_size,
seed=self.seed,
*self._args,
**self._kwargs,
)

def _fit(self, X: np.ndarray):
"""Fit the KSWIN algorithm to the data."""
self._validate(X)
self.reset_internal_parameters()
self.is_fitted = True

def _detect(self):
self.drift_agent.update(self.comparison_data)
drift_detected = self.drift_agent.drift_detected
return MonitoringOutput(drift_detected=drift_detected, drift_info=None)
Loading