- BrainTools - https://www.braintools.ru -

Бесплатная защита от спама на почте с помощью ИИ фильтрации без VPN: многоуровневый метод с BERT и 550 МБ RAM

Вас тоже достаёт спам и реклама? Рекламу я блокирую через свой DNS сервер и локальными CSS фильтрами, а вот для почты пришлось придумать что-то другое.

Бесплатная защита от спама на почте с помощью ИИ фильтрации

Бесплатная защита от спама на почте с помощью ИИ фильтрации

Почта у меня устроена немного необычно (я так думаю). Все мои ящики — Яндекс, Mail, старые адреса — настроены на пересылку на мой домен. С домена всё летит в Gmail. Протоколом IMAP не пользуюсь много лет (да ещё какие то платные мутки в РФ недавно придумали). А вот интерфейс Google меня полностью устраивает, поиск мгновенный, удобные метки и сортировка. Есть мелочи, которые не нравятся, но лучше варианта, который закрывал бы мои потребности [1], пока не встречал. В итоге получается единая точка, где видна вся переписка.

Схема многоступенчатая. Первыми срабатывают фильтры Яндекса и Mail — что-то они отсеивают сами, ещё до пересылки. То, что прошло через них, падает на мой сервер, где стоит SpamAssassin. Ловит ещё часть. Но после двух уровней всё равно что-то просачивается, спамеры же не сидят без дела. И вот этот остаток доезжает до Gmail и что-то оседает в папке Спам, а что-то попадает во входящие и приходит раздражающее уведомление. Хотелось, чтобы со временем не накапливался мусор в папках, который надо разгребать вручную. Особенно важно заблокировать то, что не является полностью спамом: приглашения на конференции, партнёрские предложения, кредиты — формально не нарушение, поэтому байесовский фильтр такие вещи плохо ловит.

Сначала думал взять LLM через API. Llama или GPT отлично разбирают текст, с ними гибко настраиваются критерии. Но это внешняя зависимость. В наше нестабильное сетевое время желательно уменьшать такие зависимости. Сервис может поменять условия, отключиться временно, геоблокировки, да и много чего ещё.

Локальная BERT-модель закрыла обе проблемы. Взял ruBert-base-antispam с HuggingFace — файн-тюн на базе DeepPavlov/rubert-base-cased-conversational. 177 миллионов параметров, 12 слоёв трансформера, 768 hidden size. Физически не принимает больше 512 токенов на вход. В памяти [2] занимает около 550 МБ, ответ приходит за 100-200 миллисекунд. Бинарный классификатор — текст на входе, 0 или 1 на выходе, никаких промптов и reasoning. Идеально!

Встроить вызов скрипта прямо в SMTP-конвейер Exim оказалось непросто. Механизмы вроде ACL или transport_filter имеют свои особенности, особенно с мудреной панелью сервера, которая сама обновляет многие файлы. А то будет так: где-то письмо недоступно целиком, где-то прав не хватает, где-то модифицированный текст не передаётся дальше, где-то файл обновился. Пошёл другим путём — post-delivery обработкой.

Получилась трёхуровневая серверная защита. Первый уровень — pre-rules, набор регулярок для очевидного спама. Срабатывают за миллисекунду, без обращения к нейросети. Второй уровень — сама BERT-модель, обрабатывает всё, что не отсекли регулярки, это около 70% потока. Третий уровень работает параллельно — SpamAssassin со своим байесовским фильтром. Если он ставит score выше порога, Dovecot через Sieve перемещает письмо в .Spam. Это страховка на случай, если первые два уровня что-то пропустили. При этом AI-скрипт всё равно проверяет письмо: если SpamAssassin ошибся и пометил нормальное письмо, а AI считает его чистым, оно всё равно уйдёт на Gmail. Fail-open стратегия в AI-скрипте спасает от параноидального SpamAssassin и без сильных заморочек с коэффициентами.

Схема работает так. Exim принимает письмо и складывает его в Maildir, в папку new/. Отдельный скрипт каждые полминуты обходит все директории. Из MIME вытаскивает текст и заголовки — поддерживает и plain, и HTML, теги чистит стандартным HTMLParser. Сначала гонит через pre-rules, причём смотрит не только в текст, но и в заголовки. Что не отсеялось — идёт в постоянно работающий daemon через Unix-сокет. Спам перекладывается в .Spam/, нормальные письма идут в cur/ и параллельно улетают на Gmail через sendmail.

Задержка на доставку письма в 30–60 секунд меня не беспокоит. В почте я в реальном времени не сижу, это ж не чат, да и Gmail подтягивает не прям мгновенно. Почта нужна для спокойных переписок, а не для гонок кто быстрее ответил.

Привожу пример daemon (/opt/spam_filter/daemon.py):

#!/usr/bin/env python3
import socket, sys, os, re, signal, logging
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification

