Дрейф данных в ML-моделях: детектирование на Python с примерами кода | AiManual
AiManual Logo Ai / Manual.
06 Янв 2026 Гайд

Ваша ML-модель врёт: как найти и доказать дрейф данных до того, как клиенты заметят

Практическое руководство по обнаружению дрейфа данных и концептов в ML-системах. Статистические тесты, мониторинг распределений и код для продакшена.

Тихий убийца ML-систем: почему дрейф не виден в метриках

Выучили модель на исторических данных. Развернули в продакшн. Первые недели — accuracy стабилен, бизнес доволен. Проходит три месяца. Клиенты начинают жаловаться: "Ваша рекомендательная система предлагает мне зимнюю одежду в июле".

Вы открываете метрики. Accuracy всё ещё 94%. Precision, recall — в норме. Но что-то явно сломалось.

Добро пожаловать в мир дрейфа данных — ситуации, когда распределение входных данных меняется, а метрики модели какое-то время остаются стабильными. Модель продолжает работать, но уже на других данных. Как будто пилот самолёта летит по старым картам, не замечая, что рельеф местности изменился.

Дрейф — это не баг, а фундаментальное свойство реального мира. Пользовательское поведение меняется, экономические условия колеблются, сезоны сменяют друг друга. Если ваша модель этого не учитывает — она обречена на деградацию.

Два типа дрейфа, которые разрушают модели

Прежде чем искать дрейф, нужно понять, что именно искать. Есть два основных типа:

Дрейф данных (Data Drift)

Распределение входных признаков P(X) меняется со временем. Например:

  • Средний возраст пользователей вашего приложения снизился на 5 лет
  • Средняя сумма транзакций выросла в 2 раза из-за инфляции
  • Географическое распределение пользователей сместилось в другой регион

Концепт дрейф (Concept Drift)

Связь между признаками и целевой переменной P(Y|X) меняется. Модель продолжает получать те же данные, но они теперь имеют другое значение:

  • Во время пандемии "частые поездки на такси" перестали означать "богатый клиент", а стали означать "необходимость передвижения"
  • Летом "высокая температура в помещении" — это нормально, зимой — проблема с отоплением
  • После изменения законодательства некоторые транзакции перестали быть мошенническими

Концепт дрейф опаснее. Его сложнее обнаружить, потому что входные данные могут не меняться. Модель продолжает уверенно делать предсказания — просто они стали неверными.

Статистический арсенал: какие тесты использовать и когда

Наивный подход — сравнивать средние значения. "Средний возраст был 35 лет, стал 34.8 — всё в порядке". Так не работает. Нужно сравнивать распределения целиком.

Тип признака Статистический тест Когда использовать
Непрерывный (возраст, сумма) KS-тест (Колмогорова-Смирнова) Сравниваем две выборки, хотим понять, из одного ли они распределения
Категориальный (страна, пол) Chi-squared (χ²) Сравниваем частоты категорий в двух выборках
Любой тип PSI (Population Stability Index) Индустриальный стандарт для мониторинга дрейфа в продакшене
Многомерный MMD (Maximum Mean Discrepancy) Когда нужно сравнить распределения в высокоразмерном пространстве

1 Подготовка данных: что сохранить и как организовать

Первый шаг — смертельная ошибка большинства команд. Они не сохраняют эталонные данные. Вы обучили модель в январе, а в июне хотите проверить дрейф. Какими были данные в январе? Никто не помнит.

Решение простое до безобразия:

import pickle
import pandas as pd
from datetime import datetime

class ReferenceDataKeeper:
    def __init__(self, model_dir):
        self.model_dir = model_dir
        
    def save_reference_data(self, X_train, y_train=None):
        """Сохраняем данные, на которых обучали модель"""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        
        # Сохраняем признаки
        with open(f"{self.model_dir}/reference_X_{timestamp}.pkl", 'wb') as f:
            pickle.dump(X_train, f)
            
        # Сохраняем метки, если есть
        if y_train is not None:
            with open(f"{self.model_dir}/reference_y_{timestamp}.pkl", 'wb') as f:
                pickle.dump(y_train, f)
                
        # Сохраняем метаданные
        metadata = {
            'timestamp': timestamp,
            'n_samples': len(X_train),
            'features': list(X_train.columns) if hasattr(X_train, 'columns') else [],
            'feature_stats': X_train.describe().to_dict() if hasattr(X_train, 'describe') else {}
        }
        
        with open(f"{self.model_dir}/reference_metadata_{timestamp}.json", 'w') as f:
            import json
            json.dump(metadata, f, indent=2)
        
        return timestamp

