~/wiki / telegram-boty / telegram-channel-parser-filter

Как сделать парсер Telegram-каналов без мусора: фильтрация спама, дублей, вакансий и рекламы

Основной чат

Чат для вайбкодеров: новости, гайды, поиск исполнителей, маркетплейс и разбор реальных кейсов.

$ cd раздел/ $ join vibe dev
Как сделать парсер Telegram-каналов без мусора: фильтрация спама, дублей, вакансий и рекламы - обложка

Актуально для Telethon 1.43 и Pyrogram 2.x, июнь 2026.


Написать парсер Telegram-каналов — несложно. Написать парсер, который не захламит базу рекламой, одинаковыми форвардами и «требуется менеджер по продажам» — уже требует архитектурных решений.

Из коробки iter_messages отдаёт всё подряд: рекламные интеграции, форварды одного поста из десяти каналов сразу, технические сервисные сообщения, вакансии. Без слоя фильтрации вы собираете не контент, а шум.

В этой статье — полный разбор: как устроен парсер, какие фильтры нужны, как их реализовать без лишних зависимостей. С кодом, архитектурными решениями и объяснением, почему именно так.


Что такое «мусор» в контексте парсинга каналов

Прежде чем фильтровать — определим, что убираем:

Тип мусора Признак Почему проблема
Рекламные интеграции Нативный промопост, пометка sponsored Не контент канала, а чужой материал
Форварды message.fwd_from is not None Один пост тиражируется в десятках каналов
Дубли Похожий текст с отличием в эмодзи/пунктуации Засоряют выборку, ломают анализ
Вакансии Ключевые слова: «требуется», «зарплата», «оформление по ТК» Не тематический контент, фоновый шум
Сервисные сообщения message.action is not None «Пользователь вступил», «изменилось фото» — технические события
Короткие бессодержательные Длина < 50 символов «🔥🔥🔥», «Подписывайся!»
Запрещённые тематики Казино, крипто-хайп, микрозаймы Юридические и репутационные риски

Стек и настройка

Telethon vs Pyrogram: что выбрать

Оба работают поверх MTProto — собственного протокола Telegram. Принципиальная разница:

Telethon — асинхронная библиотека на Python, более зрелая, отличная документация, используется чаще для парсинга. Текущая стабильная версия 1.43.

Pyrogram — альтернатива с чуть более удобным API для новичков, активно развивается. Версия 2.x.

Для парсинга каналов с тяжёлой фильтрацией разница несущественна. В статье — примеры на Telethon, так как он распространённее в этом сценарии.

Получение API-ключей

Парсер работает через MTProto как пользовательский аккаунт, а не бот. Ботам iter_messages недоступен для чужих каналов.

  1. Идём на my.telegram.org
  2. Логинимся номером телефона
  3. «API development tools» → создаём приложение
  4. Получаем api_id (число) и api_hash (строка)

Важно: используйте отдельный аккаунт для парсинга, не основной. При агрессивном парсинге аккаунт могут ограничить.

Установка

bash
pip install telethon asyncio aiofiles python-dotenv
# Для хранения в SQLite
pip install aiosqlite
# Для симхэша (дедупликация)
pip install simhash
plaintext
# .env
API_ID=12345678
API_HASH=your_api_hash_here
SESSION_NAME=parser_session

Базовый парсер без фильтров

Сначала — рабочий скелет, потом добавим фильтры слоями.

python
import asyncio
import os
from telethon import TelegramClient
from dotenv import load_dotenv

load_dotenv()

client = TelegramClient(
    os.getenv('SESSION_NAME'),
    int(os.getenv('API_ID')),
    os.getenv('API_HASH')
)

CHANNELS = [
    'https://t.me/example_channel_1',
    'https://t.me/example_channel_2',
]

async def parse_channel(channel_url: str, limit: int = 500):
    messages = []
    async for message in client.iter_messages(channel_url, limit=limit):
        messages.append(message)
    return messages

async def main():
    await client.start()
    for channel in CHANNELS:
        msgs = await parse_channel(channel)
        print(f'{channel}: получено {len(msgs)} сообщений')

with client:
    client.loop.run_until_complete(main())

Это работает — но вернёт всё подряд. Переходим к фильтрации.


Слой 1: фильтрация по структуре сообщения

Первый и самый дешёвый слой — проверки по метаданным объекта Message, без анализа текста.

