Почему ваш AI-агент не понимает переписку
Вы запускаете AI-агента для работы с поддержкой клиентов. Он получает доступ к почтовому ящику. Идея простая: агент должен читать письма и отвечать на них в контексте всей переписки.
В теории все гладко. На практике вы сталкиваетесь с кошмаром: 1 000 000 писем за 5 лет, 300 000 вложений, 15 языков переписки, десятки почтовых клиентов с разными форматами цитирования. И самое страшное — ваша модель не видит цепочки. Она получает последнее письмо и пытается угадать, о чем шла речь три года назад.
Проблема не в модели. Проблема в данных. Современные LLM могут работать с контекстом в 128К токенов, но если вы подаете им разрозненные письма без связей — это все равно что читать книгу, вырвав из нее каждую вторую страницу.
Реконструкция тредов: когда стандартные методы ломаются
Первое, что приходит в голову — использовать стандартные поля email: In-Reply-To, References, Message-ID. Звучит логично, пока не открываете реальные данные.
Что ломает стандартные методы:
- Клиенты теряют заголовки. Gmail, Outlook Web App, мобильные приложения — каждый режет по-своему.
- Пересылки убивают цепочки. Письмо из треда пересылают новому участнику — и все, тред разорван.
- Ответы на несколько писем сразу. "Ответить всем" на письмо, которое само было ответом на три других.
- Темы изменяются посреди треда. Начинается с "Вопрос по счету", а через 20 писем превращается в "Проблема с API интеграцией".
1 Собираем все возможные сигналы
Нельзя полагаться на один метод. Нужна многоуровневая система:
| Уровень | Метод | Точность | Когда использовать |
|---|---|---|---|
| 1. Строгий | Message-ID, In-Reply-To | 100% | Всегда первый |
| 2. Лексический | Нормализация темы (RE:, FWD:, удаление префиксов) | 85-90% | Когда заголовки повреждены |
| 3. Временной | Группировка по участникам + временное окно | 75-80% | Для пересылок и ответов всем |
| 4. Семантический | Embedding текста + кластеризация | 60-70% | Когда тема меняется кардинально |
Вот как выглядит код для нормализации темы письма (уровень 2):
import re
def normalize_subject(subject: str) -> str:
"""Удаляем все префиксы RE:, FWD:, AW: и их вариации."""
if not subject:
return ""
# Удаляем языковые префиксы
prefixes = [
r'^\s*(RE|Re|FWD|Fwd|FW|Fw|AW|Aw|SV|Sv|VS|Vs)[\s:\[\]\-]*\s*',
r'^\s*\[[^\]]*\]\s*', # Удаляем квадратные скобки
r'^\s*\([^\)]*\)\s*', # Удаляем круглые скобки
]
normalized = subject
for pattern in prefixes:
normalized = re.sub(pattern, '', normalized, flags=re.IGNORECASE)
# Удаляем лишние пробелы
normalized = normalized.strip()
# Для пустых тем возвращаем хэш участников
if not normalized:
return "NO_SUBJECT"
return normalized.lower()
# Пример:
print(normalize_subject("RE: RE: [External] Ваш вопрос по API"))
# Output: "ваш вопрос по api"
2 Строим граф связей
После сбора сигналов строим граф, где узлы — письма, а ребра — связи. Самый сложный момент — разрешение конфликтов:
Конфликт: Письмо A связано с B по Message-ID, но по времени отправки и семантике оно должно быть в треде с C. Что делать? В enterprise выбирают сторону сохранения целостности треда, даже если это противоречит техническим заголовкам.
Вложения: когда PDF скрывает больше, чем показывает
300 000 вложений в миллионе писем. Из них 40% — PDF, 30% — изображения (скриншоты, сканы), 20% — документы Office, 10% — все остальное.
Ваш AI-агент должен понимать не просто "в письме есть вложение", а что в этом вложении содержится. Счет, договор, техническая спецификация, скриншот ошибки — каждый тип требует своей обработки.
Три слоя обработки вложений:
- Метаданные: Имя файла, размер, MIME-тип. Просто, но уже дает 30% контекста.
- Текстовое содержимое: Парсинг PDF, DOCX, таблиц Excel. Сложность в том, что PDF бывают сканированными, защищенными паролем, с таблицами, которые ломают структуру.
- Семантическое понимание: Это счет или договор? Техническая документация или маркетинговый буклет? Здесь подключается классификация на основе содержимого.
Код для определения типа вложения по содержимому:
import re
from typing import Optional
def classify_attachment_content(text: str, filename: str) -> str:
"""Классифицируем вложение по содержимому и имени файла."""
text_lower = text.lower()
filename_lower = filename.lower()
# Паттерны для счетов
invoice_patterns = [
r'счет\s*№', r'invoice', r'оплата', r'total\s*amount',
r'итого\s*к\s*оплате', r'банковские\s*реквизиты'
]
# Паттерны для договоров
contract_patterns = [
r'договор\s*№', r'контракт', r'agreement',
r'стороны\s*договорились', r'подписант'
]
# Проверяем по содержимому
for pattern in invoice_patterns:
if re.search(pattern, text_lower):
return "invoice"
for pattern in contract_patterns:
if re.search(pattern, text_lower):
return "contract"
# Проверяем по имени файлу
if any(word in filename_lower for word in ['invoice', 'счет', 'bill']):
return "invoice"
if any(word in filename_lower for word in ['contract', 'договор', 'agreement']):
return "contract"
# Проверяем структуру
if re.search(r'таблица|table|\d+\s*[xх]\s*\d+', text_lower):
return "table"
return "unknown"
# Пример использования
content = "Счет № 12345 от 01.01.2024\nИтого к оплате: 1000 USD"
print(classify_attachment_content(content, "invoice_123.pdf"))
# Output: "invoice"
pdf.miner извлекает меньше 50 символов на страницу — запускаем OCR.Мультиязычность: когда один тред на трех языках
Реальный кейс из финансового сектора: клиент начинает переписку на английском, техподдержка отвечает на русском, в цепочку включается немецкий коллега, а итоговый счет приходит на французском.
Ваш AI-агент должен:
- Определять язык каждого письма (да, даже смешанные предложения вроде "Hello, мне нужен help с этим issue")
- Сохранять языковой контекст при ответе
- Переводить только когда нужно (не каждый запрос требует перевода всего треда)
3 Определяем язык правильно
FastText от Facebook работает хорошо, но тяжеловесен для миллиона писем. LangDetect легче, но хуже с короткими текстами. Наш компромисс:
from langdetect import detect, DetectorFactory
from typing import Optional
# Для воспроизводимости результатов
DetectorFactory.seed = 0
def detect_email_language(text: str, fallback: str = "en") -> str:
"""Определяем язык письма с fallback на английский."""
if not text or len(text.strip()) < 10:
return fallback
# Извлекаем только текст, удаляем цитаты и подписи
clean_text = clean_email_text(text)
if len(clean_text) < 20:
return fallback
try:
# Пробуем определить язык
lang = detect(clean_text)
# Маппим к стандартным кодам
lang_map = {
'ru': 'ru', 'en': 'en', 'de': 'de', 'fr': 'fr',
'es': 'es', 'it': 'it', 'pt': 'pt', 'zh-cn': 'zh'
}
return lang_map.get(lang, fallback)
except:
return fallback
def clean_email_text(text: str) -> str:
"""Очищаем текст от цитат и подписей."""
# Удаляем стандартные блоки цитирования
lines = text.split('\n')
clean_lines = []
quote_indicators = ['>', 'On ', ' wrote:', 'Sent:', 'From:']
for line in lines:
line_stripped = line.strip()
# Пропускаем пустые строки и цитаты
if not line_stripped:
continue
if any(line_stripped.startswith(indicator) for indicator in quote_indicators):
break # Все что после - цитата
clean_lines.append(line_stripped)
return ' '.join(clean_lines[:500]) # Берем первые 500 слов
Важно: Не переводите автоматически все письма на один язык. Контекст теряется. Вместо этого храните оригинальный язык каждого сообщения и переводите только при необходимости поиска или анализа.
Zero Data Retention: когда нельзя хранить ничего лишнего
GDPR, CCPA, российский 152-ФЗ. Enterprise-клиенты требуют гарантий: данные должны удаляться по запросу, без следов. Это не просто "удалить из базы". Это:
- Удаление всех embedding'ов из векторной базы
- Очистка логов обработки
- Удаление кэшированных результатов парсинга
- Гарантия, что даже в бэкапах данных нет
Архитектура для compliance:
class ComplianceAwarePipeline:
"""Пайплайн с поддержкой полного удаления данных."""
def __init__(self):
self.data_registry = {} # Регистр: message_id -> где хранится
def process_email(self, email_data: dict) -> dict:
"""Обрабатываем письмо и регистрируем все места хранения."""
email_id = email_data['id']
# 1. Сохраняем raw email
raw_storage_key = self._store_raw(email_data)
self.data_registry[email_id] = {'raw': raw_storage_key}
# 2. Парсим вложения
attachments_data = self._parse_attachments(email_data)
if attachments_data:
attachment_keys = self._store_attachments(attachments_data)
self.data_registry[email_id]['attachments'] = attachment_keys
# 3. Создаем embedding
embedding_key = self._create_embedding(email_data)
self.data_registry[email_id]['embedding'] = embedding_key
# 4. Индексируем для поиска
search_key = self._index_for_search(email_data)
self.data_registry[email_id]['search'] = search_key
return {
'email_id': email_id,
'processed': True,
'registry_key': f"email:{email_id}"
}
def delete_all_data(self, email_id: str) -> bool:
"""Полное удаление всех данных по письму."""
if email_id not in self.data_registry:
return False
registry = self.data_registry[email_id]
# Удаляем из всех систем
self._delete_from_raw_storage(registry.get('raw'))
self._delete_attachments(registry.get('attachments', []))
self._delete_embedding(registry.get('embedding'))
self._delete_from_search_index(registry.get('search'))
# Очищаем регистр
del self.data_registry[email_id]
# Логируем удаление (логи тоже должны очищаться)
self._audit_deletion(email_id)
return True
def _audit_deletion(self, email_id: str):
"""Аудит удаления с автоматической очисткой через 30 дней."""
# Здесь реализация логгирования с TTL
pass
Ключевая идея: каждый кусочек данных должен иметь метку, откуда его удалять. Без этого вы не сможете гарантировать compliance.
Семантический поиск по миллиону писем: не только embedding
Вы построили embedding для всех писем. Запускаете поиск "проблема с API в прошлом месяце". И получаете... 5000 результатов. Потому что в каждом письме техподдержки есть слова "проблема" и "API".
Многоуровневый поиск:
| Уровень | Технология | Что ищет | Когда использовать |
|---|---|---|---|
| 1. Фильтрация | Elasticsearch / PostgreSQL | Метаданные: дата, отправитель, тема | Всегда первый шаг |
| 2. Ключевые слова | BM25 / TF-IDF | Конкретные термины в контексте | Когда нужна точность |
| 3. Семантика | Vector Search (Qdrant, Pinecone) | Похожие по смыслу письма | Для сложных запросов |
| 4. Ранжирование | Cross-Encoder / Reranker | Точное соответствие запросу | Финальная сортировка |
Вот как выглядит гибридный поиск на практике:
class HybridEmailSearch:
"""Гибридный поиск по письмам."""
def search(self, query: str, filters: dict = None, limit: int = 50):
"""Ищем письма по запросу с фильтрами."""
# 1. Фильтрация по метаданным
filtered_ids = self._filter_by_metadata(filters)
if not filtered_ids:
return []
# 2. Поиск по ключевым словам
keyword_results = self._keyword_search(query, filtered_ids, limit=limit*2)
# 3. Семантический поиск
vector_results = self._vector_search(query, filtered_ids, limit=limit*2)
# 4. Объединяем и дедуплицируем
all_results = self._merge_results(keyword_results, vector_results)
# 5. Переранжируем с помощью cross-encoder
reranked = self._rerank_with_cross_encoder(query, all_results[:limit*3])
# 6. Возвращаем топ результатов
return reranked[:limit]
def _rerank_with_cross_encoder(self, query: str, results: list):
"""Используем cross-encoder для точного ранжирования."""
# Используем модель типа cross-encoder/ms-marco-MiniLM-L-6-v2
# которая сравнивает запрос с каждым документом отдельно
pairs = [(query, result['text']) for result in results]
# Здесь вызов модели cross-encoder
# scores = cross_encoder_model.predict(pairs)
# Сортируем по убыванию score
# sorted_results = sorted(zip(results, scores), key=lambda x: x[1], reverse=True)
# Для примера возвращаем как есть
return results
Конвейерная обработка: как не утонуть в миллионе писем
Обработка миллиона писем — это не batch job. Это конвейер, который должен работать постоянно, обрабатывая новые письма и периодически переиндексируя старые.
Архитектура пайплайна:
# docker-compose.yml для пайплайна обработки email
version: '3.8'
services:
# 1. Прием писем
email_ingest:
image: apache/nifi:latest
environment:
- NIFI_WEB_HTTP_PORT=8080
volumes:
- ./nifi_templates:/templates
ports:
- "8080:8080"
# 2. Очередь задач
redis_queue:
image: redis:alpine
command: redis-server --appendonly yes
ports:
- "6379:6379"
# 3. Воркеры обработки
email_worker:
build: ./workers
environment:
- REDIS_HOST=redis_queue
- MODEL_PATH=/models
deploy:
replicas: 4 # Масштабируем под нагрузку
volumes:
- ./models:/models
# 4. Векторная база
qdrant:
image: qdrant/qdrant:latest
ports:
- "6333:6333"
volumes:
- ./qdrant_storage:/storage
# 5. Поисковый движок
elasticsearch:
image: elasticsearch:8.11.0
environment:
- discovery.type=single-node
- xpack.security.enabled=false
ports:
- "9200:9200"
# 6. Мониторинг
grafana:
image: grafana/grafana:latest
ports:
- "3000:3000"
depends_on:
- prometheus
prometheus:
image: prom/prometheus:latest
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
ports:
- "9090:9090"
Ключевые метрики для мониторинга:
- Latency 95-й перцентиль — как быстро обрабатывается письмо от получения до индексации
- Error rate — процент писем, которые не удалось обработать
- Attachment processing time — время парсинга вложений (PDF с OCR могут быть медленными)
- Vector search recall@10 — насколько хорошо поиск находит релевантные письма
Ошибки, которые сломают ваш пайплайн
Ошибка 1: Обрабатывать все письма сразу. Результат: Out of Memory через 2 часа. Решение: потоковая обработка с backpressure.
Ошибка 2: Хранить embedding для каждого письма отдельно. Результат: 1M писем × 768 измерений = 3GB RAM только на векторы. Решение: использовать quantization (SQ8) и сжимать в 4 раза.
Ошибка 3: Запускать OCR для всех PDF. Результат: CPU на 100% на неделю. Решение: определять, есть ли текстовый слой, и запускать OCR только при необходимости.
Ошибка 4: Игнорировать кодировки. Результат: кракозябры в 15% писем. Решение: использовать chardet и fallback на Windows-1251 для русских писем.
Что дальше? Когда AI-агент действительно понимает контекст
Вы построили пайплайн. Письма индексируются. Треды восстановлены. Вложения распарсены. Поиск работает. Что дальше?
Теперь ваш AI-агент может:
- Отвечать на вопросы в контексте всей истории переписки ("Что мы предлагали этому клиенту в 2022 году?")
- Находить похожие кейсы в других тредах ("У кого еще была такая проблема с API?")
- Анализировать вложения и извлекать структурированные данные ("Покажи все счета за последний квартал")
- Работать с мультиязычными данными без потери контекста
Но самое важное — вы создали не просто поисковик по письмам. Вы построили систему контекстной памяти для AI-агента. Теперь он не просто отвечает на последнее письмо. Он помнит всю историю взаимодействия.
И вот парадокс: потратив 80% времени на обработку данных (а не на fine-tuning модели), вы получили на порядок больше пользы. Потому что самая крутая модель бесполезна с плохими данными.
Следующий шаг? Интеграция с stateful memory архитектурой, где агент не просто ищет в прошлых письмах, а строит долгосрочные профили взаимодействия с каждым контактом.
Но это уже другая история. А пока — проверьте, как у вас с парсингом вложений.