Разговариваем с датчиками на человеческом: как связать MQTT, TimescaleDB и LLM через Model Context Protocol (MCP). ai.. ai. ai agent.. ai. ai agent. mcp.. ai. ai agent. mcp. mqtt.. ai. ai agent. mcp. mqtt. opencode.. ai. ai agent. mcp. mqtt. opencode. telegraf.. ai. ai agent. mcp. mqtt. opencode. telegraf. TimescaleDB.. ai. ai agent. mcp. mqtt. opencode. telegraf. TimescaleDB. Анализ и проектирование систем.. ai. ai agent. mcp. mqtt. opencode. telegraf. TimescaleDB. Анализ и проектирование систем. искусственный интеллект.. ai. ai agent. mcp. mqtt. opencode. telegraf. TimescaleDB. Анализ и проектирование систем. искусственный интеллект. промышленная автоматизация.. ai. ai agent. mcp. mqtt. opencode. telegraf. TimescaleDB. Анализ и проектирование систем. искусственный интеллект. промышленная автоматизация. Промышленное программирование.. ai. ai agent. mcp. mqtt. opencode. telegraf. TimescaleDB. Анализ и проектирование систем. искусственный интеллект. промышленная автоматизация. Промышленное программирование. Умный дом.. ai. ai agent. mcp. mqtt. opencode. telegraf. TimescaleDB. Анализ и проектирование систем. искусственный интеллект. промышленная автоматизация. Промышленное программирование. Умный дом. Управление проектами.

Любой, кто когда-либо работал на фабрике или в автоматизированных коммерческих помещениях, хорошо знаком с этой болью: данные датчиков занимают гигабайты, но чтобы извлечь из них хоть какую-то пользу, нужно пройти семь кругов ада. Надо писать SQL-запросы, загружать данные в CSV, переводить на Python, составлять графики в Excel, внимательно просматривать и искать аномалии…

Что делать, если вам нужно быстро понять, почему влажность подскочила в три часа ночи в прошлый вторник? Инженер тратит на это полдня. У завода есть данные, а современные большие языковые модели (LLM) обладают отличными аналитическими “мозгами”. Но долгое время они не понимали друг друга. Передавать терабайты необработанных логов напрямую в контекст нейронной сети дорого и бессмысленно.

Решение появилось недавно, и называется оно Model Context Protocol (MCP). Это открытый стандарт, разработанный компанией Anthropic (и активно поддерживаемый Google, Microsoft и другими гигантами), который позволяет LLM безопасно и стандартизированно вызывать “инструменты” во внешнем мире.

Разговариваем с датчиками на человеческом: как связать MQTT, TimescaleDB и LLM через Model Context Protocol (MCP) - 1

В нашем случае таким инструментом будет доступ к базе данных в режиме реального времени. Давайте изменим привычный порядок вещей. Вместо скучной теории я сначала покажу вам, как это работает на практике, а затем мы шаг за шагом разберемся, как сделать этот мост своими руками.

Как это выглядит для инженера (Демо)

Представьте, что на объекте стоят два датчика, которые измеряют температуру и влажность. Данные с них непрерывно пишутся в базу. Вместо написания скриптов дежурный инженер просто открывает чат с AI-ассистентом (это может быть Claude, OpenCode или любой клиент с поддержкой MCP) и пишет обычным текстом:

Инженер: «Посмотри показания датчиков с температурой и влажностью и покажи максимальные и минимальные значения за 3 дня».

Нейросеть сама понимает, что для этого нужно сделать SQL-запрос к TimescaleDB, вызывает наш MCP-инструмент, получает сжатый ответ и выдает аккуратную сводку:

Максимальные и минимальные значения датчиков

Максимальные и минимальные значения датчиков

Инженер: «Нарисуй гистограмму изменений температуры».

Ассистент мгновенно строит наглядный график прямо в интерфейсе чата:

Гистограмма изменений температуры

Гистограмма изменений температуры

Аналогично по влажности с нужным шагом:

Инженер: «Нарисуй гистограмму изменений влажности с шагом в 10 минут».

Гистограмма изменений влажности

Гистограмма изменений влажности

И, наконец, сложная аналитика, на которую у человека ушло бы много времени:

Инженер: «Проанализируй корреляцию при изменении значений температуры и влажности по времени».

Корреляция при изменении значений температуры и влажности

Корреляция при изменении значений температуры и влажности

AI-инженер работает безупречно: он неутомим, бесстрастен и очень быстр. За секунды он просеивает миллионы точек телеметрии и выдает готовые выводы.

Хотите себе такой же инструмент? Давайте соберем его. Наша цепочка данных будет выглядеть так: Датчик → Mosquitto (MQTT) → Telegraf → TimescaleDB → MCP-сервер → LLM.

В качестве серверной платформы мы использовали сервер на Ubuntu. Пройдемся по всем этапам настройки.

Шаг 1. Настраиваем хранилище: PostgreSQL + TimescaleDB

Обычный Postgres под большой нагрузкой от IoT-датчиков начинает «грустить». Поэтому мы используем расширение TimescaleDB — оно превращает Postgres в мощную базу данных для временных рядов (time-series), автоматически разбивая таблицы на партиции (гипертаблицы) по времени.

Для удобства и изоляции развернем всё в Docker. Вот наш docker-compose.yml (обратите внимание, мы вынесли базу на нестандартный порт 5433, чтобы не конфликтовать с локальным Postgres, если он у вас уже установлен):

version: '3.8'

services:
  timescaledb:
    image: timescale/timescaledb:latest-pg16
    container_name: timescaledb
    restart: unless-stopped
    environment:
      POSTGRES_USER: ${DB_USER:-postgres}
      POSTGRES_PASSWORD: ${DB_PASSWORD:-changeme}
      POSTGRES_DB: ${DB_NAME:-sensor_data}
      TZ: UTC
    ports:
      - "${DB_PORT:-5433}:5432"  # Маппинг нестандартного порта на хост
    volumes:
      - timescale_data:/var/lib/postgresql/data
      - ./backups:/backups
      - ./init:/docker-entrypoint-initdb.d
    networks:
      - iot_network
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U ${DB_USER:-postgres} -d ${DB_NAME:-sensor_data}"]
      interval: 30s
      timeout: 10s
      retries: 5
    logging:
      driver: "json-file"
      options:
        max-size: "10m"
        max-file: "3"
    deploy:
      resources:
        limits:
          memory: 1G
        reservations:
          memory: 512M

  pgadmin:  # Веб-интерфейс для удобного управления БД
    image: dpage/pgadmin4:latest
    container_name: pgadmin
    restart: unless-stopped
    environment:
      PGADMIN_DEFAULT_EMAIL: admin@example.com
      PGADMIN_DEFAULT_PASSWORD: ${PGADMIN_PASSWORD:-changeme}
      PGADMIN_CONFIG_SERVER_MODE: 'False'
    ports:
      - "5050:80"
    volumes:
      - pgadmin_data:/var/lib/pgadmin
    networks:
      - iot_network
    depends_on:
      - timescaledb

volumes:
  timescale_data:
    name: timescale_production_data
  pgadmin_data:
    name: pgadmin_storage

networks:
  iot_network:
    name: iot_sensor_network
    driver: bridge

Чтобы база сразу подготовилась к работе, создадим SQL-скрипт инициализации. Положите его в папку ./init/01-timescale.sql:

-- Подключение к нашей целевой базе
c sensor_data;

-- Включаем расширение TimescaleDB
CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;

-- Создаем отдельную схему для сенсоров
CREATE SCHEMA IF NOT EXISTS sensors;

-- Устанавливаем права
GRANT ALL PRIVILEGES ON SCHEMA sensors TO postgres;

-- Создаем базовую таблицу для данных, которую мы потом сделаем гипертаблицей
CREATE TABLE IF NOT EXISTS sensors.mqtt_consumer (
    time TIMESTAMPTZ NOT NULL,
	host TEXT,
    topic TEXT NOT NULL,
    value DOUBLE PRECISION NOT NULL
);

-- Превращаем таблицу в гипертаблицу TimescaleDB по полю time
SELECT create_hypertable('sensors.mqtt_consumer', 'time', if_not_exists => TRUE);

Рядом создаем файл переменных окружения .env:

# Настройки БД
DB_USER=postgres
DB_PASSWORD=super_password_123  # Обязательно поменяйте на свой в продакшене!
DB_NAME=sensor_data
DB_PORT=5433
PGADMIN_PASSWORD=super_password_pgadmin

Запускаем нашу базу:

docker-compose up -d

Проверяем, что всё поднялось и работает:

docker compose ps
docker exec -it timescaledb psql -U postgres -d sensor_data -c "SELECT extversion FROM pg_extension WHERE extname = 'timescaledb';"

Шаг 2. Настраиваем транспорт: брокер Mosquitto с авторизацией

Датчики будут слать данные по легковесному протоколу MQTT. Нам нужен брокер. Использовать публичные брокеры без паролей на производстве — это сразу приговор безопасности, поэтому мы настроим Mosquitto с жестким разделением прав (ACL) и авторизацией.

Создаем структуру папок на хосте:

mkdir ~/mosquitto && cd ~/mosquitto
mkdir config data

Пишем конфигурационный файл ./config/mosquitto.conf:

# Запрещаем анонимный доступ
allow_anonymous false

# Указываем пути к файлам авторизации внутри контейнера
password_file /mosquitto/config/passwd
acl_file /mosquitto/config/acl

# Стандартный MQTT порт
listener 1883 0.0.0.0

# Порт для веб-сокетов (если захотите выводить данные на веб-панель)
listener 9001
protocol websockets

# Включаем персистентность (сохранение сессий при перезапуске)
persistence true
persistence_location /mosquitto/data/

Теперь настроим права доступа в файле ./config/acl:

user admin
topic readwrite sensors/#

Важный нюанс по безопасности: Файл паролей нельзя создавать руками в блокноте — пароли должны быть зашифрованы. Мы сгенерируем файл с помощью утилиты mosquitto_passwd прямо через временный контейнер:

# Запускаем временный контейнер для генерации пароля (пользователь admin, пароль secret)
docker run --rm -it -v $(pwd)/config:/mosquitto/config eclipse-mosquitto mosquitto_passwd -c -b /mosquitto/config/passwd admin secret

Совет: Если нужно добавить еще одного пользователя без перезаписи первого, выполните ту же команду, но уберите флаг -c.

Решаем частую проблему с правами в Ubuntu: Mosquitto внутри контейнера работает от пользователя с UID 1883. Если права на файлы на хосте выставлены неверно, брокер выдаст ошибку Unable to open passwordfile и упадет. Лечим это на хосте:

sudo chown -R 1883:1883 ./config
sudo chmod 600 ./config/passwd
sudo chmod 700 ./config/acl
sudo chmod 755 ./config

Теперь создаем простой docker-compose.yml для Mosquitto в папке ~/mosquitto:

version: '3.8'

services:
  mosquitto:
    image: eclipse-mosquitto:latest
    container_name: mosquitto
    restart: unless-stopped
    ports:
      - "1883:1883"
      - "9001:9001"
    volumes:
      - ./config:/mosquitto/config:ro  # Монтируем конфигурацию только для чтения
      - ./data:/mosquitto/data

Запускаем брокер:

docker compose up -d

Быстрый тест связи:

Установим утилиты на хост и проверим, что брокер не пускает анонимов, но отлично работает под нашей учеткой:

sudo apt-get update && sudo apt-get install -y mosquitto-clients

# В первом терминале запускаем подписку:
mosquitto_sub -h localhost -t "sensors/#" -u "admin" -P "secret" -v

# Во втором терминале отправляем тестовое значение:
mosquitto_pub -h localhost -t "sensors/esp8266/temperature" -m "24.5" -u "admin" -P "secret"

Если в первом терминале появилось сообщение — транспорт готов!

Шаг 3. Строим мост: Установка и настройка Telegraf

Нам нужен надёжный инструмент, который забирает данные из MQTT и складывает в TimescaleDB. Писать собственный скрипт на Python для этого – классический велосипед, который развалится при первом же сбое сети. Поэтому берём промышленное решение от InfluxData, Telegraf. Он буферизирует данные в памяти, если база временно недоступна, и почти не грузит систему.

Установим Telegraf на наш Ubuntu-сервер по официальной инструкции:

# Очищаем старые ключи, если они были
sudo rm -f /etc/apt/sources.list.d/influxdata.list
sudo rm -f /etc/apt/trusted.gpg.d/influxdata-archive*

# Добавляем официальный репозиторий
wget -q https://repos.influxdata.com/influxdata-archive.key
cat influxdata-archive.key | gpg --dearmor | sudo tee /etc/apt/trusted.gpg.d/influxdata-archive.gpg > /dev/null
echo 'deb [signed-by=/etc/apt/trusted.gpg.d/influxdata-archive.gpg] https://repos.influxdata.com/debian stable main' | sudo tee /etc/apt/sources.list.d/influxdata.list

# Устанавливаем
sudo apt-get update && sudo apt-get install telegraf -y

Теперь настроем конфигурационный файл /etc/telegraf/telegraf.conf. Нам нужно указать ему слушать топики в MQTT и записывать их в схему sensors нашей базы данных:

[agent]
  interval = "10s"
  round_interval = true
  flush_interval = "10s"

[[inputs.mqtt_consumer]]
  servers = ["tcp://127.0.0.1:1883"]
  topics = ["sensors/esp8266/temperature"]
  data_format = "value"
  data_type = "float"
  username = "admin"
  password = "secret"
  client_id = "telegraf_t1"
  persistent_session = false
[[inputs.mqtt_consumer]]
  servers = ["tcp://127.0.0.1:1883"]
  topics = ["sensors/esp8266/humidity"]
  data_format = "value"
  data_type = "float"
  username = "admin"
  password = "secret"
  client_id = "telegraf_t2"
  persistent_session = false

[[outputs.postgresql]]
  connection = "host=localhost port=5433 user=postgres password=super_password_123 dbname=sensor_data sslmode=disable"

Проверим конфигурацию на ошибки синтаксиса:

sudo telegraf --config /etc/telegraf/telegraf.conf --test

Если ошибок нет, перезапускаем службу:

sudo systemctl restart telegraf

Шаг 4. Пишем «мозг»: MCP-сервер на Go

Вот мы и подошли к самому интересному. Нам нужно написать легковесный сервер, который будет общаться с LLM по протоколу MCP.

Почему Go? Он компилируется в один компактный бинарник без внешних зависимостей, работает очень быстро и почти не ест память. Можно сказать, что это идеальный вариант для промышленного шлюза рядом с базой.

Создаем проект:

mkdir mcp-timescale-server && cd mcp-timescale-server
go mod init mcp-timescale-server
go get github.com/modelcontextprotocol/go-sdk
go get github.com/jackc/pgx/v5

Создаем файл main.go. Наш MCP-сервер будет предоставлять один инструмент — query_sensor_database.

Важный момент безопасности: Чтобы ИИ случайно (или умышленно) не удалил базу, мы жестко ограничиваем логику работы на уровне кода: сервер принимает только SELECT-запросы. Для продакшена мы также настоятельно рекомендуем создать в PostgreSQL пользователя с правами исключительно SELECT (read-only) на таблицу sensors.mqtt_consumer.

package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "strings"
    
    "github.com/jackc/pgx/v5"
    "github.com/modelcontextprotocol/go-sdk/mcp"
)

// Структура входных данных, которую сформирует LLM
type QueryInput struct {
    SqlQuery string `json:"sql_query" jsonschema:"Полный SQL запрос SELECT для чтения данных из TimescaleDB. Запрос должен быть безопасным и использовать только SELECT."`
}

// Структура ответа для LLM
type QueryOutput struct {
    Result string `json:"result" jsonschema:"Результат выполнения запроса"`
}