python
def is_structural_garbage(message) -> tuple[bool, str]:
    """
    Возвращает (True, причина) если сообщение — структурный мусор.
    Проверки от дешёвых к дорогим.
    """

    # Пустое сообщение (только медиа без подписи, или сервисное)
    if not message.text and not message.caption:
        return True, 'no_text'

    # Сервисное сообщение: вступление, смена фото, пин и т.д.
    if message.action is not None:
        return True, 'service_action'

    # Форвард — чужой контент
    if message.fwd_from is not None:
        return True, 'forward'

    text = message.text or message.caption or ''

    # Слишком короткое — скорее всего анонс или эмодзи-реакция
    if len(text.strip()) < 50:
        return True, 'too_short'

    # Спонсированное сообщение (реклама через Telegram Ads)
    # Помечается флагом sponsored в некоторых версиях API
    if getattr(message, 'sponsored', False):
        return True, 'sponsored'

    return False, ''

Этот фильтр синхронный и быстрый — применяем первым, до любых других проверок.


Слой 2: фильтрация вакансий

Вакансии — самый популярный нежелательный тип контента в тематических IT-каналах. Простой словарный фильтр покрывает 90% случаев.

python
import re

VACANCY_PATTERNS = [
    # Прямые маркеры вакансий
    r'требу[ею]тся?',
    r'открыт[аы]?\s+вакансия',
    r'вакансия\s*:',
    r'ищем\s+(разработчика|специалиста|менеджера|аналитика)',
    r'приглашаем\s+(в\s+команду|разработчика|специалиста)',
    # Условия работы
    r'оформление\s+по\s+тк',
    r'официальное\s+трудоустройство',
    r'полный\s+(рабочий\s+)?день',
    r'удалённая?\s+работа',
    r'гибридный\s+формат',
    # Зарплата
    r'зарплата\s+от',
    r'грейд\s*(junior|middle|senior)',
    r'\d+\s*(тыс|к)\s*/?\s*мес',
    r'доход\s+от\s+\d+',
    # HR-маркеры
    r'резюме\s+на',
    r'hr@',
    r'откликнуться\s+в\s+лс',
    r'пишите\s+в\s+лс',
    r'стек\s*:',
    r'требования\s*:',
    r'обязанности\s*:',
    r'условия\s*:',
]

_vacancy_re = re.compile(
    '|'.join(VACANCY_PATTERNS),
    re.IGNORECASE | re.UNICODE
)

def is_vacancy(text: str) -> bool:
    return bool(_vacancy_re.search(text))

Словарный подход хорошо работает для однозначных случаев, но даёт ложные срабатывания на «ищем решение задачи» или «требуется внимательность». Балансируйте порог под свою аудиторию: для узкоспециализированных каналов точность высокая, для общих — может потребоваться тонкая настройка.


Слой 3: фильтрация запрещённых тематик

Запрещённые тематики — казино, нелегальные финансовые схемы, псевдо-инвестиции, контент для взрослых.

python
FORBIDDEN_PATTERNS = {
    'gambling': [
        r'казино', r'ставки\s+на\s+спорт', r'онлайн.?казино',
        r'слоты', r'рулетка', r'1xbet', r'1win', r'melbet',
        r'бонус\s+за\s+депозит',
    ],
    'crypto_scam': [
        r'х\d+\s+за\s+\d+\s+дней',
        r'пассивный\s+доход\s+от',
        r'вложи\s+и\s+получи',
        r'майнинг.{0,20}доход',
        r'крипт[оа].{0,20}заработ',
        r'pump.{0,10}группа',
        r'инсайд.{0,10}сигнал',
    ],
    'loans': [
        r'займ\s+без\s+отказа',
        r'микрозайм',
        r'мфо',
        r'кредит\s+без\s+справок',
        r'деньги\s+в\s+долг\s+срочно',
    ],
    'adult': [
        r'18\+.{0,20}(фото|видео|контент)',
        r'только\s+для\s+взрослых',
    ],
}

_forbidden_res = {
    category: re.compile('|'.join(patterns), re.IGNORECASE | re.UNICODE)
    for category, patterns in FORBIDDEN_PATTERNS.items()
}

def get_forbidden_category(text: str) -> str | None:
    for category, pattern in _forbidden_res.items():
        if pattern.search(text):
            return category
    return None