MODEL_NAME = "assskelad/ruBert-base-antispam"  # любая BertForSequenceClassification
SOCKET_PATH = "/tmp/spam_filter.sock"
MAX_LEN = 512      # лимит BERT
MAX_TEXT = 6000    # символы из письма

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("spam_daemon")

torch.set_num_threads(1)
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME)
model.eval()

def predict(text: str) -> int:
    inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=MAX_LEN)
    with torch.inference_mode():
        out = model(**inputs)
        return int(torch.argmax(out.logits, dim=1).item())

def handle(conn):
    try:
        data = conn.recv(1024 * 1024)
        text = data.decode("utf-8", errors="replace")[:MAX_TEXT]
        result = predict(text) if text else 0
        conn.sendall(str(result).encode())
    except Exception as e:
        logger.error(f"Error: {e}")
        conn.sendall(b"0")
    finally:
        conn.close()

def shutdown(signum, frame):
    try:
        os.unlink(SOCKET_PATH)
    except:
        pass
    sys.exit(0)

signal.signal(signal.SIGTERM, shutdown)
signal.signal(signal.SIGINT, shutdown)

srv = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
srv.bind(SOCKET_PATH)
srv.listen(10)

while True:
    conn, _ = srv.accept()
    handle(conn)

Pre-rules (/opt/spam_filter/pre_rules.py) — паттерны:

#!/usr/bin/env python3
import re

def check_pre_rules(text, headers=None):
    text_lower = text.lower()
    headers = headers or {}
    
    # паттерны
    spam_patterns = [
        r'рассылк[аи]s+поs+баз|миллионs+адрес',
        r'внедрени.{0,15}crm|ботs+продвижени.{0,20}(?:telegram|whatsapp)',
        r'диплом.{0,20}(?:профпереподготовк|допобразован)',
        r'бухгалтерск.{0,30}сопровожден|нулевой.{0,20}ндс',
        r'спарсить.{0,30}базу|парсинг.{0,30}клиентов',
        r'подтвердите.{0,30}данны.{0,200}предложениеs+истекает',
        r'(?:[а-я]s){4,}',
        r'деньгиs+подs+залог.{0,20}(?:авто|машин)',
        r'сеоs+оптимизац.{0,20}недорого',
    ]
    for pattern in spam_patterns:
        if re.search(pattern, text_lower):
            return "SPAM"
    
    # рассылки
    bulk_headers = any(
        k.lower() in ['list-unsubscribe', 'precedence', 'feedback-id', 'x-rpcampaign']
        for k in headers
    )
    if bulk_headers and re.search(r'предлагаем|приглашаем|бесплатн.{0,20}консультац', text_lower):
        return "SPAM"
    
    # спам
    if re.search(r'кето|метаболизм|инсулин|жиросжигани[ею]|жкт', text_lower) and 
       re.search(r'тыs+устал|энерги[яи]s+наs+нуле|мыs+да[её]мs+систему', text_lower):
        return "SPAM"
    
    # нужные письма
    ham_patterns = [
        r'кодs+(?:подтвержден|дляs+вход|доступа):s*d{4,6}',
        r'вашs+заказs+№?d+|orders+#?d+',
        r'сбросs+парол|passwords+reset',
        r'платежs+прошел|payments+received',
        r'встреч[аи]s+(?:перенесен|назначен|состоится)',
    ]
    for pattern in ham_patterns:
        if re.search(pattern, text_lower):
            return "HAM"
    
    return None

Правило про массовые рассылки оказалось очень полезным. Нейросеть часто пропускает такие письма, потому что они грамматически корректны и не содержат явного мусора. А связка заголовок List-Unsubscribe + слово приглашаем в тексте даёт 100% точности без ложных срабатываний на личные письма. Правило про медицинские термины — пример составной регулярки: по отдельности слова могут употребляться в переписке, но в сочетании с некоторыми это однозначный спам. Так файл pre_rules.py со временем обрастает специфичными правилами под ваш поток, который иногда можно пополнять.

Сокращенный Post-delivery скрипт:

#!/usr/bin/env python3
import os, sys, socket, datetime, time, subprocess
from email import policy
from email.parser import BytesParser
from html.parser import HTMLParser
import re

SOCKET = "/tmp/spam_filter.sock"
MAIL_BASE = "/home/admin/mail"         # путь
FORWARD_TO = "your@gmail.com"          # куда присылать
WAIT_SECONDS = 5                       # ожидание
LOCK_FILE = "/tmp/ai_spam_check.lock"

