Когда 420 ядер простаивают: почему ваш ML inference тормозит
Представьте картину: у вас 1000 ML моделей, 100 миллионов пользователей и кластер Databricks на 420 ядер. Запускаете инференс - и он выполняется 24 часа. CPU utilization 15%. Деньги горят, время утекает, менеджеры нервничают.
Я столкнулся с этим на проекте кредитного скоринга. Ежедневно нужно было обновлять предсказания для всех клиентов по всем моделям. Исходное решение использовало классические partitioned tables по дате. Результат - катастрофический.
Главная ошибка: partitioning по дате когда фильтруете по user_id. Spark читает ВСЕ партиции, потому что данные разбросаны по всем датам. 1000 партиций, 100 узлов - 900 простаивают.
Три подхода к оптимизации: от костылей до магии
Мы тестировали три стратегии на одинаковом железе (30 узлов, 14 ядер каждый, Databricks Runtime 15.0). Исходные данные: 1 TB, 100 млн строк, фичи для 1000 моделей.
Partitioned Tables: старый добрый провал
Классика, которую все используют по привычке. Создаем таблицу, партицированную по model_id:
CREATE TABLE features_partitioned
USING delta
PARTITIONED BY (model_id)
AS SELECT * FROM raw_features;
Кажется логичным? На практике получили data skew. У одной модели 50 млн пользователей, у другой - 10 тысяч. Одна партиция обрабатывается 10 часов, остальные - 5 минут. Ресурсы простаивают.
Salting: хакерский трюк с побочными эффектами
Искусственно создаем дополнительные ключи для равномерного распределения:
CREATE TABLE features_salted
USING delta
PARTITIONED BY (model_id, salt)
AS
SELECT
*,
CAST(rand() * 100 AS INT) AS salt -- 100 бакетов
FROM raw_features;
Теперь каждая модель разбита на 100 частей. Можем запускать 100 параллельных задач. Но появились проблемы:
- Дополнительные 10% объема из-за колонки salt
- Маленькие файлы (small files problem) - 1000 моделей × 100 salt = 100k файлов
- Усложнение запросов: нужно итерировать по salt
Liquid Clustering: почему я не начал с этого раньше
Новая фича Databricks (стабильна с версии 13.0, в 2026 - уже стандарт). Вместо жестких партиций - умное распределение данных:
CREATE TABLE features_clustered
USING delta
CLUSTER BY (model_id, user_id)
AS SELECT * FROM raw_features;
Магия в том, что Databricks сам оптимизирует физическое расположение. Данные с одинаковым model_id лежат рядом, но без жестких границ партиций.
Цифры не врут: сравнительная таблица производительности
| Метод | Время инференса | CPU utilization | Shuffle data | Стоимость (относительно) |
|---|---|---|---|---|
| Partitioned (model_id) | 24 часа | 15% | 2.1 TB | 100% |
| Salted (model_id + salt) | 10 часов | 85% | 120 GB | 45% |
| Liquid Clustering | 6 часов | 90% | 15 GB | 25% |
Shuffle data - это объем данных, которые перемещаются между узлами. В partitioned варианте Spark читал все партиции и перетасовывал данные для группировки по model_id. Liquid Clustering дал минимальный shuffle - данные уже лежали оптимально.
Пошаговый переход на Liquid Clustering
1 Анализ workload и выбор колонок для кластеризации
Сначала смотрим, по каким полям фильтруете. В нашем случае WHERE model_id = X AND user_id IN (...). Значит, кластеризуем по model_id, user_id.
-- Смотрим распределение запросов
DESCRIBE HISTORY features_partitioned;
-- Анализируем частые фильтры
SHOW PARTITIONS features_partitioned;
2 Создание clustered таблицы с правильными настройками
CREATE TABLE features_optimized
USING delta
CLUSTER BY (model_id, user_id)
TBLPROPERTIES (
'delta.targetFileSize' = '256mb',
'delta.autoOptimize.optimizeWrite' = 'true',
'delta.autoOptimize.autoCompact' = 'true'
)
AS SELECT * FROM raw_features;
-- Оптимизируем сразу
OPTIMIZE features_optimized ZORDER BY (model_id, user_id);
Важно: ZORDER BY работает совместно с CLUSTER BY. ZORDER упорядочивает данные внутри файлов, CLUSTER BY группирует файлы. На 2026 год это стандартная рекомендация.
3 Настройка инференс-джобы под новую структуру
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import numpy as np
# Старая версия с динамическим partitioning
# df.repartition(1000, 'model_id') - УЖАСНО МЕДЛЕННО
# Новая версия - читаем с предикатным фильтром
df = spark.table("features_optimized")
.filter(F.col("model_id").isin(model_ids))
.filter(F.col("date") == current_date)
# Pandas UDF для инференса - актуальный синтаксис 2026
@pandas_udf("user_id string, prediction float")
def batch_predict(iterator):
for batch in iterator:
# batch - pandas DataFrame
model = load_model(batch["model_id"].iloc[0])
batch["prediction"] = model.predict(batch["features"])
yield batch[["user_id", "prediction"]]
results = df.mapInPandas(batch_predict)
# Сохраняем с liquid clustering для результатов
results.write \
.format("delta") \
.mode("append") \
.option("clusterBy", "model_id") \
.save("/mnt/predictions")
4 Мониторинг и тонкая настройка
После запуска смотрим Spark UI:
- Количество задач: должно быть близко к количеству ядер
- Input Size / Records: равномерное распределение между задачами
- Shuffle Read/Write: минимизировано
-- Анализируем эффективность кластеризации
ANALYZE TABLE features_optimized COMPUTE STATISTICS;
-- Смотрим размер файлов
SELECT
count(*) as num_files,
avg(file_size) as avg_size,
min(file_size) as min_size,
max(file_size) as max_size
FROM (SELECT distinct input_file_name() as file_name,
size(*) as file_size
FROM features_optimized);
Типичные ошибки и как их избежать
Ошибка 1: Кластеризация по слишком многим колонкам. Databricks рекомендует 1-4 колонки. Больше - деградация производительности.
Видел таблицу с CLUSTER BY (date, user_id, model_id, country, device_type). Результат - OPTIMIZE работал 8 часов и ничего не улучшил.
Ошибка 2: Игнорирование ZORDER при частых range запросах. Если фильтруете по диапазону (date BETWEEN ...), добавляйте ZORDER BY date.
-- Правильно для range queries
OPTIMIZE features_optimized
ZORDER BY (date, model_id);
-- Но в CLUSTER BY оставляем только high-cardinality колонки
CLUSTER BY (model_id, user_id);
Ошибка 3: Запуск OPTIMIZE слишком часто. Каждый OPTIMIZE переписывает файлы - это деньги. Используйте autoOptimize для инкрементальных изменений.
Когда что выбирать: decision tree для 2026 года
1. Liquid Clustering - всегда по умолчанию для новых таблиц. Особенно если:
- Данные > 500 GB
- Частые queries с разными фильтрами
- Нужна автоматическая оптимизация
2. Salting - только при extreme data skew, который liquid clustering не решает. Например, одна модель - 90% данных. Добавляете salt вручную.
3. Partitioned Tables - для архивных данных или compliance требований (удаление партиций по дате). Или если ваша нагрузка - исключительно full scan по partition key.
Интеграция с ML pipeline: полная картина
Liquid clustering - только часть оптимизации. Вместе с этим нужно:
- Кэширование моделей на workers - не загружать модель для каждой строки
- Векторизованные UDF вместо row-by-row processing
- Правильный выбор instance types для инференса (CPU-оптимизированные для традиционных ML, GPU для глубоких сетей)
Для LLM инференса на Databricks смотрите отдельную архитектуру - там другие оптимизации, про которые мы писали в статье про LLM и Spark.
Наш финальный стек для ML inference на 2026:
- Хранилище: Delta Lake с Liquid Clustering
- Оркестрация: Databricks Workflows с ML pipelines
- Мониторинг: Databricks Lakehouse Monitoring + custom метрики
- Кэширование: Redis для часто используемых моделей
- Сервинг: для real-time - отдельный сервис, для batch - оптимизированные Spark джобы
И последнее: не залипайте на одной технологии. Тот, кто сегодня использует partitioned tables потому что "так всегда делали", завтра будет платить в 4 раза больше за инференс. Liquid clustering в 2026 - это не "передовая технология", это baseline. Если вы его не используете, вы просто теряете деньги.