// Обработчик вызова инструмента ИИ
func ExecuteQuery(ctx context.Context, request *mcp.CallToolRequest, input QueryInput) (*mcp.CallToolResult, QueryOutput, error) {
    // Безопасность: грубая, но полезная валидация на уровне кода
    cleanQuery := strings.TrimSpace(strings.ToLower(input.SqlQuery))
    if !strings.HasPrefix(cleanQuery, "select") {
        return nil, QueryOutput{}, fmt.Errorf("разрешены только запросы на чтение данных (SELECT)")
    }

    connStr := os.Getenv("TIMESCALE_URL")
    if connStr == "" {
        connStr = "postgres://postgres:super_password_123@localhost:5433/sensor_data?sslmode=disable"
    }
    
    conn, err := pgx.Connect(ctx, connStr)
    if err != nil {
        return nil, QueryOutput{}, fmt.Errorf("ошибка подключения к БД: %w", err)
    }
    defer conn.Close(ctx)

    rows, err := conn.Query(ctx, input.SqlQuery)
    if err != nil {
        return nil, QueryOutput{}, fmt.Errorf("ошибка выполнения SQL: %w", err)
    }
    defer rows.Close()

    var resultText string
    for rows.Next() {
        values, err := rows.Values()
        if err != nil {
            continue
        }
        resultText += fmt.Sprintf("%vn", values)
    }
    
    if resultText == "" {
        resultText = "Запрос выполнен успешно, строк не возвращено."
    }

    return nil, QueryOutput{Result: resultText}, nil
}

func main() {
    // Инициализируем MCP сервер
    server := mcp.NewServer(&mcp.Implementation{
        Name:    "timescale-mcp-brain",
        Version: "v1.0.0",
    }, nil)

    // Регистрируем инструмент, подробно описывая его назначение для LLM
    mcp.AddTool(server, &mcp.Tool{
        Name:        "query_sensor_database",
        Description: "Выполняет SQL SELECT запросы к базе данных TimescaleDB с данными датчиков температуры и влажности. Используй для аналитики, агрегации или получения последних значений. Таблица: sensors.mqtt_consumer (поля: time, host, topic, value).",
    }, ExecuteQuery)

    log.Println("MCP Brain server запущен на STDIO транспорте")
    
    // Запускаем сервер через стандартные потоки ввода-вывода (Стандарт связи для MCP)
    if err := server.Run(context.Background(), &mcp.StdioTransport{}); err != nil {
        log.Fatal(err)
    }
}

Собираем наш проект в бинарный файл:

go build -o mcp-brain main.go

Шаг 5. Подключаем ИИ (Настройка клиента)

Теперь нам нужно подружить наш скомпилированный сервер с ИИ-клиентом. К примеру, если вы используете OpenCode, откройте его файл конфигурации настроек MCP (обычно лежит в ~/.config/opencode/opencode.json на Linux/macOS или C:Users%USER%.configopencodeopencode.jsonc на Windows) и добавьте туда наш сервер:

{
  "mcpServers": {
    "timescale-brain": {
      "type": "local",
      "command": "/home/user/mcp-timescale-server/mcp-brain",
      "enabled": true,
      "timeout": 30000,
      "env": {
        "TIMESCALE_URL": "postgres://postgres:super_password_123@localhost:5433/sensor_data?sslmode=disable"
      }
    }
  }
}

(Не забудьте заменить путь /home/user/... на ваш реальный путь к бинарнику).

Перезапустите OpenCode. В списке MCP-серверов должен появиться timescale-brain с зеленой точкой. Это означает, что инструмент готов к работе. OpenCode можно запускать на одной или нескольких рабочих станциях, не обязательно на сервере.

Заключение

Мы создали полноценный, масштабируемый и безопасный мост между сырыми промышленными данными и аналитикой современных LLM. Model Context Protocol открывает огромные возможности. Теперь инженеру не нужно быть экспертом в SQL и Python для глубокого анализа телеметрии. Достаточно уметь формулировать мысли.

Конечно, это лишь базовая архитектура. В реальных проектах мы добавляем к ней интеграцию со SCADA-системами, промышленные шлюзы на Modbus/OPC UA, политики безопасности и тонкую настройку ИИ-агентов под внутренние регламенты предприятий.

Если вам интересна тема интеграции искусственного интеллекта в реальный сектор экономики, автоматизации производств или умных систем мониторинга — мы, команда AWWANTIL, готовы помочь в реализации проектов любой сложности и предоставим существенную скидку на интересный проект.

Пишите нам в MAX, в Telegram или на почту: info [собака] awwantil.ru – обсудим ваши задачи и найдем оптимальное архитектурное решение.

Автор: vpomo

Источник