Этот словарь не универсален — он стартовая точка. Под каждый проект нужно дополнять его на реальных данных.


Слой 4: дедупликация через симхэш

Самый технически интересный слой. Задача: определить, что два текста — это фактически одно и то же, даже если они различаются эмодзи, пунктуацией или небольшими правками.

Почему не MD5? Хеш изменится при изменении хотя бы одного символа. «Привет, мир!» и «Привет мир» — разные MD5, но один и тот же контент.

SimHash — алгоритм для нечёткого поиска дублей. Похожие тексты дают похожие хеши (разница в нескольких битах). Использовался в Google для дедупликации веб-страниц.

python
from simhash import Simhash

def text_to_simhash(text: str) -> int:
    """Нормализуем текст и считаем симхэш."""
    # Убираем всё кроме слов — эмодзи, пунктуация, лишние пробелы
    normalized = re.sub(r'[^\w\s]', ' ', text.lower())
    normalized = re.sub(r'\s+', ' ', normalized).strip()
    # Разбиваем на токены (слова и биграммы для лучшей точности)
    words = normalized.split()
    tokens = words + [f'{a} {b}' for a, b in zip(words, words[1:])]
    return Simhash(tokens).value

def simhash_distance(h1: int, h2: int) -> int:
    """Расстояние Хэмминга между двумя симхэшами."""
    xor = h1 ^ h2
    return bin(xor).count('1')

class DuplicateDetector:
    def __init__(self, threshold: int = 5):
        # threshold: максимальное расстояние Хэмминга для «дубля»
        # 0 = точное совпадение, 5 = тексты ~90% похожи, 10 = ~70%
        self.threshold = threshold
        self.seen_hashes: list[tuple[int, int]] = []  # (message_id, simhash)

    def is_duplicate(self, message_id: int, text: str) -> bool:
        h = text_to_simhash(text)
        for seen_id, seen_hash in self.seen_hashes:
            if seen_id == message_id:
                continue
            if simhash_distance(h, seen_hash) <= self.threshold:
                return True
        self.seen_hashes.append((message_id, h))
        return False

При большой базе линейный поиск по seen_hashes медленный. Для тысяч записей используйте SimHash LSH (Locality-Sensitive Hashing) или храните хеши в PostgreSQL с поиском по битовому расстоянию.


Слой 5: фильтрация рекламных интеграций

Нативная реклама сложнее для обнаружения — она написана как обычный пост. Несколько признаков, которые работают в комбинации:

python
AD_PATTERNS = [
    # Призывы и UTM-паттерны
    r'переходи\s+по\s+ссылке',
    r'жми\s+сюда',
    r'utm_source',
    r'utm_medium',
    r'utm_campaign',
    # Партнёрские маркеры
    r'реклама',
    r'рекламный\s+пост',
    r'на\s+правах\s+рекламы',
    r'партнёрский\s+материал',
    r'промо',
    # Реферальные паттерны
    r'промокод\s+[A-Z0-9]{3,}',
    r'скидка\s+\d+%\s+по\s+промокоду',
    # Характерные конструкции
    r'подписывайся\s+на\s+канал',
    r'подпишись\s+на\s+@\w+',
    r'переходи\s+в\s+@\w+',
]

_ad_re = re.compile('|'.join(AD_PATTERNS), re.IGNORECASE | re.UNICODE)

def looks_like_ad(message) -> bool:
    text = message.text or message.caption or ''

    # Прямые маркеры
    if _ad_re.search(text):
        return True

    # Кнопки с UTM или реферальными ссылками
    if message.reply_markup:
        for row in (message.reply_markup.rows or []):
            for button in (row.buttons or []):
                url = getattr(button, 'url', '') or ''
                if 'utm_' in url or any(
                    p in url for p in ['ref=', 'promo=', 'aff=']
                ):
                    return True

    return False

Сборка: полный pipeline фильтрации

Объединяем все слои в единый pipeline с логированием причин отклонения.

python
import asyncio
import logging
from dataclasses import dataclass
from datetime import datetime

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('tg_parser')

@dataclass
class ParsedMessage:
    message_id: int
    channel: str
    text: str
    date: datetime
    views: int | None
    simhash: int

