6 паттернов управления API-квотой для мультиагентных систем: 429 ошибки | AiManual
AiManual Logo Ai / Manual.
15 Июн 2026 Гайд

6 паттернов для управления API-квотой в мультиагентных системах: как избежать 429 ошибок

Избегайте 429 Too Many Requests в мультиагентных системах. 6 проверенных паттернов: Token Bucket, Rate Limiter на Redis, Circuit Breaker, Adaptive Throttling и

Реклама
cliv2

Ты запустил 50 агентов. Каждый агент дергает GPT-5, Claude Opus 4.5, Gemini Ultra 2. Через 3 секунды — братан, 429 Too Many Requests. Все агенты в ступоре, задача валится, логи пестрят красным. Знакомо?

Я сам прошел этот ад. В статье про оркестрацию без роутинга мы обсуждали, как легко сломать систему неправильным распределением задач. Но даже идеальная архитектура разобьется о суровый rate limit провайдера. OpenAI дает Tier 5 до 10 000 RPM, Anthropic — до 4 000 RPM на Claude Opus 4.5, Google Gemini Ultra 2 — 6 000 RPM. И это раздельные квоты! А если у тебя 100 агентов — ты привысишь лимит за секунду.

Почему стандартные ретраи не спасут

Большинство библиотек типа tenacity или backoff тупо ждут. Они не знают про общий пул запросов. Если 50 агентов одновременно увидят 429 — они дружно отвалятся, подождут одинаковое время и снова ударят в тот же момент. Это называется Thundering Herd. Лимит провайдера не восстановится, ты получишь новый 429, и так до бесконечности. Твоя мультиагентка превращается в тормозного червя.

В разборе типовых ошибок AI-агентов я уже упоминал, что 80% падений в production связаны с плохим управлением ресурсами. Rate limiting — та же история.

Хорошо, хватит теории. Вот 6 паттернов, которые я выстрадал на проектах с 500+ агентами. Каждый с кодом, граблями и объяснением «почему».

Паттерн #1: Token Bucket с адаптивным наполнением

Базовая идея — ты кладешь токены в ведро с постоянной скоростью. Агенты забирают токен перед запросом. Если ведро пусто — ждут. Но есть нюанс: скорость наполнения не должна быть константой. Провайдеры часто дают burst capacity (всплеск), а затем режут. Нужно адаптироваться.

1 Реализуем адаптивный Token Bucket на Python

import asyncio
from time import time

class AdaptiveTokenBucket:
    def __init__(self, initial_rate: float, max_tokens: int):
        self.rate = initial_rate  # токенов в секунду
        self.max_tokens = max_tokens
        self.tokens = max_tokens
        self.last_refill = time()
        self.lock = asyncio.Lock()

    async def acquire(self) -> float:
        """Возвращает время ожидания в секундах."""
        async with self.lock:
            now = time()
            elapsed = now - self.last_refill
            self.tokens = min(self.max_tokens, self.tokens + elapsed * self.rate)
            self.last_refill = now
            if self.tokens >= 1:
                self.tokens -= 1
                return 0.0
            else:
                wait_time = (1 - self.tokens) / self.rate
                self.tokens = 0.0
                return wait_time

    def adjust_rate(self, new_rate: float):
        """Адаптируем скорость в ответ на 429."""
        self.rate = new_rate * 0.9  # срезаем 10% для запаса

⚠️ Ошибка: многие делают bucket.rate -= decrease_factor каждый раз при 429. Не делайте так. Лучше храните историю последних N ответов и вычисляйте скользящее среднее скорости. Резкое падение rate вызовет голодовку агентов, а потом внезапный всплеск — снова 429.

Адаптация работает так: после каждого успешного запроса слегка увеличиваем rate (если не было 429), после 429 — уменьшаем на фиксированный процент. Храним скользящее окно последних 10 ответов для сглаживания. Burst capacity мы не трогаем — пусть остается в начале, чтобы быстро обработать всплеск.

Паттерн #2: Distributed Rate Limiter на Redis (Sliding Window + Lua)

Когда агенты работают на разных нодах (Kubernetes, несколько машин), локальный bucket не работает. Нужен общий счетчик. Redis + Lua — золотой стандарт.

Sliding Window без гонок данных. Lua-скрипт выполняется атомарно — никаких race conditions между агентами.

2 Скрипт Redis

-- KEYS[1] = key, ARGV[1] = limit, ARGV[2] = window_ms
local now = redis.call('TIME')[1]
local window_start = now - ARGV[2]/1000
redis.call('ZREMRANGEBYSCORE', KEYS[1], 0, window_start)
local count = redis.call('ZCARD', KEYS[1])
if count < tonumber(ARGV[1]) then
    redis.call('ZADD', KEYS[1], now, now .. ':' .. math.random())
    redis.call('EXPIRE', KEYS[1], ARGV[2]/1000 + 1)
    return 1
