- BrainTools - https://www.braintools.ru -
Привет! Меня зовут Максим Морозов, я AI Project Manager в Битрикс24.
В предыдущей статье [1] я рассказывал о локальных нейросетях как безопасной и экономичной альтернативе облачным API. Сегодня — практический кейс, где мы применили этот подход в реальном проекте.
Главная идея этой работы: вместо дообучения (Fine-Tuning) нейросети на своих данных — что долго, дорого и требует поддержки датасета — мы используем штатную модель без дополнительного обучения [2]. Модель генерирует regex, а скрипт сохраняет эти правила и использует их автономно.
Я покажу архитектуру системы, где локальная LLM генерирует регулярные выражения для парсинга логов, экономя сотни часов ручной отладки. Все вычисления происходят внутри периметра компании, без отправки данных в облако.
У нас есть централизованный Syslog-сервер, куда стекаются данные со всех сервисов отдела:
Стандартные логи: Nginx, Apache, системные логи Linux;
Кастомный софт и микросервисы;
Самописные скрипты с собственными форматами вывода.
Вся эта информация агрегируется в Wazuh (Open Source SIEM) — система мониторинга безопасности, которая умеет реагировать [3] на инциденты. Но для этого Wazuh должен понимать структуру лога: где IP-адрес, где уровень критичности (Critical/Warning), где информационные сообщения.
Нативные XML-декодеры Wazuh
Чтобы Wazuh понял нестандартный лог, нужно написать XML-декодер с регулярным выражением внутри. Процесс выглядит так:
Анализируете формат нового лога;
Пишете XML-декодер с Regex;
Загружаете конфигурацию в Wazuh;
Перезапускаете сервис.
Основная проблема — отладка. Если в Regex есть ошибка [4]:
Wazuh не указывает строку с ошибкой;
Не объясняет характер проблемы.
Отладка превращается в итеративный процесс: копируете XML в сторонние валидаторы, проверяете Regex, видите что работает — вставляете в Wazuh, получаете ошибку. Когда у вас десятки сервисов с постоянно меняющимися форматами логов, поддержка SIEM становится значительной нагрузкой.
Пример простого XML декодера:
```xml
<!-- Простой декодер с предварительным matching -->
<decoder name="custom-app">
<prematch>^d{4}-d{2}-d{2} d{2}:d{2}:d{2}</prematch>
</decoder>
<!-- Дочерний декодер с извлечением полей -->
<decoder name="custom-app-fields">
<parent>custom-app</parent>
<regex>^(d{4}-d{2}-d{2}) (d{2}:d{2}:d{2}) (w+) (.+)$</regex>
<order>date, time, level, message</order>
</decoder>
```
Пример сложного декодера:

Каждый новый сервис — это новый XML файл, свои regex, и цикл отладки. Решение – один штатный json декодер.
Wazuh имеет встроенный JSON-декодер, который работает намного надежнее XML-декодеров.
<decoder name="json-msgraph">
<prematch>"integration":"ms-graph"</prematch>
<plugin_decoder>JSON_Decoder</plugin_decoder>
<json_null_field>discard</json_null_field>
</decoder>
<decoder name="json">
<prematch>^{s*"</prematch>
<plugin_decoder>JSON_Decoder</plugin_decoder>
</decoder>
Если подать в SIEM не сырую строку, а структурированный JSON:
```json
{
"timestamp": "2025-01-15",
"level": "Error",
"user": "admin",
"action": "failed login",
"src_ip": "192.168.1.5"
}
…то Wazuh автоматически распарсит поля. Достаточно настроить правила вида: «Если поле level равно Error — создай алерт».
Мы вынесли логику [6] парсинга за пределы Wazuh. Теперь пайплайн выглядит так:
Syslog-сервер принимает сырые логи;
Python-скрипт (пре-парсер) перехватывает их на лету;
Скрипт применяет к строке регулярное выражение;
Если строку удалось распарсить – отдаем в Wazuh.
Проблема с декодерами решена, но осталась одна задача: нам нужны регулярные выражения для Python-скрипта, и их нужно писать под каждую новую сигнатуру лога. Именно здесь мы подключили локальную LLM.
Обычно, когда говорят про AI, представляют стойки с NVIDIA H100 или мощные GPU-серверы. Но для нашей задачи это избыточно. Мы использовали уже имеющийся Mac Mini на чипе M4 Pro и протестировали его на этой задаче.
Почему не облако?
Логи могут содержать IP-адреса, пути, логины. Отправлять эти данные в облако — нарушение контура безопасности. Локальная модель гарантирует, что данные не покидают периметр;
Облачные API стоят денег. Mac Mini потребляет 30–40 Вт — это копейки по сравнению с оплатой облачных API.
Софт и Модель
Среда исполнения: LM Studio или Ollama. Предоставляют API, совместимый с OpenAI;
Модель: мы используем собственную модель BitrixGPT, но подойдут и другие. Например GPT-OSS-20B — open-weight от OpenAI;
Производительность: ~70 токенов/сек на M4 Pro.
Таким образом, у нас появился бесплатный, приватный и быстрый сервис генерации.
Прежде чем переходить к алгоритму, рассмотрим архитектуру системы.
Компоненты
Drain3 Manager: Кластеризация лог-строк по шаблонам;
Regex Generator: LLM-генерация паттернов с валидацией и retry-механизмом;
Regex Parser: Применение проверенных паттернов к логам;
Stream Listener: Real-time обработка через TCP-сокет для syslog-ng;
Feedback Loop: Дообучение системы на нераспознанных строках;
Service Registry: Фильтрация и управление списком сервисов.
Full Pipeline (для пакетной обработки файлов):
Log File → Ingest (Drain3) → Signatures→
LLM Generation → Regex → Parse → Output
Stream Pipeline (для real-time обработки):
syslog-ng → TCP Socket → Drain3 clustering →
Parse with existing regex → OK or FAIL buffer →
[threshold reached] → Feedback to Drain3 → Generate new regex → Re-parse FAIL
Структура данных
**Signatures JSON** (выход Drain3):
```json
[
{
"cluster_id": "c2",
"size": 85,
"signature": "<DATE> <TIME> <LEVEL> Login failed user=<*> ip=<*>",
"examples": [
"2025-01-15 10:30:00 ERROR Login failed user=admin ip=192.168.1.5",
"2025-01-15 10:31:00 ERROR Login failed user=guest ip=10.0.0.1"
]
},
{
"cluster_id": "c3",
"size": 200,
"signature": "<DATE> <TIME> <LEVEL> Request processed by <*> in <*> ms",
"examples": [
"2025-01-15 10:30:00 INFO Request processed by worker-3 in 150 ms",
"2025-01-15 10:31:00 INFO Request processed by worker-1 in 89 ms"
]
}
]
Regex JSON (результат генерации):
[
{
"cluster_id": "c2",
"signature": "<DATE> <TIME> <LEVEL> Login failed user=<*> ip=<*>",
"size": 85,
"regex": "^(?P<date>\d{4}-\d{2}-\d{2}) (?P<time>\d{2}:\d{2}:\d{2}) (?P<level>\w+) Login failed user=(?P<user>\S+) ip=(?P<ip>[\d.]+)$",
"fields": ["date", "time", "level", "user", "ip"],
"validation": {
"valid": true,
"matched": 85,
"total": 85
}
},
{
"cluster_id": "c3",
"signature": "<DATE> <TIME> <LEVEL> Request processed by <*> in <*> ms",
"size": 200,
"regex": "^(?P<date>\d{4}-\d{2}-\d{2}) (?P<time>\d{2}:\d{2}:\d{2}) (?P<level>\w+) Request processed by (?P<worker>\S+) in (?P<duration>\d+) ms$",
"fields": ["date", "time", "level", "worker", "duration"],
"validation": {
"valid": true,
"matched": 200,
"total": 200
}
}
]
Output (финальный формат):
{"date": "2025-01-15", "time": "10:30:00", "level": "ERROR", "user": "admin", "ip": "192.168.1.5"}
{"date": "2025-01-15", "time": "10:31:00", "level": "ERROR", "user": "guest", "ip": "10.0.0.1"}
{"date": "2025-01-15", "time": "10:30:00", "level": "INFO", "worker": "worker-3", "duration": "150"}
{"date": "2025-01-15", "time": "10:31:00", "level": "INFO", "worker": "worker-1", "duration": "89"}
Система поддерживает два режима обработки логов:
1. Batch Mode (пакетная обработка):
Используется для обработки исторических данных или разового анализа логов.
Log File → Ingest (Drain3) → Signatures →
LLM Generation → Regex → Parse → Wazuh
2. Stream Mode (real-time обработка):
Используется для обработки логов в реальном времени.
syslog-ng → TCP Socket → Stream Listener → Drain3 clustering →
Parse with existing regex → OK or FAIL buffer →
[threshold] → Feedback → Generate new regex → Re-parse FAIL
Мы используем оба режима: Stream Mode для текущих логов, Batch Mode для ретроспективного анализа.
Ключевая идея проекта — вместо Fine-Tuning модели мы используем стандартную модель как универсальный инструмент генерации regex. Модель не “запоминает” логи, она создает правила, которые скрипт сохраняет и применяет самостоятельно.
Вот пошаговый разбор пайплайна:
Шаг 1: Кластеризация (Drain3)
Самая большая ошибка — пытаться скормить нейросети каждую строку лога. Это убьет производительность. Мы используем Python-библиотеку Drain3. Это алгоритм, который в реальном времени читает поток строк и выявляет шаблоны (сигнатуры).
Пример:
Строка 1: User admin login failed from 192.168.1.1
Строка 2: User guest login failed from 10.0.0.5
Drain3 определяет, что это один шаблон: User <*> login failed from <*> и присваивает ему уникальный ID кластера. Если скрипт видит лог с известным ID кластера, он применяет готовое правило без обращения к LLM.
Пример реализации:
def process_line(self, line: str) -> Optional[str]:
"""Обработка одной строки лога через Drain3."""
result = self._template_miner.add_log_message(line)
cluster_id = result.get("cluster_id")
if cluster_id:
if cluster_id not in self._cluster_examples:
self._cluster_examples[cluster_id] = []
if len(self._cluster_examples[cluster_id]) < 5:
self._cluster_examples[cluster_id].append(line)
return cluster_id
Шаг 2: Генерация (Промпт-инжиниринг)
Если Drain3 сигнализирует о новом шаблоне, скрипт собирает буфер из 5 реальных примеров и формирует промпт для локальной модели.
Логика retry: Максимум 3 попытки исправить невалидный regex. Каждая попытка включает feedback об ошибке. При успехе сохраняем regex.
Модель возвращает regex, которое мы сразу валидируем.
Пример regex, сгенерированного моделью:

40+ именованных групп. Такой regex вручную писать долго и чревато ошибками, а LLM генерирует его за секунды.
Метрики эффективности
На реальных данных наш подход показывает следующие результаты:
Точность генерации: 85-90% regex проходят валидацию с первой попытки;
Retry-эффективность: После 2-3 попыток успешность достигает 98%+;
Скорость обработки: Stream Mode обрабатывает ~100-500 строк/сек (зависит от сложности regex);
Инкрементальная обработка: Повторные запуски быстрее в 50-100 раз благодаря hash-based deduplication.
Шаг 3: Валидация с автоматическим исправлением
Мы не доверяем коду от нейросети без проверки. Скрипт получает ответ от модели и сразу валидирует его — пытается применить этот regex к тем же 5 примерам. Если regex не работает, система возвращает примеры, regex и ошибку обратно в модель с просьбой исправить
Пример функции валидации:
def _validate_regex(regex: str, examples: List[str]) -> Dict[str, Any]:
"""Компилируем regex и проверяем на примерах."""
try:
compiled = re.compile(regex)
except re.error as exc:
return {
"valid": False,
"matched": 0,
"total": len(examples),
"error": f"Compile error: {exc}"
}
matched = sum(1 for ex in examples if compiled.match(ex.strip()))
return {
"valid": matched == len(examples),
"matched": matched,
"total": len(examples),
"error": None if matched == len(examples) else f"Matched only {matched}/{len(examples)}"
}
Логика обработки результата:
Regex работает и извлекает данные → сохраняем в базу и привязываем к ID кластера Drain3
Ошибка или неполное совпадение → скрипт просит модель перегенерировать regex (обычно со второй попытки получается идеально)
Шаг 4: Продакшн (Кэширование)
Теперь, когда для этого шаблона есть проверенный Regex, нейросеть больше не нужна.
При появлении такого лога скрипт достает правило из базы, парсит строку и отправляет в Wazuh.
Итог: Нейросеть работает только в момент появления нового сервиса или изменения формата логов. 99.9% времени система работает на чистом CPU, потребляя минимум ресурсов.
В теории всё звучит красиво, но на практике мы собрали достаточно граблей. Вот что стоит учесть, если захотите повторить наш опыт [7].
1. Зарезервированные поля Wazuh
На практике мы обнаружили, что некоторые имена полей могут вызывать конфликты с внутренней логикой Wazuh. Например в правиле нельзя использовать имя “action” для полей. Вместо “action” используй “log_action” и т.д. https://documentation.wazuh.com/current/user-manual/ruleset/ruleset-xml-syntax/rules.html [8]
2. Regex-галлюцинации
Иногда модель увлекается и выдает избыточно сложные Regex — например, использует сложные Lookbehind-проверки или атомарные группы, которые медленные на больших объемах.
А иногда наоборот создается слишком “жадные” и пытается положить 90% лога в один (?P<message>.+) или (?P<info>.+)
Хорошо, что все эти сложности решаются обычной инструкцией:
1. Используй ТОЛЬКО именованные группы вида (?P<name>pattern)
2. ЗАПРЕЩЕНО:
lookahead (?=...)
lookbehind (?<=...)
2.1 создавать группу (?P<message>.+) или (?P<info>.+), если "хвост" лога содержит структуру
Структурой считаются:
URL-адреса (ws://..., http://...)
IP-адреса
Числа
Устойчивые фразы (Connecting to, Failed at, User logged in)
2.2 Запрещено использовать "action" для названия полей. Вместо "action" используй "log_action" и т.д.
Инкрементальная обработка
Пересчитывать все кластеры при каждом запуске дорого. Добавили hash-based идентификацию обработанных кластеров.
def _compute_cluster_hash(signature: str, examples: List[str]) -> str:
"""Вычисляем стабильный хеш кластера."""
if not examples:
return hashlib.sha256(signature.encode()).hexdigest()[:16]
content = f"{signature}|{examples[0]}"
return hashlib.sha256(content.encode()).hexdigest()[:16]
# В основном цикле:
if _compute_cluster_hash(cluster.signature, cluster.examples) in processed_hashes:
print(f"Skipping already processed: {cluster.cluster_id}")
continue
Новые типы логов появляются в произвольные моменты. Реализовали буферизацию FAIL-строк с триггером по threshold ИЛИ интервалу.
def _check_and_trigger_feedback(self, service: str) -> None:
"""Проверка условий для запуска feedback loop."""
if service not in self.feedback_buffers:
return
buffer_size = len(self.feedback_buffers[service])
now = datetime.now()
last_trigger = self.last_trigger_times.get(service, now)
# Триггер: накопилось 100 FAIL строк ИЛИ прошло 5 минут
threshold_reached = buffer_size >= 100
interval_reached = (now - last_trigger).total_seconds() >= 300
if threshold_reached or (interval_reached and buffer_size > 0):
# Feed to Drain3 → Generate new regex → Re-parse FAIL
self._trigger_feedback_loop(service)
self.last_trigger_times[service] = now
При падении сервиса можно потерять текущее состояние. Добавили периодическое сохранение + graceful shutdown.
def gracefulshutdown(self):
"""Сохранение состояния при остановке."""
for service, manager in self.drain_managers.items():
manager.save_state() # Сохраняем Drain3 состояние
manager.dump_signatures() # Сохраняем сигнатуры
Локальная модель на Mac Mini идеально справляется с тактической задачей — написанием конкретных регулярных выражений. Но у Wazuh есть и стратегический уровень — иерархия правил (Rules).
Это когда одно правило зависит от другого: «Если сработал парсер Nginx (Parent), И уровень ошибки Critical, И IP из черного списка (Children) — тогда бей тревогу».
Строить такую красивую древовидную структуру (XML) локальной модели сложновато — ей не хватает контекста и «интеллекта» для глобальной логики.
Поэтому мы используем гибридный подход:
Локально (Mac Mini): Парсим «сырые» логи, генерируем базовые Regex-кирпичики и создаем xml для правило для каждой сигнатуры. Здесь остаются все персональные данные (IP, логины).
Анонимизация: Скрипт берет пачку готовых, проверенных regex, xml правила для сервиса, убирает примеры логов, оставляя только структуру полей (src_ip, user_name),
Облако: Обезличенную структуру мы отправляем в большую модель
Запрос: «Вот 30 regex и 30 xml. Построй из них оптимальную иерархию XML-правил для Wazuh с наследованием».
Результат: Облачная модель возвращает структурированный XML, который мы импортируем в Wazuh .
<group name="optimized-rules">
<!-- ================================================== -->
<!-- 🚦 SECTION 1: TRAFFIC JOURNAL (База ID: 26000) -->
<!-- ================================================== -->
<!-- РОДИТЕЛЬСКОЕ ПРАВИЛО ДЛЯ TRAFFIC-JOURNAL -->
<rule id="26000" level="0">
<decoded_as>json</decoded_as>
<field type="pcre2" name="log_type">^traffic-journal$</field>
<description>Base rule for Traffic Journal logs</description>
</rule>
<!-- Дети: Actions (Drop/Accept) -->
<rule id="26008" level="5">
<if_sid>26000</if_sid>
<field type="pcre2" name="result">^drop$</field>
<description>Traffic journal: Result DROP</description>
<group>traffic-journal, drop</group>
</rule>
и т.д.
Так мы автоматизируем задачу администратора SIEM на 80-90%, оставляя за человеком только финальное «ОК». И при этом никакие чувствительные данные не покидают периметр — в облако уходит только сухая логика правил.
Что получилось:
Автоматическая генерация Regex и XML без участия человека;
Real-time обработка с feedback loop для “дообучения” скрипта на новых типах логов;
Приватность данных — вся обработка чувствительных данных внутри периметра компании.
Если у вас есть возможность, то используйте LLM как архитектора правил, а не как исполнителя.
Автор: MaxMoro1
Источник [9]
Сайт-источник BrainTools: https://www.braintools.ru
Путь до страницы источника: https://www.braintools.ru/article/25384
URLs in this post:
[1] статье: https://habr.com/ru/companies/bitrix/articles/969626/
[2] обучения: http://www.braintools.ru/article/5125
[3] реагировать: http://www.braintools.ru/article/1549
[4] ошибка: http://www.braintools.ru/article/4192
[5] Image: https://sourcecraft.dev/
[6] логику: http://www.braintools.ru/article/7640
[7] опыт: http://www.braintools.ru/article/6952
[8] https://documentation.wazuh.com/current/user-manual/ruleset/ruleset-xml-syntax/rules.html: https://documentation.wazuh.com/current/user-manual/ruleset/ruleset-xml-syntax/rules.html
[9] Источник: https://habr.com/ru/companies/bitrix/articles/992708/?utm_source=habrahabr&utm_medium=rss&utm_campaign=992708
Нажмите здесь для печати.