Обработка 1 млн писем для AI-агентов: пайплайн, парсинг PDF, реконструкция тредов | AiManual
AiManual Logo Ai / Manual.
18 Янв 2026 Гайд

Когда миллион писем должен стать контекстом для AI-агента: индустриальный гайд по реконструкции тредов, парсингу вложений и работе с 15 языками

Глубокий технический разбор обработки email для AI-агентов. Реконструкция тредов, работа с вложениями (OCR, парсинг), мультиязычные данные, zero data retention.

Почему ваш AI-агент не понимает переписку

Вы запускаете AI-агента для работы с поддержкой клиентов. Он получает доступ к почтовому ящику. Идея простая: агент должен читать письма и отвечать на них в контексте всей переписки.

В теории все гладко. На практике вы сталкиваетесь с кошмаром: 1 000 000 писем за 5 лет, 300 000 вложений, 15 языков переписки, десятки почтовых клиентов с разными форматами цитирования. И самое страшное — ваша модель не видит цепочки. Она получает последнее письмо и пытается угадать, о чем шла речь три года назад.

Проблема не в модели. Проблема в данных. Современные LLM могут работать с контекстом в 128К токенов, но если вы подаете им разрозненные письма без связей — это все равно что читать книгу, вырвав из нее каждую вторую страницу.

Реконструкция тредов: когда стандартные методы ломаются

Первое, что приходит в голову — использовать стандартные поля email: In-Reply-To, References, Message-ID. Звучит логично, пока не открываете реальные данные.

Что ломает стандартные методы:

  • Клиенты теряют заголовки. Gmail, Outlook Web App, мобильные приложения — каждый режет по-своему.
  • Пересылки убивают цепочки. Письмо из треда пересылают новому участнику — и все, тред разорван.
  • Ответы на несколько писем сразу. "Ответить всем" на письмо, которое само было ответом на три других.
  • Темы изменяются посреди треда. Начинается с "Вопрос по счету", а через 20 писем превращается в "Проблема с API интеграцией".
💡
В реальном enterprise-кейсе мы обнаружили, что стандартные поля корректно связывают только 65% писем. Остальные 35% — это ручная работа алгоритмов эвристического сопоставления.

1 Собираем все возможные сигналы

Нельзя полагаться на один метод. Нужна многоуровневая система:

Уровень Метод Точность Когда использовать
1. Строгий Message-ID, In-Reply-To 100% Всегда первый
2. Лексический Нормализация темы (RE:, FWD:, удаление префиксов) 85-90% Когда заголовки повреждены
3. Временной Группировка по участникам + временное окно 75-80% Для пересылок и ответов всем
4. Семантический Embedding текста + кластеризация 60-70% Когда тема меняется кардинально

Вот как выглядит код для нормализации темы письма (уровень 2):

import re

def normalize_subject(subject: str) -> str:
    """Удаляем все префиксы RE:, FWD:, AW: и их вариации."""
    if not subject:
        return ""
    
    # Удаляем языковые префиксы
    prefixes = [
        r'^\s*(RE|Re|FWD|Fwd|FW|Fw|AW|Aw|SV|Sv|VS|Vs)[\s:\[\]\-]*\s*',
        r'^\s*\[[^\]]*\]\s*',  # Удаляем квадратные скобки
        r'^\s*\([^\)]*\)\s*',  # Удаляем круглые скобки
    ]
    
    normalized = subject
    for pattern in prefixes:
        normalized = re.sub(pattern, '', normalized, flags=re.IGNORECASE)
    
    # Удаляем лишние пробелы
    normalized = normalized.strip()
    
    # Для пустых тем возвращаем хэш участников
    if not normalized:
        return "NO_SUBJECT"
    
    return normalized.lower()

# Пример:
print(normalize_subject("RE: RE: [External] Ваш вопрос по API"))
# Output: "ваш вопрос по api"

2 Строим граф связей

После сбора сигналов строим граф, где узлы — письма, а ребра — связи. Самый сложный момент — разрешение конфликтов:

Конфликт: Письмо A связано с B по Message-ID, но по времени отправки и семантике оно должно быть в треде с C. Что делать? В enterprise выбирают сторону сохранения целостности треда, даже если это противоречит техническим заголовкам.

Вложения: когда PDF скрывает больше, чем показывает

300 000 вложений в миллионе писем. Из них 40% — PDF, 30% — изображения (скриншоты, сканы), 20% — документы Office, 10% — все остальное.

Ваш AI-агент должен понимать не просто "в письме есть вложение", а что в этом вложении содержится. Счет, договор, техническая спецификация, скриншот ошибки — каждый тип требует своей обработки.