else
    return 0
end

А с клиентской стороны добавляем джиттер перед повторной попыткой — обязательно. Без джиттера агенты на разных нодах снова синхронизируются.

import random

async def wait_with_jitter(base_ms: int):
    jitter = random.uniform(0, base_ms * 0.5)
    await asyncio.sleep((base_ms + jitter) / 1000)

В разборе проблем мультиагентных систем по ICLR 2026 показано, что именно отсутствие распределенного лимитера — причина 70% падений в масштабных пайплайнах.

Паттерн #3: Priority Queue с весами агентов

Не все запросы равны. Есть критические — их нельзя ронять. Фоновые аналитические агенты могут подождать. Решение — assign priority каждому запросу и обслуживать очередь с весами.

Пример: у тебя агент-оркестратор (приоритет 5), агент-исследователь (3), агент-саммаризатор (1). Когда лимит на исходе, низкоприоритетные запросы блокируются первыми. Ты получаешь graceful degradation, а не полный фейл.

from dataclasses import dataclass
from heapq import heappush, heappop

@dataclass
class Request:
    priority: int
    timestamp: float
    payload: any

    def __lt__(self, other):
        # Чем выше priority, тем раньше
        return (other.priority, self.timestamp) < (self.priority, other.timestamp)

class PriorityScheduler:
    def __init__(self, bucket: AdaptiveTokenBucket):
        self.queue = []
        self.bucket = bucket

    async def submit(self, req: Request):
        wait = await self.bucket.acquire()
        if wait > 2.0 and req.priority < 2:
            raise Exception("Rate limited, low priority dropped")
        await asyncio.sleep(wait)
        # выполнить запрос

Критично: не роняй запросы молча. Верни агенту статус QUEUED или DEFERRED. Пусть он решит — повторить позже или переключиться на другого провайдера.

Паттерн #4: Circuit Breaker с полу-открытым состоянием

Если провайдер начал сыпать 429, дергать его снова — только ухудшать ситуацию. Circuit Breaker отключает запросы на время, давая бэкенду остыть. В отличие от простого ретрая, здесь три состояния: Closed (работает), Open (все запросы отвергаются мгновенно), Half-Open (пробный запрос).

Я использую библиотеку pybreaker (версия 2.1.5) с кастомным детектором 429.

from pybreaker import CircuitBreaker, CircuitBreakerError

class TooManyRequests(Exception):
    pass

breaker = CircuitBreaker(
    fail_max=5,
    reset_timeout=30,
    exclude=[TooManyRequests]  # 429 не считаем fail для breaker? Нет, считаем!
)

# Исправленный детектор:
breaker = CircuitBreaker(
    fail_max=3,
    reset_timeout=45,
    state_machine={
        'closed': {'fail': 'open'},
        'open': {'success': 'half-open', 'fail': 'open'},
        'half-open': {'success': 'closed', 'fail': 'open'}
    }
)

💡 Важно: timeout в Open состоянии должен быть не случайным, а вычисленным на основе заголовка Retry-After из ответа 429. Провайдеры в 2026 году почти всегда отдают этот заголовок. Если игнорировать — можно открыть цепь слишком рано и словить новый 429.

Примерно так же устроен механизм Circuit Breaker в фреймворке Squad AI — он автоматически парсит Retry-After и переводится в Half-Open на правильное время. Рекомендую посмотреть их реализацию, если пишешь свою.

Паттерн #5: Adaptive Throttling на основе ответов (Backoff с джиттером)

Это уже не выбор паттерна, а маст-хэв. Делай экспоненциальный бэк-офф обязательно с джиттером и только на основании HTTP-кода 429. Если получил 500 — не ретрай сразу, дай Circuit Breaker решить.

Формула, которую я использую:

import random
import asyncio

def compute_delay(attempt: int, base_ms: float = 1000, max_ms: float = 60000):
    delay = min(base_ms * (2 ** attempt) + random.uniform(0, base_ms * 2), max_ms)
    return delay / 1000

async def retry_with_backoff(coro, max_attempts=5):
    for i in range(max_attempts):
        try:
            return await coro
        except Exception as e:
            if hasattr(e, 'status_code') and e.status_code == 429:
                delay = compute_delay(i)
                await asyncio.sleep(delay)
            else:
                raise
    raise Exception("Max retries exceeded")

