- BrainTools - https://www.braintools.ru -
Продолжение статьи https://habr.com/ru/articles/1028290/ [1]
Мы поняли, что скелетом агента является долговременное состояние (durable state). Именно оно должно позволить ответить на скучные, но жизненно важные вопросы: какой ход активен, какой шаг уже выполнен, какой job держит lease, какой файл был исходным, какой результат можно выдать пользователю, какое подтверждение еще действительно.
В первой части мы разложили durable state на ход агента, шаг плана и событие. У нас появились такие сущности, как AgentTurn, AgentPlanItem, AgentEvent, и агент уже перестает быть нервным генератором текста, который живет ровно до первого рестарта процесса.
Но трех таблиц мало. Нужны еще разрешения, состояние диалоги/сессии, состояние проекта, фоновые задачи, механизм обработки фоновых задач (lease), счетчик и политика повторов, закладка событий (event cursor) и санитарная обработка payload-ов (payload sanitizer).
Минимальный набор первой части можно расширить так:
|
Сущность |
Что хранит |
Зачем нужна |
|
ApprovalGrant |
Выданное пользователем разрешение |
Не спрашивать повторно одно и то же действие в рамках допустимого scope |
|
SessionContext |
Активный turn, профиль агента, краткую историю, pending approval |
Восстановить диалог и текущую сцену сессии |
|
ProjectContext |
Активный проект, файлы, настройки, текущую операцию |
Не дать двум тяжелым операциям одновременно менять один проект |
|
BackgroundJob |
Длинную операцию вне HTTP-запроса |
Например, парсинг, workbook-операции, retry, progress, cancellation |
|
WorkerHeartbeat |
Присутствие и занятость исполнителя |
Отличить долгую работу от умершего worker-а |
|
Durable payload policy |
Правила сохранения payload-ов |
Не складывать base64, секреты и гигантские строки в event log |
Approval – это юридически важная запись о том, что пользователь разрешил действие с конкретным scope. Если подтверждение живет только в памяти [2] процесса, то после рестарта агент снова спросит то же самое или, что хуже, решит продолжить без понятного основания.
Разрешения привязаны к session_id, project_id, tool_name, mode, scope и expires_at. Это правильная форма: подтверждение не становится вечным. Пользователь мог разрешить править файлы на этом проекте, но это не значит, что агент получил право трогать все проекты, все файлы и все будущие операции.
Хороший approval grant должен быть узким. В идеале scope описывает не человеческую фразу «можно», а машинно проверяемые границы: project_id, tool_name, режим only_missing, срок действия. Тогда executor может принять решение без повторного похода к LLM.
Класс AprrovalGrant, она же таблица для хранения разрешений (прав доступа) на выполнение определенных действий или использование инструментов в какой-то системе, может выглядеть следующим образом
class ApprovalGrant(Base):
__tablename__ = "approval_grants"
grant_id: Mapped[uuid.UUID] = mapped_column(primary_key=True, default=uuid.uuid4)
session_id: Mapped[str] = mapped_column(String(200), index=True)
project_id: Mapped[uuid.UUID | None] = mapped_column(nullable=True, index=True)
tool_name: Mapped[str] = mapped_column(String(120), index=True)
mode: Mapped[str] = mapped_column(String(40), index=True)
scope: Mapped[dict] = mapped_column(JSON, default=dict)
reason: Mapped[str | None] = mapped_column(Text, nullable=True)
expires_at: Mapped[datetime | None] = mapped_column(nullable=True, index=True)
created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
Разбор полей (колонок):
grant_id: Уникальный идентификатор каждой записи (UUID). Генерируется автоматически.
session_id: ID сессии пользователя. Позволяет понять, в рамках какого сеанса выдано разрешение.
project_id: Связь с конкретным проектом (может быть пустым).
tool_name: Название инструмента, к которому запрашивается доступ (например, “база_данных”, “отправка_email”).
mode: Режим доступа (например, “чтение”, “запись” или “админ”).
scope: Дополнительные параметры в формате JSON. Позволяет хранить сложные настройки доступа в виде словаря.
reason: Текстовое описание того, зачем это разрешение было выдано.
expires_at: Срок годности разрешения. Если время вышло, доступ аннулируется.
created_at: Время создания записи (автоматически ставится текущее время UTC).
SessionContext — это durable-состояние диалога. Не transcript, не полный лог сообщений и не “вся память агента”, а компактная техническая карточка текущей сессии.
Если AgentTurn отвечает на вопрос “какой запрос сейчас выполняется”, то SessionContext отвечает на вопрос “в какой сцене находится пользователь и агент”.
Например:
какой turn_id сейчас активен;
есть ли незавершенное подтверждение;
какой проект открыт;
какой профиль агента выбран;
какой краткий summary уже построен;
с какого события UI нужно продолжить чтение после reconnect;
какие операции сейчас нельзя запускать параллельно.
То есть SessionContext нужен не для философской “памяти”, а для скучной инженерной магии: закрыли вкладку, обновили страницу, перезапустили backend, worker умер, пользователь вернулся через час — и система все еще понимает, что происходит.
Примерная структура:
class SessionContext(Base):
__tablename__ = "session_contexts"
session_id: Mapped[str] = mapped_column(String(200), primary_key=True)
user_id: Mapped[uuid.UUID | None] = mapped_column(nullable=True, index=True)
project_id: Mapped[uuid.UUID | None] = mapped_column(nullable=True, index=True)
active_turn_id: Mapped[uuid.UUID | None] = mapped_column(nullable=True, index=True)
active_job_id: Mapped[uuid.UUID | None] = mapped_column(nullable=True, index=True)
status: Mapped[str] = mapped_column(String(40), default="idle", index=True)
agent_profile: Mapped[str] = mapped_column(String(80), default="default")
summary: Mapped[str | None] = mapped_column(Text, nullable=True)
pending_approval: Mapped[dict | None] = mapped_column(JSON, nullable=True)
event_cursor: Mapped[int] = mapped_column(default=0)
context_version: Mapped[int] = mapped_column(default=1)
last_user_message_at: Mapped[datetime | None] = mapped_column(nullable=True)
last_agent_event_at: Mapped[datetime | None] = mapped_column(nullable=True)
created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
updated_at: Mapped[datetime] = mapped_column(
default=datetime.utcnow,
onupdate=datetime.utcnow,
)
|
Поле |
Что хранит |
Зачем нужно |
|---|---|---|
|
session_id |
ID сессии |
Главный ключ состояния диалога |
|
user_id |
Пользователь |
Изоляция сессий и прав |
|
active_project_id |
Текущий проект |
Понимать рабочий контекст |
|
active_turn_id |
Текущий ход агента |
Восстановить незавершенный turn |
|
agent_profile |
Режим/персона/настройки агента |
Например, “код-ревьюер”, “переводчик”, “аналитик” |
|
summary |
Сжатая история |
Не тащить весь transcript в каждый prompt |
|
pending_approval |
Ожидаемое подтверждение |
Не потерять confirm после рестарта |
|
event_cursor |
Последнее доставленное событие |
Догнать UI после reconnect |
|
status |
active, waiting_user, running, idle |
Быстро понять состояние сессии |
|
updated_at |
Время обновления |
Отладка, TTL, чистка старых сессий |
Важно: SessionContext не должен превращаться в помойку. В него не надо складывать весь prompt, все ответы модели, base64 файлов и простыню traceback-ов. Для этого есть AgentEvent, файловое хранилище, blob storage и отдельные job-таблицы.
Хороший SessionContext маленький, скучный и восстанавливаемый.
Еще одна сущность, которая быстро становится необходимой, — ProjectContext.
Сессия отвечает за диалог. Проект отвечает за рабочую область.
Пользователь может открыть один проект в нескольких вкладках,, потом еще что-то сделать, потом нажать “повторить”. Если система не хранит durable-состояние проекта, два job-а могут одновременно начать менять одни и те же файлы.
И получится не AI-agent, а кибердеревенский комбайн, который одной рукой чинит забор, второй рукой уже его сносит.
сlass ProjectContext(Base):
__tablename__ = "project_contexts"
project_id: Mapped[uuid.UUID] = mapped_column(primary_key=True)
owner_id: Mapped[uuid.UUID | None] = mapped_column(nullable=True, index=True)
active_operation_id: Mapped[uuid.UUID | None] = mapped_column(nullable=True, index=True)
operation_lock: Mapped[dict | None] = mapped_column(JSON, nullable=True)
latest_output_file_id: Mapped[uuid.UUID | None] = mapped_column(nullable=True, index=True)
settings: Mapped[dict | None] = mapped_column(JSON, nullable=True)
status: Mapped[str] = mapped_column(String(40), default="idle", index=True)
created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
updated_at: Mapped[datetime] = mapped_column(
default=datetime.utcnow,
onupdate=datetime.utcnow,
)
ProjectContext хранит:
|
Поле |
Что хранит |
|---|---|
|
project_id |
ID проекта |
|
owner_id |
Владелец |
|
active_operation_id |
Текущая тяжелая операция |
|
operation_lock |
Мягкая блокировка проекта |
|
latest_output_file_id |
Последний результат |
|
settings |
Настройки проекта |
|
status |
idle, processing, needs_review, failed |
|
updated_at |
Последнее изменение |
Это не обязательно жесткий database lock. Чаще достаточно прикладной блокировки: “в этом проекте уже идет операция типа workbook_write, вторую такую же не запускаем”.
Например, можно разрешить читать файл и строить preview, но запретить одновременно две операции, которые пишут результат в один и тот же output slot.
Если операция может занять больше пары секунд, она должна стать job-ом.
HTTP-запрос может принять задачу, проверить права, создать AgentTurn, положить BackgroundJob в очередь и вернуть пользователю состояние: “задача принята”. А дальше работает worker.
Класс джоб будет таким
class Job(Base): tablename = “jobs”
class Job(Base):
__tablename__ = "operation_jobs"
job_id: Mapped[uuid.UUID] = mapped_column(primary_key=True)
turn_id: Mapped[uuid.UUID | None] = mapped_column(nullable=True, index=True)
project_id: Mapped[uuid.UUID | None] = mapped_column(nullable=True, index=True)
type: Mapped[str] = mapped_column(String(80), index=True)
status: Mapped[str] = mapped_column(String(40), default="queued", index=True)
attempt: Mapped[int] = mapped_column(default=0)
max_attempts: Mapped[int] = mapped_column(default=3)
next_attempt_at: Mapped[datetime | None] = mapped_column(nullable=True, index=True)
lease_owner: Mapped[str | None] = mapped_column(String(200), nullable=True, index=True)
lease_expires_at: Mapped[datetime | None] = mapped_column(nullable=True, index=True)
progress_seq: Mapped[int] = mapped_column(default=0)
input: Mapped[dict | None] = mapped_column(JSON, nullable=True)
output: Mapped[dict | None] = mapped_column(JSON, nullable=True)
error: Mapped[dict | None] = mapped_column(JSON, nullable=True)
created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
updated_at: Mapped[datetime] = mapped_column(
default=datetime.utcnow,
onupdate=datetime.utcnow,
)
|
Поле |
Что хранит |
|---|---|
|
job_id |
ID задачи |
|
turn_id |
К какому turn относится |
|
project_id |
В каком проекте выполняется |
|
type |
translate, parse_workbook, render, export |
|
status |
queued, running, completed, failed, cancelled |
|
attempt |
Номер попытки |
|
max_attempts |
Лимит повторов |
|
next_attempt_at |
Когда можно retry |
|
lease_owner |
Какой worker забрал задачу |
|
lease_expires_at |
Когда lease протухает |
|
progress_seq |
Монотонный номер progress-события |
|
input |
Санитизированный input |
|
output |
Ссылка на результат |
|
error |
Классифицированная ошибка [3] |
Ключевой момент — worker не просто берет задачу. Он атомарно claim-ит ее:
UPDATE background_jobs SET status = 'running', lease_owner = :worker_id, lease_expires_at = :now + interval '2 minutes' WHERE job_id = :job_id AND status = 'queued' AND (next_attempt_at IS NULL OR next_attempt_at <= :now);
Если обновилась одна строка — worker владеет задачей. Если ноль строк — кто-то уже забрал.
Lease нужен, потому что worker может умереть. Не “вернуть ошибку”, не “аккуратно завершиться”, а просто исчезнуть. После истечения lease_expires_at другой worker может подобрать задачу и продолжить или перезапустить ее с учетом idempotency.
Live stream — это приятно, но websocket не является durable state.
Пользователь закрыл ноутбук, сеть моргнула, вкладка перезагрузилась. Если события жили только в памяти процесса, прогресс потерян. Поэтому UI должен читать события из AgentEvent по cursor-у.
Условный сценарий:
UI подписался на события turn-а.
Получил события до event_seq = 42.
Соединение оборвалось.
UI reconnect-ится и говорит: “дай события после 42”.
Backend читает durable event log и отдает 43, 44, 45….
Так интерфейс перестает зависеть от идеальной сети.
class EventCursor(Base):
__tablename__ = "event_cursors"
cursor_id: Mapped[uuid.UUID] = mapped_column(primary_key=True)
session_id: Mapped[str] = mapped_column(String(200), index=True)
turn_id: Mapped[uuid.UUID | None] = mapped_column(nullable=True, index=True)
consumer_id: Mapped[str] = mapped_column(String(200), index=True)
last_event_seq: Mapped[int] = mapped_column(BigInteger, default=0)
created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
updated_at: Mapped[datetime] = mapped_column(
default=datetime.utcnow,
onupdate=datetime.utcnow,
)
__table_args__ = (
UniqueConstraint(
"session_id",
"turn_id",
"consumer_id",
name="uq_event_cursor_consumer",
),
)
event_seq лучше делать монотонным внутри turn_id или session_id. Не надо использовать только timestamp: у двух событий может быть одинаковое время, а порядок все равно важен.
Отдельно стоит прописать политику payload-ов.
Почти в каждом агенте рано или поздно появляется соблазн: “давайте просто положим весь JSON в event payload”. Через месяц в event log лежат base64-файлы, токены доступа, гигантские HTML-страницы, персональные данные и ответы модели на 300 килобайт.
Правило простое: durable event должен хранить факт, ссылку и короткий summary, а не весь мир.
Плохо:
{ "event_type": "file_processed", "payload": { "file_base64": "UEsDBBQAAAA...", "openai_api_key": "sk-...", "full_html": "<html>..." } }
Хорошо:
{ "event_type": "file_processed", "payload": { "file_id": "file_123", "mime_type": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", "rows_count": 1842, "result_file_id": "file_456" } }
Payload sanitizer должен вырезать:
секреты;
base64 и бинарные данные;
слишком длинные строки;
полные prompt-ы без необходимости;
персональные данные, если они не нужны для восстановления процесса.
Идеально, если sanitizer применяется централизованно перед записью AgentEvent, а не “по договоренности между разработчиками”. Договоренность живет до первого пятничного hotfix-а.
Все это ложится в один централизованный слой перед добавлением AgentEvent
import json
import re
from typing import Any
SECRET_KEY_RE = re.compile(
r"(api[_-]?key|token|secret|password|authorization|cookie|access[_-]?token|refresh[_-]?token)",
re.IGNORECASE,
)
BASE64_RE = re.compile(r"^[A-Za-z0-9+/]+={0,2}$")
class EventPayloadSanitizer:
max_depth = 8
max_string_length = 2_000
max_list_items = 100
max_payload_bytes = 32_000
def sanitize(self, payload: dict[str, Any] | None) -> dict[str, Any] | None:
if payload is None:
return None
sanitized = self._sanitize_value(payload, depth=0)
encoded = json.dumps(sanitized, ensure_ascii=False, default=str)
if len(encoded.encode("utf-8")) > self.max_payload_bytes:
return {
"summary": "payload_too_large",
"original_size_bytes": len(encoded.encode("utf-8")),
}
return sanitized
def _sanitize_value(self, value: Any, depth: int) -> Any:
if depth > self.max_depth:
return "[redacted:max_depth]"
if isinstance(value, dict):
result = {}
for key, item in value.items():
key_str = str(key)
if SECRET_KEY_RE.search(key_str):
result[key_str] = "[redacted:secret]"
continue
result[key_str] = self._sanitize_value(item, depth + 1)
return result
if isinstance(value, list):
items = value[: self.max_list_items]
result = [self._sanitize_value(item, depth + 1) for item in items]
if len(value) > self.max_list_items:
result.append(f"[truncated:{len(value) - self.max_list_items}_items]")
return result
if isinstance(value, str):
return self._sanitize_string(value)
return value
def _sanitize_string(self, value: str) -> str:
if self._looks_like_base64(value):
return "[redacted:base64]"
if len(value) > self.max_string_length:
return value[: self.max_string_length] + f"...[truncated:{len(value)} chars]"
return value
def _looks_like_base64(self, value: str) -> bool:
compact = value.strip()
if len(compact) < 256:
return False
if len(compact) % 4 != 0:
return False
return bool(BASE64_RE.fullmatch(compact))
Соберем все вместе на сценарии: пользователь просит, например, обработать файл и применить проектные настройки.
API принимает запрос и нормализует session_id. Если есть client_turn_id или idempotency_key, система проверяет, не запускали ли такой turn раньше.
AgentService создает AgentTurn и строит TurnPlan. Шаги плана попадают в AgentPlanItem.
ApprovalService оценивает риск. Если нужен confirm, в SessionContext сохраняется pending_approval, а в agent_events появляется approval_requested.
Пользователь подтверждает. Система создает ApprovalGrant с ограниченным scope и снимает pending_approval.
Executor запускает шаг. Для долгой операции создается TranslateJob или WorkbookJob, а turn получает события tool_started и job_queued.
Worker атомарно claim-ит job, выставляет lease_owner и lease_expires_at, обновляет progress_seq и публикует progress.
UI читает live stream. Если вкладка закрылась, после reconnect он догоняет события из БД.
При успехе job получает completed и output_file_id, проект обновляет last_translated_file_id, turn получает финальное событие.
При retryable-ошибке job возвращается в queued с next_attempt_at. При terminal-ошибке сохраняются error и классификация отказа.
В коде контракт между слоями можно выразить вот так
class AgentRequestHandler:
def handle(self, request: AgentRequest) -> AgentTurn:
session_id = normalize_session_id(request.session_id)
with self.db.transaction():
existing_turn = self.turns.find_by_idempotency_key(
session_id=session_id,
idempotency_key=request.idempotency_key or request.client_turn_id,
)
if existing_turn is not None:
return existing_turn
turn = self.agent_service.create_turn(
session_id=session_id,
project_id=request.project_id,
user_input=request.input,
idempotency_key=request.idempotency_key,
client_turn_id=request.client_turn_id,
)
plan = self.agent_service.build_plan(turn)
self.agent_service.save_plan_items(turn, plan)
approval = self.approval_service.assess(turn, plan)
if approval.required:
self.sessions.set_pending_approval(
session_id=session_id,
approval=approval.to_pending_payload(),
)
self.events.write(
turn_id=turn.turn_id,
session_id=session_id,
type="approval_requested",
payload=approval.to_event_payload(),
)
return turn
self.executor.enqueue_ready_steps(turn, plan)
return turn
И важная часть дял воркера:
class JobWorker:
def run_once(self) -> None:
job = self.jobs.claim_next(
lease_owner=self.worker_id,
lease_seconds=60,
)
if job is None:
return
try:
self.events.write(
turn_id=job.turn_id,
job_id=job.job_id,
project_id=job.project_id,
type="job_started",
payload={"job_id": str(job.job_id), "type": job.type},
)
output = self.execute(job)
self.jobs.complete(job.job_id, output=output)
self.events.write(
turn_id=job.turn_id,
job_id=job.job_id,
project_id=job.project_id,
type="job_completed",
payload=output,
)
except RetryableJobError as exc:
self.jobs.schedule_retry(job.job_id, error=exc.to_payload())
except TerminalJobError as exc:
self.jobs.fail(job.job_id, error=exc.to_payload())
Телеграм канал автора [4], где он что‑то пишет про ML, NLP и разработку
Автор: kobubu
Источник [5]
Сайт-источник BrainTools: https://www.braintools.ru
Путь до страницы источника: https://www.braintools.ru/article/29929
URLs in this post:
[1] https://habr.com/ru/articles/1028290/: https://habr.com/ru/articles/1028290/
[2] памяти: http://www.braintools.ru/article/4140
[3] ошибка: http://www.braintools.ru/article/4192
[4] Телеграм канал автора: https://t.me/ML_Goose
[5] Источник: https://habr.com/ru/articles/1031440/?utm_source=habrahabr&utm_medium=rss&utm_campaign=1031440
Нажмите здесь для печати.