Ты запустил 50 агентов. Каждый агент дергает GPT-5, Claude Opus 4.5, Gemini Ultra 2. Через 3 секунды — братан, 429 Too Many Requests. Все агенты в ступоре, задача валится, логи пестрят красным. Знакомо?
Я сам прошел этот ад. В статье про оркестрацию без роутинга мы обсуждали, как легко сломать систему неправильным распределением задач. Но даже идеальная архитектура разобьется о суровый rate limit провайдера. OpenAI дает Tier 5 до 10 000 RPM, Anthropic — до 4 000 RPM на Claude Opus 4.5, Google Gemini Ultra 2 — 6 000 RPM. И это раздельные квоты! А если у тебя 100 агентов — ты привысишь лимит за секунду.
Почему стандартные ретраи не спасут
Большинство библиотек типа tenacity или backoff тупо ждут. Они не знают про общий пул запросов. Если 50 агентов одновременно увидят 429 — они дружно отвалятся, подождут одинаковое время и снова ударят в тот же момент. Это называется Thundering Herd. Лимит провайдера не восстановится, ты получишь новый 429, и так до бесконечности. Твоя мультиагентка превращается в тормозного червя.
В разборе типовых ошибок AI-агентов я уже упоминал, что 80% падений в production связаны с плохим управлением ресурсами. Rate limiting — та же история.
Хорошо, хватит теории. Вот 6 паттернов, которые я выстрадал на проектах с 500+ агентами. Каждый с кодом, граблями и объяснением «почему».
Паттерн #1: Token Bucket с адаптивным наполнением
Базовая идея — ты кладешь токены в ведро с постоянной скоростью. Агенты забирают токен перед запросом. Если ведро пусто — ждут. Но есть нюанс: скорость наполнения не должна быть константой. Провайдеры часто дают burst capacity (всплеск), а затем режут. Нужно адаптироваться.
1 Реализуем адаптивный Token Bucket на Python
import asyncio
from time import time
class AdaptiveTokenBucket:
def __init__(self, initial_rate: float, max_tokens: int):
self.rate = initial_rate # токенов в секунду
self.max_tokens = max_tokens
self.tokens = max_tokens
self.last_refill = time()
self.lock = asyncio.Lock()
async def acquire(self) -> float:
"""Возвращает время ожидания в секундах."""
async with self.lock:
now = time()
elapsed = now - self.last_refill
self.tokens = min(self.max_tokens, self.tokens + elapsed * self.rate)
self.last_refill = now
if self.tokens >= 1:
self.tokens -= 1
return 0.0
else:
wait_time = (1 - self.tokens) / self.rate
self.tokens = 0.0
return wait_time
def adjust_rate(self, new_rate: float):
"""Адаптируем скорость в ответ на 429."""
self.rate = new_rate * 0.9 # срезаем 10% для запаса
⚠️ Ошибка: многие делают bucket.rate -= decrease_factor каждый раз при 429. Не делайте так. Лучше храните историю последних N ответов и вычисляйте скользящее среднее скорости. Резкое падение rate вызовет голодовку агентов, а потом внезапный всплеск — снова 429.
Адаптация работает так: после каждого успешного запроса слегка увеличиваем rate (если не было 429), после 429 — уменьшаем на фиксированный процент. Храним скользящее окно последних 10 ответов для сглаживания. Burst capacity мы не трогаем — пусть остается в начале, чтобы быстро обработать всплеск.
Паттерн #2: Distributed Rate Limiter на Redis (Sliding Window + Lua)
Когда агенты работают на разных нодах (Kubernetes, несколько машин), локальный bucket не работает. Нужен общий счетчик. Redis + Lua — золотой стандарт.
Sliding Window без гонок данных. Lua-скрипт выполняется атомарно — никаких race conditions между агентами.
2 Скрипт Redis
-- KEYS[1] = key, ARGV[1] = limit, ARGV[2] = window_ms
local now = redis.call('TIME')[1]
local window_start = now - ARGV[2]/1000
redis.call('ZREMRANGEBYSCORE', KEYS[1], 0, window_start)
local count = redis.call('ZCARD', KEYS[1])
if count < tonumber(ARGV[1]) then
redis.call('ZADD', KEYS[1], now, now .. ':' .. math.random())
redis.call('EXPIRE', KEYS[1], ARGV[2]/1000 + 1)
return 1
else
return 0
end
А с клиентской стороны добавляем джиттер перед повторной попыткой — обязательно. Без джиттера агенты на разных нодах снова синхронизируются.
import random
async def wait_with_jitter(base_ms: int):
jitter = random.uniform(0, base_ms * 0.5)
await asyncio.sleep((base_ms + jitter) / 1000)
В разборе проблем мультиагентных систем по ICLR 2026 показано, что именно отсутствие распределенного лимитера — причина 70% падений в масштабных пайплайнах.
Паттерн #3: Priority Queue с весами агентов
Не все запросы равны. Есть критические — их нельзя ронять. Фоновые аналитические агенты могут подождать. Решение — assign priority каждому запросу и обслуживать очередь с весами.
Пример: у тебя агент-оркестратор (приоритет 5), агент-исследователь (3), агент-саммаризатор (1). Когда лимит на исходе, низкоприоритетные запросы блокируются первыми. Ты получаешь graceful degradation, а не полный фейл.
from dataclasses import dataclass
from heapq import heappush, heappop
@dataclass
class Request:
priority: int
timestamp: float
payload: any
def __lt__(self, other):
# Чем выше priority, тем раньше
return (other.priority, self.timestamp) < (self.priority, other.timestamp)
class PriorityScheduler:
def __init__(self, bucket: AdaptiveTokenBucket):
self.queue = []
self.bucket = bucket
async def submit(self, req: Request):
wait = await self.bucket.acquire()
if wait > 2.0 and req.priority < 2:
raise Exception("Rate limited, low priority dropped")
await asyncio.sleep(wait)
# выполнить запрос
Критично: не роняй запросы молча. Верни агенту статус QUEUED или DEFERRED. Пусть он решит — повторить позже или переключиться на другого провайдера.
Паттерн #4: Circuit Breaker с полу-открытым состоянием
Если провайдер начал сыпать 429, дергать его снова — только ухудшать ситуацию. Circuit Breaker отключает запросы на время, давая бэкенду остыть. В отличие от простого ретрая, здесь три состояния: Closed (работает), Open (все запросы отвергаются мгновенно), Half-Open (пробный запрос).
Я использую библиотеку pybreaker (версия 2.1.5) с кастомным детектором 429.
from pybreaker import CircuitBreaker, CircuitBreakerError
class TooManyRequests(Exception):
pass
breaker = CircuitBreaker(
fail_max=5,
reset_timeout=30,
exclude=[TooManyRequests] # 429 не считаем fail для breaker? Нет, считаем!
)
# Исправленный детектор:
breaker = CircuitBreaker(
fail_max=3,
reset_timeout=45,
state_machine={
'closed': {'fail': 'open'},
'open': {'success': 'half-open', 'fail': 'open'},
'half-open': {'success': 'closed', 'fail': 'open'}
}
)
💡 Важно: timeout в Open состоянии должен быть не случайным, а вычисленным на основе заголовка Retry-After из ответа 429. Провайдеры в 2026 году почти всегда отдают этот заголовок. Если игнорировать — можно открыть цепь слишком рано и словить новый 429.
Примерно так же устроен механизм Circuit Breaker в фреймворке Squad AI — он автоматически парсит Retry-After и переводится в Half-Open на правильное время. Рекомендую посмотреть их реализацию, если пишешь свою.
Паттерн #5: Adaptive Throttling на основе ответов (Backoff с джиттером)
Это уже не выбор паттерна, а маст-хэв. Делай экспоненциальный бэк-офф обязательно с джиттером и только на основании HTTP-кода 429. Если получил 500 — не ретрай сразу, дай Circuit Breaker решить.
Формула, которую я использую:
import random
import asyncio
def compute_delay(attempt: int, base_ms: float = 1000, max_ms: float = 60000):
delay = min(base_ms * (2 ** attempt) + random.uniform(0, base_ms * 2), max_ms)
return delay / 1000
async def retry_with_backoff(coro, max_attempts=5):
for i in range(max_attempts):
try:
return await coro
except Exception as e:
if hasattr(e, 'status_code') and e.status_code == 429:
delay = compute_delay(i)
await asyncio.sleep(delay)
else:
raise
raise Exception("Max retries exceeded")
Грабли: если 429 приходит с Retry-After: 120, не надо домножать на экспоненту. Используй явно указанное время. У меня был случай: провайдер вернул 429 с Retry-After=10, а библиотека умножила на 2^3=8 секунд, итог 80 секунд — агент ждал почти полторы минуты вместо 10 секунд. Тупняк.
В статье об отладке глубоких агентов я описывал кейс, когда агент зависал на 5 минут из-за неправильного бэк-оффа. После добавления джиттера и парсинга Retry-After — проблема ушла.
Паттерн #6: Multi-API Provider Fallback с health check
Лучшая защита от 429 — не быть привязанным к одному провайдеру. Если OpenAI лимитит — шли запросы в Anthropic или Google. Реализуй роутер с health check'ами.
Я храню в Redis информацию о последнем статусе каждого провайдера (OK, DEGRADED, DOWN). Агенты сначала шлют запрос в провайдера с наименьшей загрузкой.
3 Простейший роутер
class ProviderRouter:
def __init__(self):
self.providers = {
'openai': {'base_url': 'https://api.openai.com/v1', 'status': 'OK'},
'anthropic': {'base_url': 'https://api.anthropic.com/v1', 'status': 'OK'},
'google': {'base_url': 'https://generativelanguage.googleapis.com/v1', 'status': 'OK'},
}
self.lock = asyncio.Lock()
async def get_available(self):
async with self.lock:
ok = [p for p, s in self.providers.items() if s['status'] == 'OK']
return ok
async def mark_failure(self, provider: str):
async with self.lock:
self.providers[provider]['status'] = 'DEGRADED'
# Запланировать health check через N секунд
async def health_check(self, provider: str):
# Сделать тестовый запрос (можно с маленьким промптом)
pass
Важно: не переключай провайдера слишком часто — может быть дороже (разные модели, разные цены). Используй fallback только после 2-3 последовательных 429 у одного провайдера. Иначе ты просто перекинешь проблему на другой API.
Как НЕ надо делать: типовой ад
Одна команда сделала мультиагентку, где каждый агент имел свой отдельный API-ключ от OpenAI (купили 50 ключей). Думали — обойдем лимиты. Но лимиты действуют на уровне аккаунта, а не ключа. Все 50 ключей одного пользователя суммируются в общий лимит. Получили 429 по всем ключам одновременно.
Второй пример: использование time.sleep() в синхронном коде под asyncio. Блокирует весь event loop, другие агенты тоже зависают. Используй asyncio.sleep() или trio.sleep().
Третий: игнорирование лимитов на входящие токены (для LLM). OpenAI считает лимит по RPM и TPM. Если агенты гоняют большие промпты — можно упереться в TPM раньше, чем в RPM. Считай и то, и другое.
Четвертый: не обновлять bucket в реальном времени. Если ты используешь фиксированный Token Bucket с rate = 1000 запросов в секунду, а провайдер понизил лимит до 500 — твои агенты будут валиться, пока ты не перезапустишь bucket. Делай динамическое обнаружение лимитов через заголовки X-RateLimit-Remaining и X-RateLimit-Reset.
Бонус: архитектура на Rust для максимальной производительности
Если у тебя >1000 агентов, Python может не справиться с сетевым I/O. Перепиши critical path на Rust: tokio + reqwest + bb8 (пул соединений) + redis. Примерный скелет тормозного механизма:
use std::sync::Arc;
use tokio::sync::Mutex;
use std::time::{Duration, Instant};
struct LeakyBucket {
capacity: u32,
tokens: f64,
leak_rate: f64,
last_update: Instant,
}
impl LeakyBucket {
async fn acquire(&mut self) -> Duration {
let now = Instant::now();
let elapsed = now.duration_since(self.last_update).as_secs_f64();
self.tokens = (self.tokens + elapsed * self.leak_rate).min(self.capacity as f64);
self.last_update = now;
if self.tokens >= 1.0 {
self.tokens -= 1.0;
Duration::ZERO
} else {
Duration::from_secs_f64((1.0 - self.tokens) / self.leak_rate)
}
}
}
На практике Rust дает +30-50% пропускной способности при том же количестве ядер (из-за меньшего GC и нулевых накладных расходов на asyncio). Я переписал core rate-limiter на Rust и обернул в PyO3 — получил 2-кратный прирост в агентском пайплайне.
Последний совет: никогда не используй дефолтные библиотеки ретраев. Пиши кастомный слой с учетом динамики провайдера. И обязательно тестируй с эмуляцией 429 — например, с помощью WireMock или VCR-записей. Иначе в production ты узнаешь, что твой 6-й ретрай убил все агенты.
Удачи. Пусть 429 будет только в коде, а не в продакшене.