Грабли: если 429 приходит с Retry-After: 120, не надо домножать на экспоненту. Используй явно указанное время. У меня был случай: провайдер вернул 429 с Retry-After=10, а библиотека умножила на 2^3=8 секунд, итог 80 секунд — агент ждал почти полторы минуты вместо 10 секунд. Тупняк.

В статье об отладке глубоких агентов я описывал кейс, когда агент зависал на 5 минут из-за неправильного бэк-оффа. После добавления джиттера и парсинга Retry-After — проблема ушла.

Паттерн #6: Multi-API Provider Fallback с health check

Лучшая защита от 429 — не быть привязанным к одному провайдеру. Если OpenAI лимитит — шли запросы в Anthropic или Google. Реализуй роутер с health check'ами.

Я храню в Redis информацию о последнем статусе каждого провайдера (OK, DEGRADED, DOWN). Агенты сначала шлют запрос в провайдера с наименьшей загрузкой.

3 Простейший роутер

class ProviderRouter:
    def __init__(self):
        self.providers = {
            'openai': {'base_url': 'https://api.openai.com/v1', 'status': 'OK'},
            'anthropic': {'base_url': 'https://api.anthropic.com/v1', 'status': 'OK'},
            'google': {'base_url': 'https://generativelanguage.googleapis.com/v1', 'status': 'OK'},
        }
        self.lock = asyncio.Lock()

    async def get_available(self):
        async with self.lock:
            ok = [p for p, s in self.providers.items() if s['status'] == 'OK']
            return ok

    async def mark_failure(self, provider: str):
        async with self.lock:
            self.providers[provider]['status'] = 'DEGRADED'
            # Запланировать health check через N секунд

    async def health_check(self, provider: str):
        # Сделать тестовый запрос (можно с маленьким промптом)
        pass

Важно: не переключай провайдера слишком часто — может быть дороже (разные модели, разные цены). Используй fallback только после 2-3 последовательных 429 у одного провайдера. Иначе ты просто перекинешь проблему на другой API.

Как НЕ надо делать: типовой ад

Одна команда сделала мультиагентку, где каждый агент имел свой отдельный API-ключ от OpenAI (купили 50 ключей). Думали — обойдем лимиты. Но лимиты действуют на уровне аккаунта, а не ключа. Все 50 ключей одного пользователя суммируются в общий лимит. Получили 429 по всем ключам одновременно.

Второй пример: использование time.sleep() в синхронном коде под asyncio. Блокирует весь event loop, другие агенты тоже зависают. Используй asyncio.sleep() или trio.sleep().

Третий: игнорирование лимитов на входящие токены (для LLM). OpenAI считает лимит по RPM и TPM. Если агенты гоняют большие промпты — можно упереться в TPM раньше, чем в RPM. Считай и то, и другое.

Четвертый: не обновлять bucket в реальном времени. Если ты используешь фиксированный Token Bucket с rate = 1000 запросов в секунду, а провайдер понизил лимит до 500 — твои агенты будут валиться, пока ты не перезапустишь bucket. Делай динамическое обнаружение лимитов через заголовки X-RateLimit-Remaining и X-RateLimit-Reset.

Бонус: архитектура на Rust для максимальной производительности

Если у тебя >1000 агентов, Python может не справиться с сетевым I/O. Перепиши critical path на Rust: tokio + reqwest + bb8 (пул соединений) + redis. Примерный скелет тормозного механизма:

use std::sync::Arc;
use tokio::sync::Mutex;
use std::time::{Duration, Instant};

struct LeakyBucket {
    capacity: u32,
    tokens: f64,
    leak_rate: f64,
    last_update: Instant,
}

impl LeakyBucket {
    async fn acquire(&mut self) -> Duration {
        let now = Instant::now();
        let elapsed = now.duration_since(self.last_update).as_secs_f64();
        self.tokens = (self.tokens + elapsed * self.leak_rate).min(self.capacity as f64);
        self.last_update = now;
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            Duration::ZERO
        } else {
            Duration::from_secs_f64((1.0 - self.tokens) / self.leak_rate)
        }
    }
}

На практике Rust дает +30-50% пропускной способности при том же количестве ядер (из-за меньшего GC и нулевых накладных расходов на asyncio). Я переписал core rate-limiter на Rust и обернул в PyO3 — получил 2-кратный прирост в агентском пайплайне.

Последний совет: никогда не используй дефолтные библиотеки ретраев. Пиши кастомный слой с учетом динамики провайдера. И обязательно тестируй с эмуляцией 429 — например, с помощью WireMock или VCR-записей. Иначе в production ты узнаешь, что твой 6-й ретрай убил все агенты.

Удачи. Пусть 429 будет только в коде, а не в продакшене.

Подписаться на канал