class TelegramParser:
    def __init__(self, channels: list[str], limit_per_channel: int = 1000):
        self.channels = channels
        self.limit = limit_per_channel
        self.dedup = DuplicateDetector(threshold=5)
        self.stats = {
            'total': 0,
            'passed': 0,
            'filtered': {},
        }

    def _record_filtered(self, reason: str):
        self.stats['filtered'][reason] = self.stats['filtered'].get(reason, 0) + 1

    def filter_message(self, message, channel: str) -> ParsedMessage | None:
        self.stats['total'] += 1
        text = message.text or message.caption or ''

        # Слой 1: структурный мусор
        is_garbage, reason = is_structural_garbage(message)
        if is_garbage:
            self._record_filtered(reason)
            return None

        # Слой 2: реклама
        if looks_like_ad(message):
            self._record_filtered('ad')
            return None

        # Слой 3: вакансии
        if is_vacancy(text):
            self._record_filtered('vacancy')
            return None

        # Слой 4: запрещённые тематики
        forbidden = get_forbidden_category(text)
        if forbidden:
            self._record_filtered(f'forbidden_{forbidden}')
            return None

        # Слой 5: дубли
        if self.dedup.is_duplicate(message.id, text):
            self._record_filtered('duplicate')
            return None

        self.stats['passed'] += 1
        return ParsedMessage(
            message_id=message.id,
            channel=channel,
            text=text,
            date=message.date,
            views=getattr(message, 'views', None),
            simhash=text_to_simhash(text),
        )

    async def parse_channel(self, channel: str) -> list[ParsedMessage]:
        results = []
        logger.info(f'Парсим {channel}...')
        try:
            async for message in client.iter_messages(channel, limit=self.limit):
                parsed = self.filter_message(message, channel)
                if parsed:
                    results.append(parsed)
                # Небольшая пауза между сообщениями — флоуд-контроль
                await asyncio.sleep(0.05)
        except Exception as e:
            logger.error(f'Ошибка парсинга {channel}: {e}')
        return results

    async def run(self) -> list[ParsedMessage]:
        all_messages = []
        for channel in self.channels:
            msgs = await self.parse_channel(channel)
            all_messages.extend(msgs)
            # Пауза между каналами
            await asyncio.sleep(2)

        logger.info(f'Итого: {self.stats["total"]} сообщений, прошло фильтры: {self.stats["passed"]}')
        logger.info(f'Отфильтровано: {self.stats["filtered"]}')
        return all_messages

Флоуд-контроль и лимиты Telegram

Telegram ограничивает частоту запросов. При агрессивном парсинге аккаунт получает FloodWaitError — временную блокировку.

python
from telethon.errors import FloodWaitError

async def safe_iter_messages(channel, limit=1000, pause=0.1):
    """iter_messages с автоматическим retry при FloodWait."""
    retries = 0
    async for message in client.iter_messages(channel, limit=limit):
        try:
            yield message
            await asyncio.sleep(pause)
        except FloodWaitError as e:
            wait_seconds = e.seconds + 5
            logger.warning(f'FloodWait: ждём {wait_seconds} секунд...')
            await asyncio.sleep(wait_seconds)
            retries += 1
            if retries > 5:
                logger.error('Слишком много FloodWait, прерываем канал')
                break

Telethon умеет работать в режиме Takeout Session, при котором некоторые лимиты ниже. Полезно для массового экспорта данных:

python
async with client.takeout(messages=True) as takeout:
    async for message in takeout.iter_messages(channel, wait_time=0.5):
        # обрабатываем
        pass

Хранение: SQLite для локальной разработки

python
import aiosqlite

CREATE_TABLE = """
CREATE TABLE IF NOT EXISTS messages (
    id          INTEGER PRIMARY KEY,
    message_id  INTEGER NOT NULL,
    channel     TEXT NOT NULL,
    text        TEXT NOT NULL,
    date        TEXT NOT NULL,
    views       INTEGER,
    simhash     INTEGER NOT NULL,
    created_at  TEXT DEFAULT (datetime('now')),
    UNIQUE(channel, message_id)
);
CREATE INDEX IF NOT EXISTS idx_simhash ON messages(simhash);
CREATE INDEX IF NOT EXISTS idx_channel_date ON messages(channel, date);
"""

