AI в энтерпрайзе. ai.. ai. Big Data.. ai. Big Data. Data Engineering.. ai. Big Data. Data Engineering. DevOps.. ai. Big Data. Data Engineering. DevOps. llm.. ai. Big Data. Data Engineering. DevOps. llm. Metabase.. ai. Big Data. Data Engineering. DevOps. llm. Metabase. sling.. ai. Big Data. Data Engineering. DevOps. llm. Metabase. sling. Анализ и проектирование систем.. ai. Big Data. Data Engineering. DevOps. llm. Metabase. sling. Анализ и проектирование систем. искусственный интеллект.

Мнение.

Основные проблемы в корпоративном IT это, как и прежде: инфраструктура, безопасность и работа с данными.

AI и, так называемые, Агенты AI, в этой сфере, в ближайшие 2-3 года, мало что поменяют.

В корпоративном секторе столько неэффективности и реакционности, что буст продуктивности, который принесет AI станет каплей в море.

Миграция с Oracle на Postgresql или переезд с Lotus Domino, для большинства крупных не-IT компаний принесет больше пользы, чем внедрение AI, здесь и сейчас.

Без современной инфраструктуры и стека данных, внедрение AI не отобьет OPEX и тем более CAPEX.

Единственное революционное измерение, которые стоит ожидать уже в этом году – западные корпорации будут отказывается от аутотренинг, особенно в Индии.

Вот кстати пример, того как можно автоматизировать работу с данными.

Представим себе, что нам на почту присылают письма с вложенными csv, json и xml файлами. Скрипт, который запускается по расписанию и ищет письма с словом «documents» в теме и вложенным файлом в формате csv. Сохраняет эти файлы в папку.

import imaplib
import email
from email.header import decode_header
import os
import csv

IMAP_SERVER = "imap.yandex.com"
EMAIL = "email@yandex.com"
PASSWORD = "password"
DOWNLOAD_FOLDER = "attachments"

if not os.path.exists(DOWNLOAD_FOLDER):
    os.makedirs(DOWNLOAD_FOLDER)

def connect_to_imap():
    try:
        print(f"Connecting to IMAP server: {IMAP_SERVER}")
        mail = imaplib.IMAP4_SSL(IMAP_SERVER, port=993)
        mail.login(EMAIL, PASSWORD)
        mail.select("INBOX")
        print("Connected successfully.")
        return mail
    except imaplib.IMAP4.error as e:
        print(f"IMAP error: {e}")
        print("Check your email and password, or enable app passwords if 2FA is enabled.")
        return None
    except Exception as e:
        print(f"Unexpected error: {e}")
        return None

def decode_header_value(encoded_value):
    decoded_parts = decode_header(encoded_value)
    result = ""
    for part, encoding in decoded_parts:
        if isinstance(part, bytes):
            try:
                result += part.decode(encoding or "utf-8")
            except LookupError:
                result += part.decode("latin-1")
            except UnicodeDecodeError:
                result += part.decode("utf-8", errors="ignore")
        else:
            result += part
    return result

def extract_csv_attachments(msg):
    for part in msg.walk():
        if part.get_content_maintype() == "multipart":
            continue
        if part.get("Content-Disposition") is None:
            continue

        filename = part.get_filename()
        if filename and filename.lower().endswith(".csv"):
            filename = decode_header_value(filename)
            print(f"Found CSV attachment: {filename}")

            filepath = os.path.join(DOWNLOAD_FOLDER, filename)
            with open(filepath, "wb") as f:
                f.write(part.get_payload(decode=True))
            print(f"Saved CSV attachment: {filename}")

def fetch_emails():
    mail = connect_to_imap()
    if not mail:
        return

    try:
        status, messages = mail.search(None, 'SUBJECT "documents"')
        if status != "OK":
            print("No emails found with 'documents' in the subject.")
            return

        email_ids = messages[0].split()
        print(f"Found {len(email_ids)} emails with 'documents' in the subject.")

        for mail_id in reversed(email_ids):
            status, data = mail.fetch(mail_id, "(RFC822)")
            if status != "OK":
                continue

            msg = email.message_from_bytes(data[0][1])
            subject = decode_header_value(msg["Subject"])
            print(f"Processing email: {subject}")

            extract_csv_attachments(msg)
    finally:
        mail.logout()

if __name__ == "__main__":
    fetch_emails()

Другой скрипт забирает csv и перекладывает их в duckDB – ультрабыстрая колоночная RDBMS, которая пользуется популярностью среди дата-аналитиков:

import os
import duckdb

DUCKDB_PATH = "my_database.duckdb"

ATTACHMENTS_DIR = "attachments"

def send_csv_to_duckdb():
    conn = duckdb.connect(DUCKDB_PATH)
    print(f"Connected to DuckDB database: {DUCKDB_PATH}")

    for filename in os.listdir(ATTACHMENTS_DIR):
        if filename.endswith(".csv"):
            filepath = os.path.join(ATTACHMENTS_DIR, filename)
            table_name = os.path.splitext(filename)[0]
            print(f"Processing file: {filename}")

            quoted_table_name = f'"{table_name}"'

            conn.execute(
                f"CREATE OR REPLACE TABLE {quoted_table_name} AS SELECT * FROM read_csv_auto('{filepath}')"
            )
            print(f"Data from {filename} sent to DuckDB table {quoted_table_name}.")

    print("All CSV files processed.")

