
How to build a Telegram channel parser without junk: filtering spam, duplicates, job postings, and ads


*Relevant for Telethon 1.43 and Pyrogram 2.x, June 2026*


Writing a Telegram channel parser is not difficult. Writing one that does not clutter the database with ads, identical forwards, and “sales manager wanted” posts takes some architectural decisions.

Out of the box, iter_messages returns everything indiscriminately: advertising integrations, forwards of one post from ten channels at once, service messages, job postings. Without a filtering layer you are collecting noise, not content.

This article is a complete walkthrough: how the parser works, which filters are needed, and how to implement them without unnecessary dependencies, with code, architectural decisions, and the reasoning behind them.


What counts as “junk” when parsing channels

Before filtering, let us define what we are removing:

| Type of junk | Sign | Why it is a problem |
|---|---|---|
| Advertising integrations | Native promo post, "sponsored" label | Not the channel's content but someone else's material |
| Forwards | message.fwd_from is not None | One post is replicated across dozens of channels |
| Duplicates | Similar text that differs in emoji or punctuation | Clutter the sample, break the analysis |
| Job postings | Keywords: "required", "salary", "employment under the Labor Code" | Off-topic content, background noise |
| Service messages | message.action is not None | "User joined", "photo changed": technical events |
| Short, empty posts | Length < 50 characters | "🔥🔥🔥", "Subscribe!" |
| Prohibited topics | Casinos, crypto hype, microloans | Legal and reputational risks |

Stack and setup

Telethon vs Pyrogram: What to Choose

Both run on top of MTProto, Telegram's own protocol. The main differences:

Telethon is an asynchronous Python library. It is more mature, better documented, and used more often for parsing. The current stable version is 1.43.

Pyrogram is an alternative whose API is slightly friendlier for beginners. Version 2.x.

For channel parsing with heavy filtering, the difference is insignificant. The examples in this article use Telethon, since it is more common in this scenario.

Obtaining API keys

The parser works through MTProto as a user account, not a bot: iter_messages is not available to bots for other people's channels.

  1. Go to my.telegram.org
  2. Log in with your phone number
  3. Open “API development tools” and create an application
  4. You get api_id (a number) and api_hash (a string)

Use a separate account for parsing, not your main one. With aggressive parsing, the account may be restricted.

Installation

bash
# asyncio ships with Python and must not be installed from PyPI
pip install telethon python-dotenv
# For storage in SQLite
pip install aiosqlite
# For SimHash (deduplication)
pip install simhash

plaintext
# .env
API_ID=12345678
API_HASH=your_api_hash_here
SESSION_NAME=parser_session
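
A minimal sketch of reading these settings with validation, so that a missing or malformed variable fails with a clear message instead of a `TypeError` during client setup. The names `ParserConfig` and `load_config` are hypothetical helpers, not part of Telethon, and the variables are assumed to be in the environment already (for example after `load_dotenv()`).

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class ParserConfig:
    api_id: int
    api_hash: str
    session_name: str


def load_config(env=None) -> ParserConfig:
    """Read API_ID, API_HASH and SESSION_NAME from a mapping (os.environ by default)."""
    env = os.environ if env is None else env
    # Report every missing required setting at once
    missing = [key for key in ('API_ID', 'API_HASH') if not env.get(key)]
    if missing:
        raise RuntimeError(f'Missing settings: {", ".join(missing)}')
    try:
        api_id = int(env['API_ID'])
    except ValueError:
        raise RuntimeError('API_ID must be an integer') from None
    # SESSION_NAME is optional; the fallback value is an assumption
    session_name = env.get('SESSION_NAME') or 'parser_session'
    return ParserConfig(api_id, env['API_HASH'], session_name)
```

The resulting object can then be passed to `TelegramClient(cfg.session_name, cfg.api_id, cfg.api_hash)`.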

Basic parser without filters

First the working skeleton, then we add the filters layer by layer.

python
import asyncio
import os
from telethon import TelegramClient
from dotenv import load_dotenv

load_dotenv()

client = TelegramClient(
    os.getenv('SESSION_NAME'),
    int(os.getenv('API_ID')),
    os.getenv('API_HASH')
)

CHANNELS = [
    'https://t.me/example_channel_1',
    'https://t.me/example_channel_2',
]

async def parse_channel(channel_url: str, limit: int = 500):
    messages = []
    async for message in client.iter_messages(channel_url, limit=limit):
        messages.append(message)
    return messages

async def main():
    await client.start()
    for channel in CHANNELS:
        msgs = await parse_channel(channel)
        print(f'{channel}: received {len(msgs)} messages')

with client:
    client.loop.run_until_complete(main())

It works, but it returns everything, junk included. Let's move on to filtering.


Layer 1: filtering by message structure

The first and cheapest layer is to check the metadata of the Message object without text analysis.

python
def is_structural_garbage(message) -> tuple[bool, str]:
    """
    Return (True, reason) if the message is structural garbage.
    Checks run from cheapest to most expensive.
    """

    # Service message: join, photo change, pin, etc.
    # Checked first, because service messages have no text either.
    if message.action is not None:
        return True, 'service_action'

    # In Telethon, message.text also holds the caption of a media post;
    # the Message object has no separate caption attribute.
    text = message.text or ''

    # Empty message (media without a caption)
    if not text:
        return True, 'no_text'

    # Forward: someone else's content
    if message.fwd_from is not None:
        return True, 'forward'

    # Too short: most likely an announcement or an emoji reaction
    if len(text.strip()) < 50:
        return True, 'too_short'

    # Sponsored message (advertising through Telegram Ads).
    # The flag is not present on ordinary messages, hence getattr with a default.
    if getattr(message, 'sponsored', False):
        return True, 'sponsored'

    return False, ''

This filter is synchronous and fast, so apply it first, before any other checks.


Layer 2: vacancy filtering

Job postings are the most common type of unwanted content in thematic IT channels. A simple dictionary filter covers about 90% of cases.

python
import re

# The patterns must be written in the language of the channels being parsed.
VACANCY_PATTERNS = [
    # Direct job markers
    r'\brequired\b',
    r'open\s+vacanc(y|ies)',
    r'vacancy\s*:',
    r'we\s+are\s+looking\s+for\s+(a\s+|an\s+)?(developer|manager|analyst)',
    r'inviting\s+(to\s+the\s+team|a\s+developer|a\s+specialist)',
    # Working conditions
    r'employment\s+under\s+the\s+labor\s+code',
    r'official\s+employment',
    r'full\s+(working\s+)?day',
    r'remote\s+work',
    r'hybrid\s+format',
    # Salary
    r'salary\s+from',
    r'grade\s*(junior|middle|senior)',
    r'\d+\s*(thousand|k)\s*/?\s*month',
    r'income\s+from\s+\d+',
    # HR markers
    r'resume\s+to',
    r'hr@',
    r'reply\s+in\s+dm',
    r'write\s+in\s+dm',
    r'stack\s*:',
    r'requirements\s*:',
    r'responsibilities\s*:',
    r'conditions\s*:',
]

_vacancy_re = re.compile(
    '|'.join(VACANCY_PATTERNS),
    re.IGNORECASE | re.UNICODE
)

def is_vacancy(text: str) -> bool:
    return bool(_vacancy_re.search(text))

The dictionary approach works well for unambiguous cases but gives false positives on phrases like “looking for a solution to the problem” or “attention is required”. Tune the patterns for your channels: on highly specialized channels accuracy is high, on general-interest channels fine-tuning may be needed.
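
One way to make that trade-off tunable is to count how many distinct patterns fire and to flag a post only when enough of them do. The sketch below assumes this scoring approach; the names `SCORED_PATTERNS`, `vacancy_score` and `is_vacancy_scored`, the shortened pattern list, and the default of two hits are all illustrative, not tuned values.

```python
import re

# Hypothetical, shortened pattern list. Each pattern is compiled separately
# so that distinct hits can be counted instead of a single yes/no match.
SCORED_PATTERNS = [re.compile(p, re.IGNORECASE) for p in (
    r'\brequired\b',
    r'vacancy\s*:',
    r'salary\s+from',
    r'requirements\s*:',
    r'conditions\s*:',
    r'remote\s+work',
)]


def vacancy_score(text: str) -> int:
    """Number of distinct vacancy patterns found in the text."""
    return sum(1 for pattern in SCORED_PATTERNS if pattern.search(text))


def is_vacancy_scored(text: str, min_hits: int = 2) -> bool:
    """Flag the text only when at least min_hits different patterns match."""
    return vacancy_score(text) >= min_hits
```

A single stray word such as "required" then no longer rejects a technical post, while a real job posting usually trips several markers at once. Raising `min_hits` trades recall for precision.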


Layer 3: Filtering Prohibited Subjects

Prohibited topics include casinos, illegal financial schemes, pseudo-investments, adult content.

python
FORBIDDEN_PATTERNS = {
    'gambling': [
        r'казино', r'ставки\s+на\s+спорт', r'онлайн.?казино',
        r'слоты', r'рулетка', r'1xbet', r'1win', r'melbet',
        r'бонус\s+за\s+депозит',
    ],
    'crypto_scam': [
        r'х\d+\s+за\s+\d+\s+дней',
        r'пассивный\s+доход\s+от',
        r'вложи\s+и\s+получи',
        r'майнинг.{0,20}доход',
        r'крипт[оа].{0,20}заработ',
        r'pump.{0,10}группа',
        r'инсайд.{0,10}сигнал',
    ],
    'loans': [
        r'займ\s+без\s+отказа',
        r'микрозайм',
        r'мфо',
        r'кредит\s+без\s+справок',
        r'деньги\s+в\s+долг\s+срочно',
    ],
    'adult': [
        r'18\+.{0,20}(фото|видео|контент)',
        r'только\s+для\s+взрослых',
    ],
}

_forbidden_res = {
    category: re.compile('|'.join(patterns), re.IGNORECASE | re.UNICODE)
    for category, patterns in FORBIDDEN_PATTERNS.items()
}

def get_forbidden_category(text: str) -> str | None:
    for category, pattern in _forbidden_res.items():
        if pattern.search(text):
            return category
    return None

This dictionary is not universal; it is a starting point. Extend it for each project using real data.
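
To extend the dictionary from real data, it helps to see which patterns actually fire on a sample of collected posts. A small sketch, assuming the same `{category: [patterns]}` layout as above; `pattern_hit_counts` is a hypothetical helper name, and the sample patterns and texts in the usage note are made up.

```python
import re
from collections import Counter


def pattern_hit_counts(patterns: dict[str, list[str]], texts: list[str]) -> Counter:
    """Count, per (category, pattern), how many texts the pattern matches."""
    hits = Counter()
    for category, raw_patterns in patterns.items():
        for raw in raw_patterns:
            compiled = re.compile(raw, re.IGNORECASE)
            hits[(category, raw)] = sum(1 for text in texts if compiled.search(text))
    return hits
```

For example, `pattern_hit_counts({'gambling': [r'casino', r'roulette']}, sample_texts)` returns a counter keyed by `('gambling', 'casino')` and `('gambling', 'roulette')`. Patterns with zero hits are candidates for removal, and patterns that fire often on a sample of known-good posts point to false positives.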


Layer 4: Deduplication through Simhash

The most technically interesting layer. Objective: Determine that two texts are actually the same, even if they differ in emoji, punctuation, or small edits.

Why not MD5? The hash changes if even a single character changes. “Hello world” and “Hello, world!” give different MD5 hashes, although the content is the same.

SimHash is an algorithm for fuzzy duplicate detection. Similar texts give similar hashes that differ in only a few bits. Google has used it to deduplicate web pages.
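
To make the idea concrete, here is a textbook SimHash in pure Python, written only for illustration. It is not the implementation inside the `simhash` package used below, and the names `tokens`, `mini_simhash` and `hamming` are hypothetical. Each token is hashed, every bit of that hash casts a +1 or -1 vote for its position, and the sign of the total decides the output bit.

```python
import hashlib
import re


def tokens(text: str) -> list[str]:
    """Lowercase and keep only word characters, so punctuation and emoji vanish."""
    return re.sub(r'[^\w\s]', ' ', text.lower()).split()


def mini_simhash(text: str, bits: int = 64) -> int:
    """Each token votes +1/-1 on every bit position; positive totals become 1-bits."""
    votes = [0] * bits
    for token in tokens(text):
        digest = hashlib.md5(token.encode('utf-8')).digest()
        token_hash = int.from_bytes(digest[:bits // 8], 'big')
        for i in range(bits):
            votes[i] += 1 if (token_hash >> i) & 1 else -1
    return sum(1 << i for i in range(bits) if votes[i] > 0)


def hamming(a: int, b: int) -> int:
    """Number of bit positions in which two hashes differ."""
    return bin(a ^ b).count('1')
```

After normalization, "Hello world" and "Hello, world!" produce the same token list, so their hashes are identical and the distance is 0, while their MD5 digests differ. Because every token contributes only one vote per bit, changing a few words in a long post tends to flip only a few output bits.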

python
import re

from simhash import Simhash

def text_to_simhash(text: str) -> int:
    """Normalize the text and compute its simhash."""
    # Strip everything except words: emoji, punctuation, extra spaces
    normalized = re.sub(r'[^\w\s]', ' ', text.lower())
    normalized = re.sub(r'\s+', ' ', normalized).strip()
    # Split into tokens (words plus bigrams for better accuracy)
    words = normalized.split()
    tokens = words + [f'{a} {b}' for a, b in zip(words, words[1:])]
    return Simhash(tokens).value

def simhash_distance(h1: int, h2: int) -> int:
    """Hamming distance between two simhashes."""
    xor = h1 ^ h2
    return bin(xor).count('1')

class DuplicateDetector:
    def __init__(self, threshold: int = 5):
        # threshold: maximum Hamming distance that still counts as a duplicate
        # 0 = identical hashes; as a rough guide, 5 means the texts are
        # about 90% similar and 10 about 70%
        self.threshold = threshold
        self.seen_hashes: list[tuple[int, int]] = []  # (message_id, simhash)

    def is_duplicate(self, message_id: int, text: str) -> bool:
        h = text_to_simhash(text)
        for seen_id, seen_hash in self.seen_hashes:
            # Skip the message itself if it has been seen before
            if seen_id == message_id:
                continue
            if simhash_distance(h, seen_hash) <= self.threshold:
                return True
        self.seen_hashes.append((message_id, h))
        return False

With a large database, the linear scan over seen_hashes is slow. Beyond a few thousand records, use LSH (locality-sensitive hashing) over the SimHash values, or store the hashes in PostgreSQL and search by bit distance.
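
A sketch of one common way to avoid the linear scan: split each 64-bit hash into `threshold + 1` bands and index every band separately. By the pigeonhole principle, two hashes that differ in at most `threshold` bits must agree on at least one whole band, so only the records sharing a band need an exact comparison. The class name `SimhashIndex` and its methods are hypothetical, and the index lives in memory only.

```python
from collections import defaultdict


class SimhashIndex:
    """Band-based candidate lookup for fixed-width simhashes (illustrative)."""

    def __init__(self, threshold: int = 5, bits: int = 64):
        self.threshold = threshold
        n_bands = threshold + 1
        base, extra = divmod(bits, n_bands)
        # Each band is described by (shift, mask); widths differ by at most 1 bit
        self.bands = []
        shift = 0
        for i in range(n_bands):
            width = base + (1 if i < extra else 0)
            self.bands.append((shift, (1 << width) - 1))
            shift += width
        # (band number, band value) -> list of (item_id, full hash)
        self.buckets = defaultdict(list)

    def _keys(self, h: int):
        for band_no, (shift, mask) in enumerate(self.bands):
            yield band_no, (h >> shift) & mask

    def add(self, item_id, h: int) -> None:
        for key in self._keys(h):
            self.buckets[key].append((item_id, h))

    def find_near(self, h: int) -> set:
        """IDs of stored hashes within `threshold` bits of h."""
        found = set()
        for key in self._keys(h):
            for item_id, other in self.buckets.get(key, ()):
                # Candidates share one band; confirm with the exact distance
                if bin(h ^ other).count('1') <= self.threshold:
                    found.add(item_id)
        return found
```

Each hash is stored `threshold + 1` times, which trades memory for lookup speed. The same band values can also be kept as indexed columns in a database to prefilter candidates before the exact distance check.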


Layer 5: Filtering Advertising Integrations

Native ads are harder to detect because they are written like a regular post. Several signs that work in combination:

python
import re

# The patterns must be written in the language of the channels being parsed.
AD_PATTERNS = [
    # Calls to action and UTM parameters
    r'click\s+(on\s+)?the\s+link',
    r'click\s+here',
    r'utm_source',
    r'utm_medium',
    r'utm_campaign',
    # Partner markers
    r'advertising',
    r'advertising\s+post',
    r'sponsored\s+content',
    r'partner\s+material',
    r'promo',
    # Referral patterns
    r'promo\s*code\s+[A-Z0-9]{3,}',
    r'discount\s+\d+%\s+with\s+promo\s*code',
    # Characteristic phrasing
    r'subscribe\s+to\s+(the\s+)?channel',
    r'subscribe\s+to\s+@\w+',
    r'go\s+to\s+@\w+',
]

_ad_re = re.compile('|'.join(AD_PATTERNS), re.IGNORECASE | re.UNICODE)

def looks_like_ad(message) -> bool:
    # In Telethon, message.text also holds media captions
    text = message.text or ''

    # Direct markers in the text
    if _ad_re.search(text):
        return True

    # Buttons with UTM or referral links
    if message.reply_markup:
        # Only inline markups have rows, hence getattr with a default
        for row in (getattr(message.reply_markup, 'rows', None) or []):
            for button in (row.buttons or []):
                url = getattr(button, 'url', '') or ''
                if 'utm_' in url or any(
                    p in url for p in ['ref=', 'promo=', 'aff=']
                ):
                    return True

    return False

Assembly: Full filtration pipeline

Combine all layers into a single pipeline that logs the reason for each rejection.

python
import asyncio
import logging
from dataclasses import dataclass
from datetime import datetime

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('tg_parser')

@dataclass
class ParsedMessage:
    message_id: int
    channel: str
    text: str
    date: datetime
    views: int | None
    simhash: int

class TelegramParser:
    def __init__(self, channels: list[str], limit_per_channel: int = 1000):
        self.channels = channels
        self.limit = limit_per_channel
        self.dedup = DuplicateDetector(threshold=5)
        self.stats = {
            'total': 0,
            'passed': 0,
            'filtered': {},
        }

    def _record_filtered(self, reason: str):
        self.stats['filtered'][reason] = self.stats['filtered'].get(reason, 0) + 1

    def filter_message(self, message, channel: str) -> ParsedMessage | None:
        self.stats['total'] += 1
        # In Telethon, message.text also holds media captions
        text = message.text or ''

        # Step 1: structural garbage
        is_garbage, reason = is_structural_garbage(message)
        if is_garbage:
            self._record_filtered(reason)
            return None

        # Step 2: advertising
        if looks_like_ad(message):
            self._record_filtered('ad')
            return None

        # Step 3: job postings
        if is_vacancy(text):
            self._record_filtered('vacancy')
            return None

        # Step 4: prohibited topics
        forbidden = get_forbidden_category(text)
        if forbidden:
            self._record_filtered(f'forbidden_{forbidden}')
            return None

        # Step 5: duplicates
        if self.dedup.is_duplicate(message.id, text):
            self._record_filtered('duplicate')
            return None

        self.stats['passed'] += 1
        return ParsedMessage(
            message_id=message.id,
            channel=channel,
            text=text,
            date=message.date,
            views=getattr(message, 'views', None),
            simhash=text_to_simhash(text),
        )

    async def parse_channel(self, channel: str) -> list[ParsedMessage]:
        results = []
        logger.info(f'Parsing {channel}...')
        try:
            async for message in client.iter_messages(channel, limit=self.limit):
                parsed = self.filter_message(message, channel)
                if parsed:
                    results.append(parsed)
                # Short pause between messages for flood control
                await asyncio.sleep(0.05)
        except Exception as e:
            logger.error(f'Error while parsing {channel}: {e}')
        return results

    async def run(self) -> list[ParsedMessage]:
        all_messages = []
        for channel in self.channels:
            msgs = await self.parse_channel(channel)
            all_messages.extend(msgs)
            # Pause between channels
            await asyncio.sleep(2)

        logger.info(f'Total: {self.stats["total"]} messages, passed the filters: {self.stats["passed"]}')
        logger.info(f'Filtered out: {self.stats["filtered"]}')
        return all_messages

Flood control and Telegram limits

Telegram limits the request rate. With aggressive parsing, the account gets a FloodWaitError, which is a temporary block.

python
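from telethon.errors import FloodWaitError

async def safe_iter_messages(channel, limit=1000, pause=0.1, max_retries=5):
    """iter_messages with automatic retry on FloodWait."""
    retries = 0
    yielded = 0
    last_id = 0  # offset_id=0 starts from the newest message
    while yielded < limit:
        try:
            # FloodWaitError is raised by the iterator itself, so the whole
            # loop sits inside try; a retry resumes below the last seen ID
            async for message in client.iter_messages(
                channel, limit=limit - yielded, offset_id=last_id
            ):
                yield message
                last_id = message.id
                yielded += 1
                await asyncio.sleep(pause)
            return  # history exhausted or limit reached
        except FloodWaitError as e:
            retries += 1
            if retries > max_retries:
                logger.error('Too many FloodWait errors, aborting the channel')
                return
            wait_seconds = e.seconds + 5
            logger.warning(f'FloodWait: waiting {wait_seconds} seconds...')
            await asyncio.sleep(wait_seconds)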

Telethon can also work in takeout session mode, where some limits are more lenient. This is useful for mass data export:

python
# channels=True requests export access to channel messages
async with client.takeout(channels=True) as takeout:
    async for message in takeout.iter_messages(channel, wait_time=0.5):
        # process the message
        pass

Storage: SQLite for local development

python
import aiosqlite

CREATE_TABLE = """
CREATE TABLE IF NOT EXISTS messages (
    id          INTEGER PRIMARY KEY,
    message_id  INTEGER NOT NULL,
    channel     TEXT NOT NULL,
    text        TEXT NOT NULL,
    date        TEXT NOT NULL,
    views       INTEGER,
    simhash     INTEGER NOT NULL,
    created_at  TEXT DEFAULT (datetime('now')),
    UNIQUE(channel, message_id)
);
CREATE INDEX IF NOT EXISTS idx_simhash ON messages(simhash);
CREATE INDEX IF NOT EXISTS idx_channel_date ON messages(channel, date);
"""

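def to_signed64(value: int) -> int:
    # SQLite INTEGER is a signed 64-bit type, while a 64-bit simhash is
    # unsigned, so values of 2**63 and above are wrapped into the negative range
    return value - (1 << 64) if value >= (1 << 63) else value

async def save_messages(messages: list[ParsedMessage], db_path: str = 'parser.db'):
    async with aiosqlite.connect(db_path) as db:
        await db.executescript(CREATE_TABLE)
        await db.executemany(
            """
            INSERT OR IGNORE INTO messages
                (message_id, channel, text, date, views, simhash)
            VALUES (?, ?, ?, ?, ?, ?)
            """,
            [
                (m.message_id, m.channel, m.text,
                 m.date.isoformat(), m.views, to_signed64(m.simhash))
                for m in messages
            ]
        )
        await db.commit()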

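For production, use PostgreSQL with the simhash stored in a BIGINT column. Finding duplicates then becomes a single SQL query instead of a loop in memory. Keep in mind that the distance function is evaluated for every row and an ordinary index on simhash does not speed it up, so large tables still need a prefilter: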

sql
-- Find all messages with a similar simhash (distance ≤ 5 bits)
-- PostgreSQL has no built-in Hamming distance for integers, so we define a function
-- (bit_count for bit strings requires PostgreSQL 14 or later)
CREATE OR REPLACE FUNCTION hamming_distance(a BIGINT, b BIGINT)
RETURNS INTEGER AS $$
BEGIN
    RETURN bit_count((a # b)::bit(64));
END;
$$ LANGUAGE plpgsql IMMUTABLE;

-- Find potential duplicates for a specific simhash
SELECT channel, text, date
FROM messages
WHERE hamming_distance(simhash, $1) <= 5
  AND message_id != $2
LIMIT 20;

Realtime parsing: listening to new messages

In addition to retrospective parsing through iter_messages, you can subscribe to new posts through events:

python
from telethon import events

WATCH_CHANNELS = ['https://t.me/channel1', 'https://t.me/channel2']

parser = TelegramParser(channels=WATCH_CHANNELS)

@client.on(events.NewMessage(chats=WATCH_CHANNELS))
async def handle_new_message(event):
    # get_chat() fetches the entity if it is not in the local cache yet
    chat = await event.get_chat()
    channel = getattr(chat, 'username', None) or str(event.chat_id)
    parsed = parser.filter_message(event.message, channel)
    if parsed:
        await save_messages([parsed])
        logger.info(f'New message from {channel}: {parsed.text[:80]}...')

async def main():
    await client.start()
    logger.info('Listening for new messages...')
    await client.run_until_disconnected()

with client:
    client.loop.run_until_complete(main())

Be careful with slowmode_enabled: if slow mode is enabled in a group, some NewMessage events may not arrive. One workaround is to call client.catch_up() periodically:

python
async def periodic_catchup():
    while True:
        await client.catch_up()
        await asyncio.sleep(30)

Vibe coding: what to delegate to Claude Code

A full parser is a good candidate for vibe coding. A few things that Claude Code and Codex do well:

**Extending the filter dictionaries:**

plaintext
I have a list of job postings from Telegram channels.
Analyze 50 examples and write regex patterns
that cover all cases but do not give false positives
on ordinary technical content. The result is a list of Python strings for re.compile.

**Classifying topics through the Claude API:**

python
# Для сложных случаев — классификация через API вместо словарей
import anthropic

claude = anthropic.Anthropic()

def classify_message(text: str) -> dict:
    response = claude.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=100,
        messages=[{
            "role": "user",
            "content": f"""Классифицируй это сообщение из Telegram-канала.
Ответь ТОЛЬКО JSON: {{"is_ad": bool, "is_vacancy": bool, "category": str, "confidence": float}}
Категории: tech_news, tutorial, opinion, tool_review, other
Сообщение: {text[:500]}"""
        }]
    )
    import json
    return json.loads(response.content[0].text)

This is expensive at large volumes, so use it for borderline cases where the dictionaries give uncertain results.


Common Mistakes and How to Avoid Them

| Mistake | Consequence | Solution |
|---|---|---|
| Parsing with the main account | Ban, loss of access | Separate account for parsing |
| No delay between requests | FloodWaitError, temporary ban | asyncio.sleep(0.05–0.5) between messages |
| Keeping simhashes only in memory | Duplicates pass again after a restart | Save simhashes to the DB, load them at startup |
| Comparing strings with == for deduplication | Obvious variations slip through | Normalize + SimHash |
| Parsing private channels without joining | ChannelPrivateError | The account must be a member |
| Ignoring message.action | Service messages clutter the database | Always check is not None |
| Fixed date limit | New messages are missed on repeat runs | Store last_parsed_id and use min_id |
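
The "simhashes only in memory" row can be addressed by reloading stored hashes when the parser starts. A sketch using the standard-library `sqlite3` module and the `messages` table from the storage section. It assumes hashes are written as signed 64-bit integers, because SQLite's INTEGER cannot hold unsigned values of 2**63 and above; `to_signed64`, `to_unsigned64` and `load_seen_hashes` are hypothetical helper names.

```python
import sqlite3

MASK64 = (1 << 64) - 1


def to_signed64(h: int) -> int:
    """Map an unsigned 64-bit value into SQLite's signed INTEGER range."""
    return h - (1 << 64) if h >= (1 << 63) else h


def to_unsigned64(h: int) -> int:
    """Inverse of to_signed64: restore the original unsigned hash."""
    return h & MASK64


def load_seen_hashes(conn: sqlite3.Connection) -> list[tuple[int, int]]:
    """Return (message_id, simhash) pairs saved by previous runs."""
    rows = conn.execute(
        'SELECT message_id, simhash FROM messages ORDER BY message_id'
    ).fetchall()
    return [(message_id, to_unsigned64(h)) for message_id, h in rows]
```

The returned list has the same shape as `DuplicateDetector.seen_hashes`, so it can be assigned to that attribute before parsing begins.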

Incremental parsing: don't collect the same thing twice

Parsing from scratch on every restart is wasteful. The right approach is to remember the last processed message_id and resume from it on the next run:

python
async def get_last_message_id(db_path: str, channel: str) -> int:
    async with aiosqlite.connect(db_path) as db:
        cursor = await db.execute(
            'SELECT MAX(message_id) FROM messages WHERE channel = ?',
            (channel,)
        )
        row = await cursor.fetchone()
        return row[0] or 0

async def parse_channel_incremental(channel: str, db_path: str):
    min_id = await get_last_message_id(db_path, channel)
    results = []
    async for message in client.iter_messages(channel, min_id=min_id):
        parsed = parser.filter_message(message, channel)
        if parsed:
            results.append(parsed)
        await asyncio.sleep(0.05)
    if results:
        await save_messages(results, db_path)
    logger.info(f'{channel}: +{len(results)} new messages (with ID > {min_id})')
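
One limitation of taking `MAX(message_id)` from the `messages` table is that it only sees messages that passed the filters, so anything rejected after the last saved post is fetched and filtered again on every run. A sketch of a separate per-channel cursor, using the standard-library `sqlite3` module for brevity; the `channel_state` table and the helper names are hypothetical, and the upsert syntax needs SQLite 3.24 or later.

```python
import sqlite3


def init_state(conn: sqlite3.Connection) -> None:
    conn.execute(
        'CREATE TABLE IF NOT EXISTS channel_state ('
        'channel TEXT PRIMARY KEY, last_id INTEGER NOT NULL)'
    )


def get_cursor(conn: sqlite3.Connection, channel: str) -> int:
    """Highest message ID seen for the channel, or 0 on the first run."""
    row = conn.execute(
        'SELECT last_id FROM channel_state WHERE channel = ?', (channel,)
    ).fetchone()
    return row[0] if row else 0


def advance_cursor(conn: sqlite3.Connection, channel: str, message_id: int) -> None:
    """Move the cursor forward only; an older ID never overwrites a newer one."""
    conn.execute(
        'INSERT INTO channel_state (channel, last_id) VALUES (?, ?) '
        'ON CONFLICT(channel) DO UPDATE '
        'SET last_id = MAX(last_id, excluded.last_id)',
        (channel, message_id),
    )
```

Calling `advance_cursor` for every message returned by `iter_messages`, whether it was kept or rejected, lets the next run pass `min_id=get_cursor(conn, channel)` and skip everything already examined.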

Checklist for a finished parser

plaintext
Setup:
● Separate Telegram account (not the main one)
● api_id and api_hash in .env, not in code
● Session file outside the repository (in .gitignore)

Filters:
● Structural: reject empty, service, and too-short messages
● Forwards: message.fwd_from is not None
● Advertising: patterns + button check for UTM
● Job postings: regex for HR keywords
● Prohibited topics: gambling, crypto scam, adult, loans
● Deduplication: SimHash with a 5-bit threshold

Flood control:
● asyncio.sleep between messages (0.05–0.5 seconds)
● asyncio.sleep between channels (2–5 seconds)
● FloodWaitError handling with an e.seconds wait

Storage:
● UNIQUE(channel, message_id): no re-saving
● SimHash stored in the DB: deduplication across sessions
● Incremental parsing through min_id

Monitoring:
● Logging of rejection reasons for each filter
● Final statistics: total / passed / filtered by category
Outcome

A good Telegram parser is not just iter_messages plus "save to the database". It is a multi-layer filtering system in which each layer removes its own type of junk at minimal cost.

The order of the layers matters: first the cheap structural checks (no text, forward, service message), then the patterns (advertising, job postings, prohibited topics), then the expensive deduplication through SimHash. This minimizes the load, because most of the junk is dropped by the first checks.

Start small: take two or three channels, run the parser, and look at the statistics of filtered messages. Tune the thresholds for your data. There is no universal config; everything depends on the channels' subject matter and your requirements for data cleanliness.


FAQ

**Is it legal to parse Telegram channels?** Public channels are technically available to everyone. Telegram restricts the mass collection of users from groups and messaging them, which is clearly against the rules. Parsing content from public channels for analysis or aggregation is a grey area: Telegram does not encourage it, but there is no direct ban on reading open content. Users' personal data is a different matter; Federal Law 152-FZ applies there.

**Is it possible to parse through a bot, without a user account?** A bot can only read messages sent to chats where it is a member. iter_messages for other public channels is not available through the Bot API. MTProto via Telethon/Pyrogram with a user account is the only option.

**How do you parse private channels?** The account must be a member. For private channels with an invite link, join first; in Telethon this is await client(ImportChatInviteRequest(invite_hash)), with the request imported from telethon.tl.functions.messages and invite_hash being the hash part of the invite link.

**Why SimHash and not MD5 or SHA256?** MD5 and SHA256 change completely whenever the text changes: "Hello world" and "Hello world!" give different hashes. SimHash gives similar values for similar texts, which is what you need to deduplicate content with small variations.

**How do you scale to hundreds of channels?** Multiple user accounts with the channels distributed between them. Celery or asyncio.Queue for the task queue. PostgreSQL with table partitioning by date. Redis to keep the SimHash index in memory for fast lookup.
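
The asyncio.Queue variant can be sketched in a few lines: every account runs one worker, and the workers pull channels from a shared queue until it is empty. This is an illustration only; `run_workers` is a hypothetical name, and `parse` stands for whatever coroutine parses one channel with a given account's client.

```python
import asyncio


async def run_workers(channels, accounts, parse):
    """Distribute channels across accounts through a shared asyncio.Queue."""
    queue = asyncio.Queue()
    for channel in channels:
        queue.put_nowait(channel)
    results = {}

    async def worker(account):
        # Each worker takes the next free channel until none are left
        while True:
            try:
                channel = queue.get_nowait()
            except asyncio.QueueEmpty:
                return
            results[channel] = await parse(account, channel)

    # One worker per account, all running concurrently
    await asyncio.gather(*(worker(account) for account in accounts))
    return results
```

Because each account handles only one channel at a time, a slow or flood-limited account simply takes fewer channels while the others keep draining the queue.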