async def save_messages(messages: list[ParsedMessage], db_path: str = 'parser.db'):
    async with aiosqlite.connect(db_path) as db:
        await db.executescript(CREATE_TABLE)
        await db.executemany(
            """
            INSERT OR IGNORE INTO messages
                (message_id, channel, text, date, views, simhash)
            VALUES (?, ?, ?, ?, ?, ?)
            """,
            [
                (m.message_id, m.channel, m.text,
                 m.date.isoformat(), m.views, m.simhash)
                for m in messages
            ]
        )
        await db.commit()

Для продакшена — PostgreSQL с индексом по simhash. Поиск дублей в базе становится быстрым SQL-запросом вместо цикла в памяти:

sql
-- Найти все сообщения с похожим симхэшем (расстояние ≤ 5 бит)
-- В PostgreSQL нет битового расстояния из коробки, используем функцию
CREATE OR REPLACE FUNCTION hamming_distance(a BIGINT, b BIGINT)
RETURNS INTEGER AS $$
BEGIN
    RETURN bit_count((a # b)::bit(64));
END;
$$ LANGUAGE plpgsql IMMUTABLE;

-- Найти потенциальные дубли для конкретного симхэша
SELECT channel, text, date
FROM messages
WHERE hamming_distance(simhash, $1) <= 5
  AND message_id != $2
LIMIT 20;

Реалтайм-парсинг: слушаем новые сообщения

Помимо ретроспективного парсинга через iter_messages, можно подписаться на новые сообщения через события:

python
from telethon import events

WATCH_CHANNELS = ['https://t.me/channel1', 'https://t.me/channel2']

parser = TelegramParser(channels=WATCH_CHANNELS)

@client.on(events.NewMessage(chats=WATCH_CHANNELS))
async def handle_new_message(event):
    channel = event.chat.username or str(event.chat_id)
    parsed = parser.filter_message(event.message, channel)
    if parsed:
        await save_messages([parsed])
        logger.info(f'Новое сообщение из {channel}: {parsed.text[:80]}...')

async def main():
    await client.start()
    logger.info('Слушаем новые сообщения...')
    await client.run_until_disconnected()

with client:
    client.loop.run_until_complete(main())

Осторожно с slowmode_enabled — если в группе включён медленный режим, часть событий NewMessage может не приходить. Решение — периодически вызывать client.catch_up():

python
async def periodic_catchup():
    while True:
        await client.catch_up()
        await asyncio.sleep(30)

Вайбкодинг: что поручить Claude Code

Полный парсер — хороший кандидат для вайбкодинга. Несколько задач, которые Claude Code и Codex решают хорошо:

Расширение словарей фильтрации:

plaintext
У меня есть список вакансий из Telegram-каналов.
Проанализируй 50 примеров и составь regex-паттерны,
которые покрывают все случаи, но не дают ложных срабатываний
на обычный технический контент. Результат — Python-список строк для re.compile.

Классификация тематик через Claude API:

python
# Для сложных случаев — классификация через API вместо словарей
import anthropic

claude = anthropic.Anthropic()

def classify_message(text: str) -> dict:
    response = claude.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=100,
        messages=[{
            "role": "user",
            "content": f"""Классифицируй это сообщение из Telegram-канала.
Ответь ТОЛЬКО JSON: {{"is_ad": bool, "is_vacancy": bool, "category": str, "confidence": float}}
Категории: tech_news, tutorial, opinion, tool_review, other
Сообщение: {text[:500]}"""
        }]
    )
    import json
    return json.loads(response.content[0].text)

Это дорого при большом объёме — используйте для граничных случаев, когда словари дают неуверенный результат.


Типичные ошибки и как их избежать

Ошибка Последствие Решение
Парсить основным аккаунтом Бан, потеря доступа Отдельный аккаунт для парсинга
Нет задержки между запросами FloodWaitError, временный бан asyncio.sleep(0.05–0.5) между сообщениями
Хранить симхэши только в памяти При перезапуске дубли снова проходят Сохранять симхэши в БД, загружать при старте
Сравнивать строки через == для дедупликации Пропускать очевидные вариации Нормализовать + SimHash
Парсить закрытые каналы без вступления ChannelPrivateError Аккаунт должен быть участником
Игнорировать message.action Сервисные сообщения засоряют базу Всегда проверять is not None
Фиксированный лимит дат Пропускать новые сообщения при повторном запуске Хранить last_parsed_id и использовать min_id

Инкрементальный парсинг: не собирать одно дважды

При повторном запуске парсить с нуля — расточительно. Правильный подход — запоминать последний обработанный message_id и при следующем запуске начинать с него:

python
async def get_last_message_id(db_path: str, channel: str) -> int:
    async with aiosqlite.connect(db_path) as db:
        cursor = await db.execute(
            'SELECT MAX(message_id) FROM messages WHERE channel = ?',
            (channel,)
        )
        row = await cursor.fetchone()
        return row[0] or 0

async def parse_channel_incremental(channel: str, db_path: str):
    min_id = await get_last_message_id(db_path, channel)
    results = []
    async for message in client.iter_messages(channel, min_id=min_id):
        parsed = parser.filter_message(message, channel)
        if parsed:
            results.append(parsed)
        await asyncio.sleep(0.05)
    if results:
        await save_messages(results, db_path)
    logger.info(f'{channel}: +{len(results)} новых сообщений (с ID > {min_id})')

Чеклист готового парсера

plaintext
Настройка:
☐ Отдельный Telegram-аккаунт (не основной)
☐ api_id и api_hash в .env, не в коде
☐ Сессионный файл вне репозитория (в .gitignore)

Фильтры:
☐ Структурный: отклонение пустых, сервисных, слишком коротких
☐ Форварды: message.fwd_from is not None → пропуск
☐ Реклама: паттерны + проверка кнопок на UTM
☐ Вакансии: regex по ключевым словам HR-тематики
☐ Запрещённые тематики: gambling, crypto_scam, adult, loans
☐ Дедупликация: SimHash с порогом 5 бит

Флоуд-контроль:
☐ asyncio.sleep между сообщениями (0.05–0.5 сек)
☐ asyncio.sleep между каналами (2–5 сек)
☐ Обработка FloodWaitError с ожиданием e.seconds

Хранение:
☐ UNIQUE(channel, message_id) — нет повторного сохранения
☐ simhash сохраняется в БД — дедупликация между сессиями
☐ Инкрементальный парсинг через min_id

Мониторинг:
☐ Логирование причин отклонения каждого фильтра
☐ Итоговая статистика: всего / прошло / по категориям

Итог

Хороший парсер Telegram-каналов — это не iter_messages + сохранить в базу. Это многослойная система фильтрации, где каждый слой убирает свой тип мусора с минимальными ресурсами.

Порядок слоёв важен: сначала дешёвые структурные проверки (нет текста, форвард, сервисное), потом паттернные (реклама, вакансии, запрещённое), потом дорогая дедупликация через SimHash. Это минимизирует нагрузку — большинство мусора уходит на первых проверках.

Начните с малого: возьмите два-три канала, запустите парсер, посмотрите на статистику отфильтрованных сообщений. Настройте пороги под свои данные — универсальный конфиг не существует, всё зависит от тематики каналов и ваших требований к чистоте данных.


FAQ

Законно ли парсить Telegram-каналы? Публичные каналы технически доступны всем. Telegram ограничивает массовый сбор пользователей из групп и рассылку им сообщений — это явно против правил. Парсинг контента публичных каналов для анализа или агрегации находится в серой зоне: Telegram это не поощряет, но прямого запрета на чтение открытого контента нет. Персональные данные пользователей — другая история, там применяется ФЗ-152.

Можно ли парсить через бота, без пользовательского аккаунта? Бот может читать только те сообщения, которые отправлены в чат, где он является участником. iter_messages для чужих публичных каналов через Bot API недоступен. MTProto через Telethon/Pyrogram с пользовательским аккаунтом — единственный вариант.

Как парсить закрытые каналы? Аккаунт должен быть участником. Для приватных каналов по инвайт-ссылке — await client.join_channel(invite_link).

Почему SimHash, а не MD5 или SHA256? MD5 и SHA256 меняются при любом изменении текста. «Привет мир» и «Привет, мир!» — разные хеши. SimHash даёт похожие значения для похожих текстов — именно это нужно для дедупликации контента с мелкими вариациями.

Как масштабировать на сотни каналов? Несколько пользовательских аккаунтов с распределением каналов между ними. Celery или asyncio.Queue для очереди задач. PostgreSQL с партиционированием таблицы по дате. Redis для хранения SimHash-индекса в памяти с быстрым поиском.

$ cd ../ ← назад к Telegram-боты