Dev drift analysis #7
Merged
24 commits
9506a8f draft structure of DataDriftAnalyzer (alelavml3)
d1e5ef0 first ks implementation (alelavml3)
028c872 ks after test (alelavml3)
a026027 just test rule accept testWorkers parameter (alelavml3)
3d5106e new drift info (alelavml3)
ea30f43 testing for analyzer (alelavml3)
70cb647 formatting with bonferroni, new monitoring specs (alelavml3)
8023a91 abstract class for data batch analyzer (alelavml3)
945127b detection is performed in online or offline according to the monitori… (alelavml3)
27c4733 fix test import but still wrong because it is streaming (alelavml3)
5fca60f batch drift analyzer (alelavml3)
df80ef2 doc strings (alelavml3)
768b681 test support with polars and pandas (alelavml3)
549a515 Handle new extras in tests; linting according to previous py versions (GiovanniGiacometti)
7d1e5db Parametrize for tests done in a loop (GiovanniGiacometti)
fce49d1 Refactor scan method and tests of batch-analyzer (GiovanniGiacometti)
fddd086 Improvements to Monitoring Algorithm base class (GiovanniGiacometti)
72dd083 Default algorithms builders (GiovanniGiacometti)
0b84b97 Refactor classes to accept an instance of algorithms rather than buil… (GiovanniGiacometti)
765eb4f HuggingFace integration uses monitoring modules (GiovanniGiacometti)
1eaad45 wip in sklearn general detector (alelavml3)
25869a0 fix tests (alelavml3)
7cbaf46 sklearn detector uses standard monitoring algorithms (alelavml3)
c69b39d better comment (alelavml3)
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,178 @@ | ||
| from abc import ABC, abstractmethod | ||
| import numpy as np | ||
| from typing import TYPE_CHECKING, Union | ||
| from typing_extensions import TypeIs | ||
| | ||
| from ml3_drift.analysis.report import Report | ||
| from ml3_drift.monitoring.base import MonitoringAlgorithm | ||
| from ml3_drift.monitoring.multivariate.bonferroni import BonferroniCorrectionAlgorithm | ||
| from ml3_drift.monitoring.univariate.continuous.ks import KSAlgorithm | ||
| from ml3_drift.monitoring.univariate.discrete.chi_square import ( | ||
| ChiSquareAlgorithm, | ||
| ) | ||
| | ||
| if TYPE_CHECKING: | ||
| import pandas as pd | ||
| import polars as pl | ||
| | ||
| POLARS = True | ||
| try: | ||
| import polars as pl | ||
| except ModuleNotFoundError: | ||
| POLARS = False | ||
| | ||
| | ||
| PANDAS = True | ||
| try: | ||
| import pandas as pd | ||
| except ModuleNotFoundError: | ||
| PANDAS = False | ||
| | ||
| | ||
| class DataDriftAnalyzer(ABC): | ||
| """ | ||
| Analyze a dataset to identify the sequence of distributions produced by data drift. | ||
| | ||
| Parameters | ||
| ---------- | ||
| continuous_monitoring_algorithm: MonitoringAlgorithm | None | ||
| Algorithm used to monitor continuous data. If None, KSAlgorithm wrapped in BonferroniCorrectionAlgorithm is used. | ||
| categorical_monitoring_algorithm: MonitoringAlgorithm | None | ||
| Algorithm used to monitor categorical data. If None, ChiSquareAlgorithm wrapped in BonferroniCorrectionAlgorithm is used. | ||
| """ | ||
| | ||
| def __init__( | ||
| self, | ||
| continuous_monitoring_algorithm: MonitoringAlgorithm | None = None, | ||
| categorical_monitoring_algorithm: MonitoringAlgorithm | None = None, | ||
| ): | ||
| # Fall back to the default algorithms when None is provided. | ||
| if continuous_monitoring_algorithm is None: | ||
| continuous_monitoring_algorithm = BonferroniCorrectionAlgorithm( | ||
| algorithm=KSAlgorithm(), | ||
| ) | ||
| if categorical_monitoring_algorithm is None: | ||
| categorical_monitoring_algorithm = BonferroniCorrectionAlgorithm( | ||
| algorithm=ChiSquareAlgorithm(), | ||
| ) | ||
| | ||
| self.continuous_monitoring_algorithm = continuous_monitoring_algorithm | ||
| self.categorical_monitoring_algorithm = categorical_monitoring_algorithm | ||
| | ||
| def _is_list_str(self, columns: list[str] | list[int]) -> TypeIs[list[str]]: | ||
| """Check whether every element of the input list is a str.""" | ||
| | ||
| return all(isinstance(elem, str) for elem in columns) | ||
| | ||
| def _to_index( | ||
| self, | ||
| X: Union[np.ndarray, "pd.DataFrame", "pl.DataFrame"], | ||
| columns: list[str] | list[int] | None, | ||
| ) -> list[int]: | ||
| """Translate the list of columns into a list of indices. | ||
| | ||
| If columns is None, all column indices are returned. | ||
| If columns is list[int], it is returned unchanged. | ||
| If columns is list[str], the indices are looked up from the column names; | ||
| in this case X must be a DataFrame.""" | ||
| | ||
| if columns is None: | ||
| # shape[1] is the number of columns; shape[0] would count rows. | ||
| return list(range(X.shape[1])) | ||
| | ||
| if self._is_list_str(columns): | ||
| if POLARS and isinstance(X, pl.DataFrame): | ||
| return [i for (i, c) in enumerate(X.columns) if c in columns] | ||
| elif PANDAS and isinstance(X, pd.DataFrame): | ||
| return [i for (i, c) in enumerate(X.columns) if c in columns] | ||
| else: | ||
| raise ValueError( | ||
| f"Type not valid, expecting polars DataFrame or pandas DataFrame when columns has string values. Got {type(X)}" | ||
| ) | ||
| return columns | ||
| | ||
| def _to_numpy( | ||
| self, X: Union[np.ndarray, "pd.DataFrame", "pl.DataFrame"] | ||
| ) -> np.ndarray: | ||
| """Convert the input data to a numpy array.""" | ||
| | ||
| if POLARS and isinstance(X, pl.DataFrame): | ||
| return X.to_numpy() | ||
| elif PANDAS and isinstance(X, pd.DataFrame): | ||
| return X.to_numpy() | ||
| elif isinstance(X, np.ndarray): | ||
| return X | ||
| else: | ||
| raise ValueError( | ||
| f"Type not valid, expecting numpy array, polars DataFrame or pandas DataFrame. Got {type(X)}" | ||
| ) | ||
| | ||
| @abstractmethod | ||
| def _scan_data( | ||
| self, | ||
| X: np.ndarray, | ||
| y: np.ndarray | None, | ||
| continuous_columns_ids: list[int], | ||
| categorical_columns_ids: list[int], | ||
| y_categorical: bool, | ||
| ) -> Report: | ||
| """Scan the data to identify different data partitions according to the monitoring algorithms.""" | ||
| | ||
| def analyze( | ||
| self, | ||
| X: Union[np.ndarray, "pd.DataFrame", "pl.DataFrame"], | ||
| y: Union[None, np.ndarray, "pd.DataFrame", "pl.DataFrame"], | ||
| continuous_columns: list[str] | list[int] | None, | ||
| categorical_columns: list[str] | list[int] | None, | ||
| y_categorical: bool, | ||
| ) -> Report: | ||
| """Analyze the data to split it into different distributions according to the drift detectors. | ||
| | ||
| If a target is provided, concept drift is used as the split criterion; otherwise, input drift is used. | ||
| | ||
| Parameters | ||
| ---------- | ||
| X: input data. Can be a numpy array, pandas DataFrame or polars DataFrame | ||
| y: target data. Optional; can be a numpy array, pandas DataFrame or polars DataFrame | ||
| continuous_columns: indices or names of the continuous columns; if None, no column is treated as continuous | ||
| categorical_columns: indices or names of the categorical columns; if None, no column is treated as categorical | ||
| y_categorical: if True, the target is categorical; otherwise it is treated as continuous | ||
| | ||
| Returns | ||
| ------- | ||
| Report object containing information about the identified data groups | ||
| """ | ||
| # Shape check | ||
| if y is not None and X.shape[0] != y.shape[0]: | ||
| raise ValueError( | ||
| f"When target y is not None, it must have the same number of rows as input X. Got X: {X.shape} and y: {y.shape}" | ||
| ) | ||
| | ||
| # Continuous and categorical columns to canonical form | ||
| if continuous_columns is not None: | ||
| continuous_columns_ids = self._to_index(X, continuous_columns) | ||
| else: | ||
| continuous_columns_ids = [] | ||
| | ||
| if categorical_columns is not None: | ||
| categorical_columns_ids = self._to_index(X, categorical_columns) | ||
| else: | ||
| categorical_columns_ids = [] | ||
| | ||
| # Input and target in canonical form | ||
| array_X = self._to_numpy(X) | ||
| | ||
| if y is not None: | ||
| array_y = self._to_numpy(y) | ||
| else: | ||
| array_y = None | ||
| | ||
| # Data analysis | ||
| report = self._scan_data( | ||
| array_X, | ||
| array_y, | ||
| continuous_columns_ids, | ||
| categorical_columns_ids, | ||
| y_categorical, | ||
| ) | ||
| | ||
| return report | ||
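
The name lookup in `_to_index` filters the DataFrame's columns by membership, so the returned indices follow the DataFrame's column order and a name that is not present is skipped without an error. If that is not the intended behaviour, one possible alternative is sketched below. It is only an illustration: `resolve_column_indices` is a hypothetical helper that is not part of this PR, and it takes a plain list of column names (for example `list(X.columns)`) instead of a DataFrame so that it runs without pandas or polars.

```python
from __future__ import annotations

from collections.abc import Sequence


def resolve_column_indices(
    column_names: Sequence[str],
    columns: Sequence[str] | Sequence[int] | None,
) -> list[int]:
    """Hypothetical helper: map a column selection to positional indices."""
    # None selects every column, so we count columns (not rows).
    if columns is None:
        return list(range(len(column_names)))

    # Integer selections are already positional; return a copy unchanged.
    if all(isinstance(c, int) for c in columns):
        return list(columns)

    # String selections are looked up by name, keeping the caller's order.
    position = {name: i for i, name in enumerate(column_names)}
    missing = [c for c in columns if c not in position]
    if missing:
        # Fail loudly instead of silently dropping unknown names.
        raise ValueError(f"Unknown column names: {missing}")
    return [position[c] for c in columns]


names = ["age", "income", "city"]
print(resolve_column_indices(names, ["city", "age"]))  # [2, 0]
print(resolve_column_indices(names, None))  # [0, 1, 2]
```

Whether the caller's order should be preserved depends on how `_scan_data` consumes the indices; if it only needs the set of monitored columns, the membership filter used in the PR is sufficient and only the missing-name check would add anything.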