Три слоя обработки вложений:

  1. Метаданные: Имя файла, размер, MIME-тип. Просто, но уже дает 30% контекста.
  2. Текстовое содержимое: Парсинг PDF, DOCX, таблиц Excel. Сложность в том, что PDF бывают сканированными, защищенными паролем, с таблицами, которые ломают структуру.
  3. Семантическое понимание: Это счет или договор? Техническая документация или маркетинговый буклет? Здесь подключается классификация на основе содержимого.

Код для определения типа вложения по содержимому:

import re
from typing import Optional

def classify_attachment_content(text: str, filename: str) -> str:
    """Классифицируем вложение по содержимому и имени файла."""
    text_lower = text.lower()
    filename_lower = filename.lower()
    
    # Паттерны для счетов
    invoice_patterns = [
        r'счет\s*№', r'invoice', r'оплата', r'total\s*amount',
        r'итого\s*к\s*оплате', r'банковские\s*реквизиты'
    ]
    
    # Паттерны для договоров
    contract_patterns = [
        r'договор\s*№', r'контракт', r'agreement',
        r'стороны\s*договорились', r'подписант'
    ]
    
    # Проверяем по содержимому
    for pattern in invoice_patterns:
        if re.search(pattern, text_lower):
            return "invoice"
    
    for pattern in contract_patterns:
        if re.search(pattern, text_lower):
            return "contract"
    
    # Проверяем по имени файлу
    if any(word in filename_lower for word in ['invoice', 'счет', 'bill']):
        return "invoice"
    
    if any(word in filename_lower for word in ['contract', 'договор', 'agreement']):
        return "contract"
    
    # Проверяем структуру
    if re.search(r'таблица|table|\d+\s*[xх]\s*\d+', text_lower):
        return "table"
    
    return "unknown"

# Пример использования
content = "Счет № 12345 от 01.01.2024\nИтого к оплате: 1000 USD"
print(classify_attachment_content(content, "invoice_123.pdf"))
# Output: "invoice"
💡
Сканированные PDF — отдельный ад. Tesseract OCR работает медленно на больших объемах. Решение: запускать OCR только для PDF, где нет текстового слоя. Проверка простая: если pdf.miner извлекает меньше 50 символов на страницу — запускаем OCR.

Мультиязычность: когда один тред на трех языках

Реальный кейс из финансового сектора: клиент начинает переписку на английском, техподдержка отвечает на русском, в цепочку включается немецкий коллега, а итоговый счет приходит на французском.

Ваш AI-агент должен:

  • Определять язык каждого письма (да, даже смешанные предложения вроде "Hello, мне нужен help с этим issue")
  • Сохранять языковой контекст при ответе
  • Переводить только когда нужно (не каждый запрос требует перевода всего треда)

3 Определяем язык правильно

FastText от Facebook работает хорошо, но тяжеловесен для миллиона писем. LangDetect легче, но хуже с короткими текстами. Наш компромисс:

from langdetect import detect, DetectorFactory
from typing import Optional

# Для воспроизводимости результатов
DetectorFactory.seed = 0

def detect_email_language(text: str, fallback: str = "en") -> str:
    """Определяем язык письма с fallback на английский."""
    if not text or len(text.strip()) < 10:
        return fallback
    
    # Извлекаем только текст, удаляем цитаты и подписи
    clean_text = clean_email_text(text)
    
    if len(clean_text) < 20:
        return fallback
    
    try:
        # Пробуем определить язык
        lang = detect(clean_text)
        
        # Маппим к стандартным кодам
        lang_map = {
            'ru': 'ru', 'en': 'en', 'de': 'de', 'fr': 'fr',
            'es': 'es', 'it': 'it', 'pt': 'pt', 'zh-cn': 'zh'
        }
        
        return lang_map.get(lang, fallback)
    except:
        return fallback

def clean_email_text(text: str) -> str:
    """Очищаем текст от цитат и подписей."""
    # Удаляем стандартные блоки цитирования
    lines = text.split('\n')
    clean_lines = []
    
    quote_indicators = ['>', 'On ', ' wrote:', 'Sent:', 'From:']
    
    for line in lines:
        line_stripped = line.strip()
        
        # Пропускаем пустые строки и цитаты
        if not line_stripped:
            continue
        
        if any(line_stripped.startswith(indicator) for indicator in quote_indicators):
            break  # Все что после - цитата
        
        clean_lines.append(line_stripped)
    
    return ' '.join(clean_lines[:500])  # Берем первые 500 слов