if __name__ == "__main__":
    send_csv_to_duckdb()

Metabase – один из лучших инструментов с открытым исходным кодом для аналитики и визуализации данных.

docker run -d -p 3000:3000 -v "$(pwd)":/srv --name metabase metabase/metabase

Для работы с duckdb, необходимо установить плагин:

https://github.com/MotherDuck-Open-Source/metabase_duckdb_driver

Дальше в дело вступает Sling это инструмент для репликации между базами и различными типами хранилищ. Вместо нагромождения UI и микросервисов вроде Airbyte, эти ребята решили пойти по простому пути и написали rsync или rclone для работы с большими данными.

curl -LO 'https://github.com/slingdata-io/sling-cli/releases/latest/download/sling_linux_amd64.tar.gz' 
  && tar xf sling_linux_amd64.tar.gz 
  && rm -f sling_linux_amd64.tar.gz 
  && chmod +x sling

mv sling /usr/local/bin/
$ sling conns list
+--------------------------+-----------------+-------------------+
| CONN NAME                | CONN TYPE       | SOURCE            |
+--------------------------+-----------------+-------------------+
| AWS_S3                   | FileSys - S3    | sling env yaml    |
| DO_SPACES                | FileSys - S3    | sling env yaml    |
| LOCALHOST_DEV            | DB - PostgreSQL | dbt profiles yaml |
| MSSQL                    | DB - SQLServer  | sling env yaml    |
| MYSQL                    | DB - MySQL      | sling env yaml    |
| ORACLE_DB                | DB - Oracle     | env variable      |
| MY_PG                    | DB - PostgreSQL | sling env yaml    |
+--------------------------+-----------------+-------------------+

Дата пайплайн можно описать в простом YAML файле. В данном случае мы будем брать csv файл и перекладывать его в PostgreSQL и minio s3 для долгосрочного хранения:

version: 1
sources:
  duckdb_source:
    type: duckdb
    path: /path/to/your_database.duckdb

targets:
  minio_target:
    type: s3
    bucket: <bucket>
    access_key_id: <access_key_id>
    secret_access_key: '<secret_access_key>'
    endpoint: '<endpoint>'
    url_style: path

  postgres_target:
    type: postgres
    host: postgres_host
    port: 5432
    database: mydb
    username: user
    password: password

pipelines:
  - name: duckdb_to_minio
    source: duckdb_source
    target: minio_target
    query: "SELECT * FROM sales"
    mode: overwrite

  - name: duckdb_to_postgres
    source: duckdb_source
    target: postgres_target
    query: "SELECT * FROM sales"
    mode: overwrite

Ну и наконец AI. Куда же без него?
Ollama это очень популярный проект для работы с AI моделями локально, предоставляет не только CLI но и отличную библиотеку на Python.

Сначала нужно установить зависимости и скачать модельку:

pip install ollama graphvizollama run qwen2.5-coder:7b

Для своей весовой категории qwen2.5 это одна из лучших открытых LLM.

Скрипт с помощью qwen сделает краткое изложение csv на 10 гигабайт, а если надо нарисует графики и сконвертирует этот файл в нужный формат.

analyze_csv.py

analyze_csv.py

Все зависит от того, какой вы отправите промпт:

промпт

промпт

Скрипт:

#!/usr/bin/env python3
import csv
import ollama
import argparse
import logging
from typing import Optional

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

PROMPT_TEMPLATE = """[INST] <<SYS>>
You are a data analysis expert. Analyze this CSV data and:

1. Summarize the data
2. Identify trends or patterns
3. Provide insights and recommendations

Format response with:
- Markdown analysis
- Key insights
- Recommendations for further analysis
<</SYS>>

CSV Data:
{data}
[/INST]"""

def read_csv(file_path: str) -> str:
    try:
        with open(file_path, "r") as file:
            reader = csv.reader(file)
            rows = [",".join(row) for row in reader]
        return "n".join(rows)
    except Exception as e:
        logging.error(f"Error reading CSV file: {e}")
        raise

def analyze_csv(csv_file: str, context_size: Optional[int] = None) -> str:
    try:
        logging.info(f"Reading CSV file: {csv_file}")
        csv_text = read_csv(csv_file)
        logging.info(f"Extracted CSV text: {csv_text[:100]}...")

        options = {}
        if context_size:
            options['max_tokens'] = context_size

        logging.info("Sending data to Ollama model...")
        response = ollama.chat(
            model='qwen2.5-coder:7b',
            messages=[{'role': 'user', 'content': PROMPT_TEMPLATE.format(data=csv_text)}],
            options=options
        )['message']['content']
        logging.info("Received response from Ollama model")

        return response

    except Exception as e:
        logging.error(f"An error occurred: {e}")
        raise

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='CSV AI Analyzer')
    parser.add_argument('csv_file', help='Input CSV file')
    parser.add_argument('--context-size', type=int, help='Optional context size (max tokens) for the Ollama model')
    args = parser.parse_args()

    try:
        report = analyze_csv(args.csv_file, args.context_size)
        print("nAnalysis Report:")
        print(report)
    except Exception as e:
        logging.error(f"Script failed: {e}")

Автор: Rikimaru22

Источник

Rambler's Top100