def check_spam(email_bytes):
    text, headers = extract_text_and_headers(email_bytes)
    
    pre_verdict = check_pre_rules(text, headers)
    if pre_verdict == "SPAM":
        return True, text[:100], "pre_rule"
    elif pre_verdict == "HAM":
        return False, text[:100], "pre_rule"
    
    text_bytes = text.encode('utf-8', errors='ignore')
    try:
        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        s.settimeout(2)
        s.connect(SOCKET)
        s.settimeout(5)
        s.sendall(text_bytes)
        s.shutdown(socket.SHUT_WR)
        r = s.recv(8).decode("ascii", errors="ignore").strip()
        s.close()
        return r == "1", text[:100], "ai"
    except Exception as e:
        log(f"DAEMON_ERROR: {e}")
        return False, text[:100], "error"

def forward_to_gmail(email_bytes, filename=""):
    modified = b"X-Forwarded-By: AI-Filtern" + email_bytes
    p = subprocess.Popen(
        ["/usr/sbin/sendmail", "-i", FORWARD_TO],
        stdin=subprocess.PIPE,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL
    )
    p.stdin.write(modified)
    p.stdin.close()
    p.wait()

def process_new_dir(new_dir, spam_new_dir, cur_dir):
    for filename in os.listdir(new_dir):
        filepath = os.path.join(new_dir, filename)
        if time.time() - os.path.getmtime(filepath) < WAIT_SECONDS:
            continue
        
        with open(filepath, "rb") as f:
            email_bytes = f.read()
        
        if b"X-Forwarded-By: AI-Filter" in email_bytes:
            os.rename(filepath, os.path.join(cur_dir, filename + ":2,"))
            continue
        
        is_spam, preview, method = check_spam(email_bytes)
        if is_spam:
            os.rename(filepath, os.path.join(spam_new_dir, filename))
            log(f"SPAM [{method}]: {filename} | {preview}")
        else:
            os.rename(filepath, os.path.join(cur_dir, filename + ":2,"))
            forward_to_gmail(email_bytes, filename)
            log(f"NOT_SPAM [{method}]: {filename} | {preview}")

Самое интересное — гонки с MTA. Если читать файл сразу после создания, можно получить недописанное письмо пока оно сохраняется на носитель. Отсюда WAIT_SECONDS: скрипт пропускает файлы моложе заданного возраста. Это плата за post-delivery.

Петли пересылки тоже пришлось учесть. Если случайно отправить самому себе или замкнуть цепочку, начнётся бесконечный цикл. Защита — заголовок X-Forwarded-By: скрипт видит его и пропускает такие письма, просто перемещая в cur/.

По RAM памяти: модель занимает около 550 МБ, скрипт проверки при запуске ещё около 50 МБ, Exim 25 МБ, Dovecot 10 МБ, SpamAssassin 80 МБ. Итого около 716 МБ на всю систему. У меня сервер на 2 гига, в итоге занято около одного. Всё стабильно, без свопа. Для более слабых машин подошёл бы DistilBERT, он вдвое легче, но у меня запаса хватает.

Если daemon упал, сокет недоступен или произошла любая ошибка [3] — письмо считается нормальным и идёт дальше. Лучше получить пару спам-писем, чем пропустить важное.

Для мониторинга есть bash-скрипт SPAM, который показывает статус всех компонентов, статистику за день, состояние ящиков и trend за последний час прямо в терминале. Команда SPAM -w даёт live-обновление каждые 3 секунды, SPAM -t тестирует модель на 10 примерах, SPAM -r — pre-rules на 21 примере. Для админа почтового сервера удобный CLI-мониторинг оказывается полезнее любого веб-интерфейса. Логи пишу в /var/log/ai_spam.log с пометками [pre_rule] или [ai], чтобы было видно, какой уровень сработал.

Спам перестал доезжать до Gmail. Что раньше оседало в папке и требовало периодической чистки, теперь остаётся на сервере в .Spam/. На Gmail попадают только письма, прошедшие все три фильтра. Иногда модель ошибается — на редких темах или специфическом сленге. В таких случаях добавляю паттерн в pre_rules.py, и проблема больше не повторяется. Раз в месяц можно глянуть в .Spam/, просматриваю заголовки — не потерялось ли чего. Обычно нет. А если что и попадет по ошибке — пара строк в конфиге, и проблема закрыта.

Нагрузка на сервер минимальная, при сотне писем в день daemon большую часть времени спит, CPU почти не ест. Раз настроил и работает, иногда только добавляю новые регулярки, когда замечаю в логах повторяющийся мусор. Проблема со спамом закрыта!

А как вы боретесь со спамом на своих серверах? Если будут нужны исходники, то выложу на GitHub.

Автор: ArtemErykov

Источник [4]


Сайт-источник BrainTools: https://www.braintools.ru

Путь до страницы источника: https://www.braintools.ru/article/32637

URLs in this post:

[1] потребности: http://www.braintools.ru/article/9534

[2] памяти: http://www.braintools.ru/article/4140

[3] ошибка: http://www.braintools.ru/article/4192

[4] Источник: https://habr.com/ru/articles/1055518/?utm_campaign=1055518&utm_source=habrahabr&utm_medium=rss

www.BrainTools.ru

Rambler's Top100