# Использование:
keeper = ReferenceDataKeeper("./models/model_v1")
# После обучения модели:
timestamp = keeper.save_reference_data(X_train, y_train)
print(f"Эталонные данные сохранены с меткой: {timestamp}")

Теперь у вас есть снимок данных на момент обучения. Это ваш эталон. Без него все последующие проверки — гадание на кофейной гуще.

2 Реализация PSI — индустриального стандарта

PSI работает так: разбиваем признак на бакеты (bin), считаем процент наблюдений в каждом бакете для эталонных и текущих данных, сравниваем. Формула простая, но эффективная.

import numpy as np
from scipy import stats
import warnings
warnings.filterwarnings('ignore')

def calculate_psi(expected, actual, bucket_type='bins', buckets=10):
    """Рассчитывает PSI для одного признака"""
    
    # Удаляем NaN
    expected = expected[~np.isnan(expected)]
    actual = actual[~np.isnan(actual)]
    
    if len(expected) == 0 or len(actual) == 0:
        return np.nan
    
    # Определяем границы бакетов по эталонным данным
    if bucket_type == 'bins':
        breakpoints = np.percentile(expected, [100/buckets * i for i in range(buckets+1)])
        breakpoints = np.unique(breakpoints)  # Убираем дубликаты
    elif bucket_type == 'quantiles':
        breakpoints = np.percentile(expected, np.linspace(0, 100, buckets+1))
    else:
        raise ValueError("bucket_type должен быть 'bins' или 'quantiles'")
    
    # Добавляем -inf и +inf для крайних бакетов
    breakpoints = np.concatenate([[-np.inf], breakpoints[1:-1], [np.inf]])
    
    # Распределение по бакетам
    expected_hist, _ = np.histogram(expected, bins=breakpoints)
    actual_hist, _ = np.histogram(actual, bins=breakpoints)
    
    # Преобразуем в проценты (добавляем маленькое значение, чтобы избежать деления на 0)
    expected_perc = expected_hist / len(expected)
    actual_perc = actual_hist / len(actual)
    
    # Добавляем эпсилон, чтобы избежать log(0)
    epsilon = 1e-10
    expected_perc = np.where(expected_perc == 0, epsilon, expected_perc)
    actual_perc = np.where(actual_perc == 0, epsilon, actual_perc)
    
    # Рассчитываем PSI
    psi_values = (actual_perc - expected_perc) * np.log(actual_perc / expected_perc)
    psi_total = np.sum(psi_values)
    
    return psi_total

# Интерпретация результатов PSI
def interpret_psi(psi_value):
    """Интерпретируем значение PSI"""
    if psi_value < 0.1:
        return "Нет значимого дрейфа", "green"
    elif psi_value < 0.2:
        return "Умеренный дрейф, требует наблюдения", "yellow"
    else:
        return "Значительный дрейф, требуется действие", "red"

# Пример использования:
expected_age = np.random.normal(35, 10, 1000)  # Эталон: средний возраст 35
actual_age = np.random.normal(40, 10, 1000)    # Текущие данные: средний возраст 40

psi = calculate_psi(expected_age, actual_age)
interpretation, color = interpret_psi(psi)
print(f"PSI: {psi:.4f} - {interpretation}")
💡
PSI имеет интуитивную интерпретацию: 0.1 — порог для предупреждения, 0.2 — порог для срочных действий. В банковской сфере эти пороги могут быть ещё строже — 0.05 и 0.1 соответственно.

3 Детектирование концепт дрейфа: когда модель врёт незаметно

Концепт дрейф — это сложнее. Признаки те же, но их смысл изменился. Как это поймать?

Самый надёжный способ — отслеживать performance модели в реальном времени. Но есть проблема: в продакшене у нас часто нет истинных меток (ground truth). Клиент купил товар — мы узнаем сразу. Клиент НЕ купил товар — мы узнаем... никогда.

Выход — косвенные метрики и статистические уловки:

import pandas as pd
from sklearn.ensemble import IsolationForest
from scipy.stats import ks_2samp