Важно: Не переводите автоматически все письма на один язык. Контекст теряется. Вместо этого храните оригинальный язык каждого сообщения и переводите только при необходимости поиска или анализа.

Zero Data Retention: когда нельзя хранить ничего лишнего

GDPR, CCPA, российский 152-ФЗ. Enterprise-клиенты требуют гарантий: данные должны удаляться по запросу, без следов. Это не просто "удалить из базы". Это:

  • Удаление всех embedding'ов из векторной базы
  • Очистка логов обработки
  • Удаление кэшированных результатов парсинга
  • Гарантия, что даже в бэкапах данных нет

Архитектура для compliance:

class ComplianceAwarePipeline:
    """Пайплайн с поддержкой полного удаления данных."""
    
    def __init__(self):
        self.data_registry = {}  # Регистр: message_id -> где хранится
        
    def process_email(self, email_data: dict) -> dict:
        """Обрабатываем письмо и регистрируем все места хранения."""
        email_id = email_data['id']
        
        # 1. Сохраняем raw email
        raw_storage_key = self._store_raw(email_data)
        self.data_registry[email_id] = {'raw': raw_storage_key}
        
        # 2. Парсим вложения
        attachments_data = self._parse_attachments(email_data)
        if attachments_data:
            attachment_keys = self._store_attachments(attachments_data)
            self.data_registry[email_id]['attachments'] = attachment_keys
        
        # 3. Создаем embedding
        embedding_key = self._create_embedding(email_data)
        self.data_registry[email_id]['embedding'] = embedding_key
        
        # 4. Индексируем для поиска
        search_key = self._index_for_search(email_data)
        self.data_registry[email_id]['search'] = search_key
        
        return {
            'email_id': email_id,
            'processed': True,
            'registry_key': f"email:{email_id}"
        }
    
    def delete_all_data(self, email_id: str) -> bool:
        """Полное удаление всех данных по письму."""
        if email_id not in self.data_registry:
            return False
        
        registry = self.data_registry[email_id]
        
        # Удаляем из всех систем
        self._delete_from_raw_storage(registry.get('raw'))
        self._delete_attachments(registry.get('attachments', []))
        self._delete_embedding(registry.get('embedding'))
        self._delete_from_search_index(registry.get('search'))
        
        # Очищаем регистр
        del self.data_registry[email_id]
        
        # Логируем удаление (логи тоже должны очищаться)
        self._audit_deletion(email_id)
        
        return True
    
    def _audit_deletion(self, email_id: str):
        """Аудит удаления с автоматической очисткой через 30 дней."""
        # Здесь реализация логгирования с TTL
        pass

Ключевая идея: каждый кусочек данных должен иметь метку, откуда его удалять. Без этого вы не сможете гарантировать compliance.

Семантический поиск по миллиону писем: не только embedding

Вы построили embedding для всех писем. Запускаете поиск "проблема с API в прошлом месяце". И получаете... 5000 результатов. Потому что в каждом письме техподдержки есть слова "проблема" и "API".

Многоуровневый поиск:

Уровень Технология Что ищет Когда использовать
1. Фильтрация Elasticsearch / PostgreSQL Метаданные: дата, отправитель, тема Всегда первый шаг
2. Ключевые слова BM25 / TF-IDF Конкретные термины в контексте Когда нужна точность
3. Семантика Vector Search (Qdrant, Pinecone) Похожие по смыслу письма Для сложных запросов
4. Ранжирование Cross-Encoder / Reranker Точное соответствие запросу Финальная сортировка

Вот как выглядит гибридный поиск на практике:

class HybridEmailSearch:
    """Гибридный поиск по письмам."""
    
    def search(self, query: str, filters: dict = None, limit: int = 50):
        """Ищем письма по запросу с фильтрами."""
        
        # 1. Фильтрация по метаданным
        filtered_ids = self._filter_by_metadata(filters)
        
        if not filtered_ids:
            return []
        
        # 2. Поиск по ключевым словам
        keyword_results = self._keyword_search(query, filtered_ids, limit=limit*2)
        
        # 3. Семантический поиск
        vector_results = self._vector_search(query, filtered_ids, limit=limit*2)
        
        # 4. Объединяем и дедуплицируем
        all_results = self._merge_results(keyword_results, vector_results)
        
        # 5. Переранжируем с помощью cross-encoder
        reranked = self._rerank_with_cross_encoder(query, all_results[:limit*3])
        
        # 6. Возвращаем топ результатов
        return reranked[:limit]
    
    def _rerank_with_cross_encoder(self, query: str, results: list):
        """Используем cross-encoder для точного ранжирования."""
        # Используем модель типа cross-encoder/ms-marco-MiniLM-L-6-v2
        # которая сравнивает запрос с каждым документом отдельно
        
        pairs = [(query, result['text']) for result in results]
        
        # Здесь вызов модели cross-encoder
        # scores = cross_encoder_model.predict(pairs)
        
        # Сортируем по убыванию score
        # sorted_results = sorted(zip(results, scores), key=lambda x: x[1], reverse=True)
        
        # Для примера возвращаем как есть
        return results
