Pandas умирает на 10 ГБ: пора переезжать
Вы загружаете датасет в 20 ГБ, а через 30 секунд видите классический MemoryError. ОЗУ сервера заполнено, swap трещит по швам, а коллеги спрашивают, не майните ли вы крипту. Знакомая картина? Поздравляю, вы уперлись в фундаментальный потолок Pandas – однопоточную обработку в памяти.
Pandas – это идеальный инструмент для данных, которые помещаются в оперативку вашего ноутбука. Но в 2026 году, когда датасеты на терабайты стали обычным делом, нужно думать иначе. PySpark – это не просто "Spark для Python". Это другая философия, где данные живут в распределенной кластерной памяти, а вычисления выполняются лениво и параллельно.
Осторожно: PySpark – это не drop-in замена Pandas. Перенос кода требует переосмысления архитектуры. Если вы скопируете pandas-стиль в PySpark, вы получите худшее из двух миров – сложность Spark и скорость Pandas.
Почему ваш pandas-код не масштабируется (и никогда не будет)
Представьте, что вы пытаетесь загрузить океан в ванную. Именно это делает Pandas с большими данными. Вся обработка происходит в памяти одного процесса, на одном ядре CPU. df.apply(), группировки, joins – всё это блокирующие операции, которые ждут завершения.
PySpark работает по другому принципу. Данные разбиваются на партиции, распределяются по узлам кластера (или ядрам вашего локального компьютера), а операции над ними планируются как граф вычислений. Ничего не выполняется, пока вы явно не скажете "собери результат". Это называется ленивыми вычислениями (lazy evaluation).
df.filter() или df.groupBy(), Spark не трогает данные. Он лишь строит план выполнения (execution plan). Реальная работа начинается только при вызове действий (actions) типа collect(), show() или write. Это позволяет оптимизировать весь граф операций перед запуском.1Настройка окружения: не повторяйте моих ошибок
Первая ошибка – установить PySpark через pip install pyspark и надеяться, что всё заработает. В 2026 году последняя стабильная ветка – Spark 4.x с существенными улучшениями в Python API. Убедитесь, что используете актуальную версию.
# Установка последней версии PySpark на 01.03.2026
pip install pyspark==4.0.0 # Проверьте актуальную версию!
# Для работы с паркетом и AWS S3 часто нужны дополнительные пакеты
pip install pyarrow==15.0.0 pandas==2.2.0 # СовместимостьЛокальный режим Spark (local[*]) использует все ядра вашего компьютера, но всю память JVM. Если у вас 32 ГБ ОЗУ, не давайте Spark больше 24 ГБ, иначе начнется своппинг. Настройка памяти – это искусство, о котором я подробно писал в статье про тонкости настройки DGX Spark.
2Перенос операций: от императивного к декларативному стилю
В Pandas вы привыкли делать так:
# Pandas стиль (императивный)
df = pd.read_csv('huge_file.csv')
df['new_column'] = df['old_column'] * 2
df_filtered = df[df['value'] > 100]
result = df_filtered.groupby('category').mean()В PySpark этот же код выглядит декларативно. Вы описываете, что хотите получить, а не как это делать:
# PySpark стиль (декларативный)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder \
.appName("PandasMigration") \
.config("spark.driver.memory", "8g") \
.getOrCreate()
df = spark.read.csv('huge_file.csv', header=True, inferSchema=True)
df_with_new = df.withColumn('new_column', col('old_column') * 2)
df_filtered = df_with_new.filter(col('value') > 100)
result = df_filtered.groupBy('category').avg() # Пока ещё ничего не выполнилось!
# Только теперь запускаем вычисления
result.show(5) # Action! Запускает весь граф операций| Операция | Pandas | PySpark (SQL API) | Примечание |
|---|---|---|---|
| Чтение CSV | pd.read_csv() | spark.read.csv() | Spark читает распределённо |
| Добавление столбца | df['new'] = ... | df.withColumn() | Возвращает новый DataFrame |
| Фильтрация | df[df.col > 5] | df.filter() или df.where() | Идентичны |
| Группировка | df.groupby().mean() | df.groupBy().avg() | Имена функций differ |
| Применение функции | df.apply() | df.select(udf(...)) | Требует UDF, что медленно |
3Оптимизация: как заставить Spark летать, а не ползать
Самый частый вопрос: "Почему мой Spark-джоб работает медленнее, чем старый pandas-скрипт?" Ответ обычно в неправильной работе с данными.
Партиционирование – это всё. Если вы делаете join двух DataFrame без партиционирования, Spark будет вынужден перетасовывать (shuffle) все данные по сети. Это убийственно для производительности.
# КАК НЕ НАДО делать
big_df.join(other_df, 'key') # Полный shuffle!
# Как надо: предварительное партиционирование
big_df_repartitioned = big_df.repartition(100, 'key')
other_df_repartitioned = other_df.repartition(100, 'key')
result = big_df_repartitioned.join(other_df_repartitioned, 'key') # Локальный joinИспользуйте партиционирование при записи данных, чтобы ускорить последующие чтения. Для работы с временными рядами или категориальными данными это критически важно. Более продвинутые техники вроде liquid clustering или salting я разбирал в статье про оптимизацию ML inference на Databricks.
df.explain() покажет физический план. Ищите там Exchange – это обозначает перетасовку данных. Чем меньше Exchange, тем быстрее выполнится запрос.Четыре смертных греха начинающего PySpark-разработчика
- Использование
collect()для больших данных.df.collect()загружает ВЕСЬ результат в память драйвера. Убийственно для терабайтов. Используйтеtake(),show()или запись в файл. - Пользовательские функции (UDF) на Python. Каждый вызов Python UDF сериализует данные из JVM в Python процесс и обратно. Используйте встроенные функции Spark SQL или, в крайнем случае, Pandas UDF (векторизованные). Если производительность падает, как в случае с расчетом расстояний между аэропортами, UDF часто становится узким местом.
- Игнорирование кэширования. Если вы используете один DataFrame несколько раз – закэшируйте его:
df.cache()илиdf.persist(). Но не кэшируйте всё подряд, иначе упретесь в память. - Неправильные типы данных. Spark не любит
objectили строки там, где должны быть числа. Всегда проверяйте схемуdf.printSchema()и приводите типы явно.
Когда Pandas всё-таки полезен внутри PySpark
Полный отказ от Pandas – утопия. В Spark 4.x есть отличная интеграция через API pandas_on_spark или applyInPandas. Вы можете разбить ваш большой DataFrame на группы, обработать каждую группу с помощью оптимизированного pandas-кода (например, сложных статистических расчётов), а затем собрать результаты обратно.
# Использование applyInPandas для сложных преобразований
def complex_pandas_transform(pdf):
# pdf - это обычный pandas DataFrame
pdf['calculated'] = pdf['a'] * pdf['b'] / pdf['c']
return pdf
# Применяем функцию к каждой партиции
result_df = spark_df.groupBy('group_key').applyInPandas(
complex_pandas_transform,
schema='group_key int, a double, b double, c double, calculated double'
)Это мощный паттерн, особенно для ML-препроцессинга, где некоторые библиотеки (скажем, для обработки аудио или изображений) работают только с pandas/numpy. Но помните: каждая группа должна помещаться в память одного воркера.
Что дальше: читаемость или производительность?
Самый неочевидный совет: иногда лучше написать два отдельных эффективных Spark-джоба, чем один монолитный. Разбивайте пайплайн на этапы с промежуточной записью на диск (в Parquet!). Это упрощает отладку, позволяет перезапускать только упавшие этапы и часто ускоряет общее время выполнения за счет лучшей оптимизации каждого шага.
PySpark – это не серебряная пуля. Для данных до 100 ГБ на современном сервере с большим ОЗУ оптимизированный Pandas (с использованием pyarrow и правильными типами данных) может оказаться быстрее. Но когда данные измеряются терабайтами или приходят потоками – выбор очевиден.
Начните с миграции самого болезненного участка вашего пайплайна. Перепишите один модуль, который постоянно падает из-за нехватки памяти. Профилируйте, смотрите на планы выполнения, играйте с количеством партиций. И не бойтесь выглядеть глупо – все мы когда-то делали collect() на 10 миллионах строк.