class ConceptDriftDetector:
    def __init__(self, reference_predictions, reference_labels):
        """
        reference_predictions: предсказания модели на эталонных данных
        reference_labels: истинные метки на эталонных данных
        """
        self.reference_predictions = reference_predictions
        self.reference_labels = reference_labels
        
        # Рассчитываем распределение ошибок на эталонных данных
        self.reference_errors = np.abs(reference_predictions - reference_labels)
        
    def detect_via_error_distribution(self, current_predictions, current_labels):
        """Обнаруживаем дрейф через изменение распределения ошибок"""
        if current_labels is None or len(current_labels) < 50:
            return None, "Недостаточно данных для анализа"
            
        current_errors = np.abs(current_predictions - current_labels)
        
        # Сравниваем распределения ошибок с помощью KS-теста
        ks_stat, p_value = ks_2samp(self.reference_errors, current_errors)
        
        return p_value, ks_stat
    
    def detect_via_confidence_shifts(self, current_predictions_proba):
        """
        Для классификации: отслеживаем сдвиги в уверенности модели
        current_predictions_proba: вероятности классов
        """
        # Рассчитываем энтропию предсказаний как меру неопределённости
        def calculate_entropy(probas):
            # Добавляем epsilon, чтобы избежать log(0)
            probas = np.clip(probas, 1e-10, 1 - 1e-10)
            return -np.sum(probas * np.log2(probas), axis=1)
        
        if hasattr(self, 'reference_entropy'):
            current_entropy = calculate_entropy(current_predictions_proba)
            ks_stat, p_value = ks_2samp(self.reference_entropy, current_entropy)
            return p_value
        else:
            # Сохраняем энтропию эталонных данных при первом вызове
            self.reference_entropy = calculate_entropy(self.reference_predictions)
            return None
    
    def detect_via_feature_importance_shift(self, model, X_current):
        """
        Для tree-based моделей: отслеживаем изменение важности признаков
        """
        try:
            # Получаем важность признаков на текущих данных
            # (упрощённо, в реальности нужен перерасчёт)
            current_importance = model.feature_importances_
            
            # Сравниваем с эталонной важностью (должна быть сохранена заранее)
            if hasattr(self, 'reference_importance'):
                # Корреляция важностей признаков
                correlation = np.corrcoef(self.reference_importance, current_importance)[0, 1]
                return correlation
            else:
                self.reference_importance = current_importance
                return None
        except:
            return None

# Пример использования для регрессии:
# (в реальности вам нужны будут истинные метки для текущих данных)
detector = ConceptDriftDetector(reference_preds, reference_labels)
p_value, ks_stat = detector.detect_via_error_distribution(current_preds, current_labels)

if p_value < 0.05:
    print(f"Обнаружен концепт дрейф! p-value: {p_value:.4f}, KS-stat: {ks_stat:.4f}")

Практический пайплайн мониторинга: от кода до алертов

Теперь соберём всё вместе в рабочий пайплайн. Это не академическое упражнение — это код, который можно запустить завтра в продакшене.