💡
Cross-encoder дорогой для больших наборов (O(n) сложность), но дает лучшую точность. Используйте его только на топ-100 результатах после фильтрации и vector search.

Конвейерная обработка: как не утонуть в миллионе писем

Обработка миллиона писем — это не batch job. Это конвейер, который должен работать постоянно, обрабатывая новые письма и периодически переиндексируя старые.

Архитектура пайплайна:

# docker-compose.yml для пайплайна обработки email
version: '3.8'

services:
  # 1. Прием писем
  email_ingest:
    image: apache/nifi:latest
    environment:
      - NIFI_WEB_HTTP_PORT=8080
    volumes:
      - ./nifi_templates:/templates
    ports:
      - "8080:8080"

  # 2. Очередь задач
  redis_queue:
    image: redis:alpine
    command: redis-server --appendonly yes
    ports:
      - "6379:6379"

  # 3. Воркеры обработки
  email_worker:
    build: ./workers
    environment:
      - REDIS_HOST=redis_queue
      - MODEL_PATH=/models
    deploy:
      replicas: 4  # Масштабируем под нагрузку
    volumes:
      - ./models:/models

  # 4. Векторная база
  qdrant:
    image: qdrant/qdrant:latest
    ports:
      - "6333:6333"
    volumes:
      - ./qdrant_storage:/storage

  # 5. Поисковый движок
  elasticsearch:
    image: elasticsearch:8.11.0
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
    ports:
      - "9200:9200"

  # 6. Мониторинг
  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    depends_on:
      - prometheus

  prometheus:
    image: prom/prometheus:latest
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    ports:
      - "9090:9090"

Ключевые метрики для мониторинга:

  • Latency 95-й перцентиль — как быстро обрабатывается письмо от получения до индексации
  • Error rate — процент писем, которые не удалось обработать
  • Attachment processing time — время парсинга вложений (PDF с OCR могут быть медленными)
  • Vector search recall@10 — насколько хорошо поиск находит релевантные письма

Ошибки, которые сломают ваш пайплайн

Ошибка 1: Обрабатывать все письма сразу. Результат: Out of Memory через 2 часа. Решение: потоковая обработка с backpressure.

Ошибка 2: Хранить embedding для каждого письма отдельно. Результат: 1M писем × 768 измерений = 3GB RAM только на векторы. Решение: использовать quantization (SQ8) и сжимать в 4 раза.

Ошибка 3: Запускать OCR для всех PDF. Результат: CPU на 100% на неделю. Решение: определять, есть ли текстовый слой, и запускать OCR только при необходимости.

Ошибка 4: Игнорировать кодировки. Результат: кракозябры в 15% писем. Решение: использовать chardet и fallback на Windows-1251 для русских писем.

Что дальше? Когда AI-агент действительно понимает контекст

Вы построили пайплайн. Письма индексируются. Треды восстановлены. Вложения распарсены. Поиск работает. Что дальше?

Теперь ваш AI-агент может:

  • Отвечать на вопросы в контексте всей истории переписки ("Что мы предлагали этому клиенту в 2022 году?")
  • Находить похожие кейсы в других тредах ("У кого еще была такая проблема с API?")
  • Анализировать вложения и извлекать структурированные данные ("Покажи все счета за последний квартал")
  • Работать с мультиязычными данными без потери контекста

Но самое важное — вы создали не просто поисковик по письмам. Вы построили систему контекстной памяти для AI-агента. Теперь он не просто отвечает на последнее письмо. Он помнит всю историю взаимодействия.

И вот парадокс: потратив 80% времени на обработку данных (а не на fine-tuning модели), вы получили на порядок больше пользы. Потому что самая крутая модель бесполезна с плохими данными.

Следующий шаг? Интеграция с stateful memory архитектурой, где агент не просто ищет в прошлых письмах, а строит долгосрочные профили взаимодействия с каждым контактом.

Но это уже другая история. А пока — проверьте, как у вас с парсингом вложений.