import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import json
import logging
from typing import Dict, List, Optional

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class DriftMonitoringPipeline:
    def __init__(self, model_name: str, config_path: str):
        self.model_name = model_name
        self.load_config(config_path)
        self.drift_history = []
        
    def load_config(self, config_path: str):
        """Загружаем конфигурацию мониторинга"""
        with open(config_path, 'r') as f:
            self.config = json.load(f)
        
        # Загружаем эталонные данные
        self.reference_data = pd.read_pickle(self.config['reference_data_path'])
        self.reference_predictions = np.load(self.config['reference_predictions_path'])
        
        # Пороги для алертов
        self.psi_threshold_warning = self.config.get('psi_threshold_warning', 0.1)
        self.psi_threshold_critical = self.config.get('psi_threshold_critical', 0.2)
        
    def monitor_batch(self, current_data: pd.DataFrame, 
                      current_predictions: Optional[np.ndarray] = None,
                      batch_id: Optional[str] = None) -> Dict:
        """Мониторим дрейф для батча данных"""
        
        if batch_id is None:
            batch_id = datetime.now().strftime("%Y%m%d_%H%M%S")
        
        results = {
            'batch_id': batch_id,
            'timestamp': datetime.now().isoformat(),
            'n_samples': len(current_data),
            'features_checked': [],
            'drift_detected': False,
            'alerts': []
        }
        
        # 1. Проверяем дрейф данных для каждого признака
        features_to_check = self.config.get('features_to_monitor', 
                                           self.reference_data.columns.tolist())
        
        for feature in features_to_check:
            if feature in self.reference_data.columns and feature in current_data.columns:
                psi_value = calculate_psi(
                    self.reference_data[feature].values,
                    current_data[feature].values,
                    buckets=self.config.get('psi_buckets', 10)
                )
                
                if not np.isnan(psi_value):
                    results['features_checked'].append({
                        'feature': feature,
                        'psi': float(psi_value),
                        'status': 'critical' if psi_value > self.psi_threshold_critical 
                                 else 'warning' if psi_value > self.psi_threshold_warning 
                                 else 'ok'
                    })
                    
                    if psi_value > self.psi_threshold_warning:
                        results['drift_detected'] = True
                        alert_msg = f"Дрейф данных в признаке {feature}: PSI={psi_value:.3f}"
                        results['alerts'].append({
                            'level': 'critical' if psi_value > self.psi_threshold_critical else 'warning',
                            'message': alert_msg,
                            'feature': feature,
                            'psi_value': float(psi_value)
                        })
        
        # 2. Проверяем концепт дрейф, если есть текущие предсказания
        if current_predictions is not None and 'reference_labels' in self.config:
            reference_labels = np.load(self.config['reference_labels_path'])
            
            # Упрощённая проверка: сравниваем распределение предсказаний
            ks_stat, p_value = ks_2samp(self.reference_predictions, current_predictions)
            
            if p_value < 0.05:
                results['concept_drift'] = {
                    'ks_statistic': float(ks_stat),
                    'p_value': float(p_value),
                    'detected': True
                }
                results['drift_detected'] = True
                results['alerts'].append({
                    'level': 'critical',
                    'message': f"Возможный концепт дрейф: p-value={p_value:.4f}",
                    'test': 'ks_test',
                    'p_value': float(p_value)
                })
        
        # 3. Сохраняем результаты
        self.drift_history.append(results)
        self.save_results(results)
        
        # 4. Отправляем алерты, если нужно
        if results['alerts']:
            self.send_alerts(results['alerts'])
        
        logger.info(f"Мониторинг завершён для батча {batch_id}. "
                   f"Дрейф обнаружен: {results['drift_detected']}. "
                   f"Алертов: {len(results['alerts'])}")
        
        return results
    
    def save_results(self, results: Dict):
        """Сохраняем результаты мониторинга"""
        output_dir = self.config.get('output_dir', './drift_monitoring')
        os.makedirs(output_dir, exist_ok=True)
        
        filename = f"{output_dir}/drift_report_{results['batch_id']}.json"
        with open(filename, 'w') as f:
            json.dump(results, f, indent=2, default=str)
    
    def send_alerts(self, alerts: List[Dict]):
        """Отправляем алерты (заглушка, реализуйте под свою инфраструктуру)"""
        for alert in alerts:
            if alert['level'] == 'critical':
                # Отправляем в Slack/Telegram/Email
                logger.error(f"КРИТИЧЕСКИЙ АЛЕРТ: {alert['message']}")
            elif alert['level'] == 'warning':
                logger.warning(f"ПРЕДУПРЕЖДЕНИЕ: {alert['message']}")
    
    def generate_dashboard_data(self, days: int = 30) -> Dict:
        """Генерируем данные для дашборда мониторинга"""
        cutoff_date = datetime.now() - timedelta(days=days)
        
        recent_reports = [
            r for r in self.drift_history
            if datetime.fromisoformat(r['timestamp'].replace('Z', '')) > cutoff_date
        ]
        
        if not recent_reports:
            return {}
        
        # Агрегируем статистику по признакам
        feature_stats = {}
        for report in recent_reports:
            for feature_check in report['features_checked']:
                feature = feature_check['feature']
                if feature not in feature_stats:
                    feature_stats[feature] = {'psi_values': [], 'alert_count': 0}
                
                feature_stats[feature]['psi_values'].append(feature_check['psi'])
                if feature_check['status'] != 'ok':
                    feature_stats[feature]['alert_count'] += 1
        
        # Рассчитываем средний PSI для каждого признака
        for feature in feature_stats:
            psi_values = feature_stats[feature]['psi_values']
            feature_stats[feature]['avg_psi'] = np.mean(psi_values)
            feature_stats[feature]['max_psi'] = np.max(psi_values)
            feature_stats[feature]['std_psi'] = np.std(psi_values)
        
        return {
            'time_period': f"last_{days}_days",
            'total_batches': len(recent_reports),
            'batches_with_drift': sum(1 for r in recent_reports if r['drift_detected']),
            'total_alerts': sum(len(r['alerts']) for r in recent_reports),
            'feature_statistics': feature_stats,
            'most_drifting_features': sorted(
                [(f, stats['avg_psi']) for f, stats in feature_stats.items()],
                key=lambda x: x[1],
                reverse=True
            )[:10]
        }

# Пример конфигурации (config.json):
config_example = {
    "reference_data_path": "./data/reference/reference_data.pkl",
    "reference_predictions_path": "./data/reference/reference_predictions.npy",
    "reference_labels_path": "./data/reference/reference_labels.npy",
    "psi_threshold_warning": 0.1,
    "psi_threshold_critical": 0.2,
    "psi_buckets": 10,
    "features_to_monitor": ["age", "income", "transaction_amount", "session_duration"],
    "output_dir": "./monitoring_reports",
    "alert_channels": ["slack", "email"]
}

# Запуск пайплайна:
# pipeline = DriftMonitoringPipeline("fraud_detection_v1", "config.json")
# current_batch = pd.read_parquet("./data/current_batch.parquet")
# results = pipeline.monitor_batch(current_batch)

Ошибки, которые совершают все (и как их избежать)

Я видел десятки реализаций мониторинга дрейфа. Почти все совершают одни и те же ошибки.

Ошибка 1: Проверять все признаки одинаково. Не делайте так. Признаки имеют разную важность. Дрейф в ключевом признаке (например, "сумма транзакции" для fraud detection) критичен. Дрейф во вспомогательном признаке ("цвет кнопки, на которую кликнул") — менее важен. Взвешивайте PSI по важности признака.

Ошибка 2: Использовать фиксированные пороги для всех моделей. Порог PSI 0.2 может быть нормальным для рекомендательной системы, но смертельным для медицинской диагностики. Калибруйте пороги на исторических данных: смотрите, при каком PSI начинается деградация метрик.

Ошибка 3: Игнорировать сезонность. Продажи мороженого летом и зимой — это не дрейф, это сезонность. Решение: сравнивайте данные не с общим эталоном, а с эталоном для того же сезона/дня недели/часа.

Ошибка 4: Отправлять алерты без контекста. "PSI для признака age = 0.15" — бесполезно. Добавляйте: "PSI для признака age = 0.15 (было: среднее 35, стало: среднее 42, +20%)".

Когда переобучать модель: экономика против точности

Обнаружили дрейф. Что дальше? Автоматически переобучать модель? Ни в коем случае.

Переобучение — это дорого. Нужно:

  1. Собрать новые размеченные данные
  2. Переобучить модель (GPU время, специалисты)
  3. Протестировать новую модель
  4. Развернуть в продакшен (риск)

Решение — многоуровневая стратегия:

class RetrainingDecisionMaker:
    def __init__(self, model, cost_parameters):
        self.model = model
        self.cost_parameters = cost_parameters  # Стоимость переобучения, ущерб от ошибок и т.д.
        
    def should_retrain(self, drift_report, performance_metrics=None) -> Dict:
        """Принимаем решение о переобучении"""
        
        decision = {
            'retrain': False,
            'confidence': 0.0,
            'reason': '',
            'recommended_action': 'monitor'
        }
        
        # 1. Критический дрейф в ключевых признаках
        critical_features = self.cost_parameters.get('critical_features', [])
        critical_drift = False
        
        for feature_check in drift_report.get('features_checked', []):
            if (feature_check['feature'] in critical_features and 
                feature_check.get('status') == 'critical'):
                critical_drift = True
                decision['reason'] += f"Критический дрейф в {feature_check['feature']}. "
                
        # 2. Снижение производительности
        if performance_metrics:
            accuracy_drop = performance_metrics.get('accuracy_drop', 0)
            if accuracy_drop > 0.05:  # Падение accuracy на 5%
                decision['reason'] += f"Падение accuracy на {accuracy_drop*100:.1f}%. "
                critical_drift = True
        
        # 3. Экономический расчёт
        if critical_drift:
            # Оцениваем ожидаемый ущерб от дрейфа
            expected_damage = self.estimate_damage(drift_report)
            retraining_cost = self.cost_parameters['retraining_cost']
            
            if expected_damage > retraining_cost * 2:  # Ущерб в 2 раза больше стоимости переобучения
                decision['retrain'] = True
                decision['confidence'] = 0.8
                decision['recommended_action'] = 'retrain_immediately'
            elif expected_damage > retraining_cost:
                decision['retrain'] = True
                decision['confidence'] = 0.6
                decision['recommended_action'] = 'schedule_retraining'
            else:
                decision['reason'] += "Ущерб от дрейфа меньше стоимости переобучения. "
                decision['recommended_action'] = 'monitor_closely'
        
        # 4. Концепт дрейф с подтверждённым снижением качества
        if drift_report.get('concept_drift', {}).get('detected', False):
            if drift_report['concept_drift']['p_value'] < 0.01:
                decision['retrain'] = True
                decision['confidence'] = 0.9
                decision['reason'] += "Сильный концепт дрейф (p < 0.01). "
                decision['recommended_action'] = 'retrain_immediately'
        
        return decision
    
    def estimate_damage(self, drift_report) -> float:
        """Оцениваем финансовый ущерб от дрейфа"""
        # Упрощённая модель
        base_damage = self.cost_parameters.get('base_damage_per_error', 10.0)
        expected_errors = 0
        
        # Чем сильнее дрейф, тем больше ошибок
        for feature_check in drift_report.get('features_checked', []):
            psi = feature_check.get('psi', 0)
            feature_importance = self.cost_parameters.get('feature_importance', {}).get(
                feature_check['feature'], 1.0
            )
            expected_errors += psi * feature_importance * 100  # Коэффициенты нужно калибровать
        
        return base_damage * expected_errors

Теперь у вас не просто "есть дрейф", а "дрейф в признаке X с ожидаемым ущербом $Y, рекомендуется действие Z".

Интеграция с MLOps: куда встроить мониторинг

Мониторинг дрейфа — не отдельный скрипт, а часть MLOps-пайплайна. Вот как это выглядит в реальной системе:

  1. Сбор данных: Все предсказания модели логируются вместе с входными данными и timestamp. Используйте интеграцию ML-моделей в продакшн как основу.
  2. Эталонные снимки: При каждом переобучении автоматически сохраняются эталонные данные.
  3. Периодический мониторинг: Каждые N часов/дней запускается джоба мониторинга.
  4. Дашборды: Результаты визуализируются в Grafana или аналоги.
  5. Автоматические действия: При критическом дрейфе — алерты, при подтверждённом снижении качества — автоматическое переобучение.

Если вы только начинаете строить ML-инфраструктуру, посмотрите как построить ML-песочницу — это хорошая отправная точка.

Самый опасный миф о дрейфе

"Если нет дрейфа, модель можно не трогать".

Ложь. Даже без дрейфа данных и концептов, модель деградирует. Почему? Потому что конкуренты улучшают свои модели. Потому что пользователи становятся умнее. Потому что появляются новые паттерны, которых не было в обучающих данных.

Мониторинг дрейфа — это необходимый минимум. Но недостаточный. Нужно также:

  • Следить за performance относительно бейзлайнов (например, простой эвристики)
  • Проводить A/B тесты новых моделей постоянно
  • Собирать feedback loop: почему пользователи отклоняют рекомендации?

Дрейф — это симптом. Болезнь — это отставание от реального мира. Лечение — постоянное обновление, а не только реагирование на критические изменения.

💡
Начните с самого простого: сохраните эталонные данные сегодня. Через месяц у вас будет что сравнивать. Через три месяца вы поймёте, насколько стабильна ваша модель. Через полгода — будете знать её сезонные паттерны. Главное — начать.

Код из этой статьи — рабочий. Возьмите, адаптируйте под свои данные. Но помните: идеального детектора дрейфа не существует. Всегда будет компромисс между false positives (ложные срабатывания) и false negatives (пропущенные дрейфы).

Ваша задача — не построить идеальную систему, а построить систему, которая даёт достаточно сигналов, чтобы принимать обоснованные решения. Иногда ложное срабатывание лучше, чем пропущенный дрейф, который приведёт к потере клиентов.

И последнее: не пытайтесь мониторить всё сразу. Начните с 3-5 ключевых признаков. Когда наладите процесс — расширяйтесь. Как и в любом инжиниринге, итеративность побеждает perfectionism.