How to build a Telegram channel parser without the junk: filtering spam, duplicates, job postings, and ads
*Relevant for Telethon 1.43 and Pyrogram 2.x, June 2026*
Writing a Telegram channel parser is not hard. Writing one that doesn't clutter the database with ads, identical forwards, and "sales manager wanted" posts takes some architectural decisions.
Out of the box, iter_messages returns everything in a row: ad integrations, the same post forwarded from ten channels at once, service messages, job postings. Without a filtering layer you are collecting noise, not content.
This article is a full walkthrough: how the parser works, which filters it needs, and how to implement them without unnecessary dependencies, with code, architectural decisions, and the reasoning behind them.
What counts as "junk" in channel parsing
Before filtering, let's define what we are removing:
| Junk type | Signal | Why it's a problem |
|---|---|---|
| Ad integrations | Native promo post, sponsored label | Someone else's material, not the channel's content |
| Forwards | message.fwd_from is not None | One post is replicated across dozens of channels |
| Duplicates | Similar text that differs in emoji or punctuation | Clutter the sample and skew analysis |
| Job postings | Keywords: "требуется" (wanted), "зарплата" (salary), "оформление по ТК" (employment under the Labor Code) | Off-topic content, background noise |
| Service messages | message.action is not None | "User joined", "photo changed": technical events |
| Short, empty messages | Length < 50 characters | "🔥🔥🔥", "Subscribe!" |
| Prohibited topics | Casinos, crypto hype, payday loans | Legal and reputational risk |
Stack and setup
Telethon vs Pyrogram: What to Choose
Both run on top of MTProto, Telegram's native protocol. The main difference:
Telethon is an asynchronous Python library. It is more mature, better documented, and more commonly used for parsing. The current stable version is 1.43.
Pyrogram is an alternative with an API that is slightly friendlier to beginners. Version 2.x.
For parsing channels with heavy filtering the difference is negligible. The examples in this article use Telethon, since it is more common in this scenario.
Obtaining API keys
The parser connects over MTProto as a user account, not as a bot: through the Bot API, iter_messages is not available for other people's channels.
- Go to my.telegram.org
- Log in with your phone number
- Open "API Development Tools" and create an application
- You get api_id (a number) and api_hash (a string)
Use a separate account for parsing, not your main one. Aggressive parsing can get the account restricted.
Installation
pip install telethon aiofiles python-dotenv
# For storage in SQLite
pip install aiosqlite
# For SimHash (deduplication)
pip install simhash
# .env
API_ID=12345678
API_HASH=your_api_hash_here
SESSION_NAME=parser_session
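Missing or malformed values in .env tend to surface later as confusing client errors, so it can help to validate them up front. A minimal sketch, assuming the three variable names from the .env file above; load_config and ParserConfig are hypothetical helpers, not part of Telethon or python-dotenv, and the default session name is an assumption:

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class ParserConfig:
    api_id: int
    api_hash: str
    session_name: str


def load_config(env: Mapping[str, str] = os.environ) -> ParserConfig:
    """Read API credentials from an environment-like mapping and validate them."""
    missing = [key for key in ('API_ID', 'API_HASH') if not env.get(key)]
    if missing:
        raise ValueError('Missing required settings: ' + ', '.join(missing))
    try:
        api_id = int(env['API_ID'])
    except ValueError:
        raise ValueError('API_ID must be an integer') from None
    # SESSION_NAME is treated as optional here; the default is an assumption.
    session_name = env.get('SESSION_NAME') or 'parser_session'
    return ParserConfig(api_id, env['API_HASH'], session_name)
```

Passing a plain dict instead of os.environ keeps the function easy to check without touching the real environment.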
Basic parser without filters
First a working skeleton, then we add filters layer by layer.
import asyncio
import os

from dotenv import load_dotenv
from telethon import TelegramClient

load_dotenv()

client = TelegramClient(
    os.getenv('SESSION_NAME'),
    int(os.getenv('API_ID')),
    os.getenv('API_HASH'),
)

CHANNELS = [
    'https://t.me/example_channel_1',
    'https://t.me/example_channel_2',
]


async def parse_channel(channel_url: str, limit: int = 500):
    messages = []
    async for message in client.iter_messages(channel_url, limit=limit):
        messages.append(message)
    return messages


async def main():
    await client.start()
    for channel in CHANNELS:
        msgs = await parse_channel(channel)
        print(f'{channel}: received {len(msgs)} messages')


with client:
    client.loop.run_until_complete(main())
It works, but it returns everything, junk included. Let's move on to filtering.
Layer 1: filtering by message structure
The first and cheapest layer checks the metadata of the Message object, with no text analysis.
def is_structural_garbage(message) -> tuple[bool, str]:
    """
    Return (True, reason) if the message is structural junk.
    Checks run from cheapest to most expensive.
    """
    # Service message: join, photo change, pin, etc.
    # Checked first so these are not counted as 'no_text'.
    if message.action is not None:
        return True, 'service_action'

    # In Telethon a media caption is also exposed through message.text;
    # there is no separate caption attribute.
    text = message.text or ''

    # Empty message (media without a caption)
    if not text:
        return True, 'no_text'

    # Forward: someone else's content
    if message.fwd_from is not None:
        return True, 'forward'

    # Too short: most likely a teaser or an emoji reaction
    if len(text.strip()) < 50:
        return True, 'too_short'

    # Sponsored message (advertising through Telegram Ads).
    # Marked with a sponsored flag in some API versions
    if getattr(message, 'sponsored', False):
        return True, 'sponsored'

    return False, ''
This filter is synchronous and fast, so apply it first, before any other checks.
Layer 2: vacancy filtering
Job postings are the most common type of unwanted content in topical IT channels. A simple keyword filter covers roughly 90% of cases.
import re

# Patterns are shown in English; adapt them to the language of your channels.
VACANCY_PATTERNS = [
    # Direct job markers
    r'required',
    r'open\s+vacanc(y|ies)',
    r'vacancy\s*:',
    r'we\s+are\s+looking\s+for\s+(an?\s+)?(developer|manager|analyst)',
    r'invite\s+(to\s+the\s+team|developers?|specialists?)',
    # Working conditions
    r'employment\s+under\s+the\s+labor\s+code',
    r'official\s+employment',
    r'full\s+(working\s+)?day',
    r'remote\s+work',
    r'hybrid\s+format',
    # Salary
    r'salary\s+from',
    r'grade\s*(junior|middle|senior)',
    r'\d+\s*(thousand|k)\s*/?\s*mo',
    r'income\s+from\s+\d+',
    # HR markers
    r'resume\s+to',
    r'hr@',
    r'reply\s+in\s+dm',
    r'write\s+in\s+dm',
    r'stack\s*:',
    r'requirements\s*:',
    r'responsibilities\s*:',
    r'conditions\s*:',
]

_vacancy_re = re.compile(
    '|'.join(VACANCY_PATTERNS),
    re.IGNORECASE | re.UNICODE,
)


def is_vacancy(text: str) -> bool:
    return bool(_vacancy_re.search(text))
The dictionary approach works well for unambiguous cases but gives false positives on phrases like "looking for a solution to the problem" or "attention to detail is required". Tune the pattern set to your channels: on narrowly focused channels precision is high, on general-interest ones fine-tuning may be needed.
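One way to trade recall for precision is to require several independent markers before rejecting a post, instead of a single regex hit. A minimal sketch of that idea; the marker groups, the helper names, and the threshold of 2 are illustrative assumptions, not part of the filter above:

```python
import re

# Hypothetical marker groups: each group counts at most once toward the score.
MARKER_GROUPS = {
    'role': re.compile(r'looking\s+for\s+(an?\s+)?(developer|manager|analyst)', re.I),
    'salary': re.compile(r'salary\s+from|\d+\s*k\s*/\s*mo', re.I),
    'contact': re.compile(r'hr@|send\s+(your\s+)?(cv|resume)', re.I),
    'sections': re.compile(r'(requirements|responsibilities)\s*:', re.I),
}


def vacancy_score(text: str) -> int:
    """Count how many marker groups appear in the text."""
    return sum(1 for pattern in MARKER_GROUPS.values() if pattern.search(text))


def is_vacancy_scored(text: str, min_score: int = 2) -> bool:
    """Flag a post only when at least min_score groups match."""
    return vacancy_score(text) >= min_score
```

With min_score=2, a technical post that merely contains a "Requirements:" heading passes, while a typical posting with a role, a salary, and a contact address is rejected.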
Layer 3: Filtering Prohibited Subjects
Prohibited topics include casinos, illegal financial schemes, pseudo-investments, and adult content.
FORBIDDEN_PATTERNS = {
    'gambling': [
        r'казино', r'ставки\s+на\s+спорт', r'онлайн.?казино',
        r'слоты', r'рулетка', r'1xbet', r'1win', r'melbet',
        r'бонус\s+за\s+депозит',
    ],
    'crypto_scam': [
        r'х\d+\s+за\s+\d+\s+дней',
        r'пассивный\s+доход\s+от',
        r'вложи\s+и\s+получи',
        r'майнинг.{0,20}доход',
        r'крипт[оа].{0,20}заработ',
        r'pump.{0,10}группа',
        r'инсайд.{0,10}сигнал',
    ],
    'loans': [
        r'займ\s+без\s+отказа',
        r'микрозайм',
        # Word boundaries: a bare 'мфо' also matches inside 'комфорт'
        r'\bмфо\b',
        r'кредит\s+без\s+справок',
        r'деньги\s+в\s+долг\s+срочно',
    ],
    'adult': [
        r'18\+.{0,20}(фото|видео|контент)',
        r'только\s+для\s+взрослых',
    ],
}

_forbidden_res = {
    category: re.compile('|'.join(patterns), re.IGNORECASE | re.UNICODE)
    for category, patterns in FORBIDDEN_PATTERNS.items()
}


def get_forbidden_category(text: str) -> str | None:
    for category, pattern in _forbidden_res.items():
        if pattern.search(text):
            return category
    return None
This dictionary is not universal; it is a starting point. Extend it for each project based on real data.
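Short substrings are a typical source of false positives as such a dictionary grows. A small sketch of the effect, using the abbreviation мфо (microfinance organization) as the example; the sample sentences are made up for illustration:

```python
import re

# A bare substring also matches inside unrelated words such as "комфортный".
loose = re.compile(r'мфо', re.IGNORECASE)
# \b anchors the match to whole words; for str patterns it is Unicode-aware.
strict = re.compile(r'\bмфо\b', re.IGNORECASE)

spam = 'Займ в МФО без отказа за 5 минут'
harmless = 'Комфортный офис и гибкий график'

print(bool(loose.search(harmless)))   # True: false positive
print(bool(strict.search(harmless)))  # False
print(bool(strict.search(spam)))      # True
```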
Layer 4: Deduplication through Simhash
This is the most technically interesting layer. The goal is to detect that two texts are effectively the same even if they differ in emoji, punctuation, or small edits.
Why not MD5? The hash changes completely if even one character changes. "Hello world" and "Hello world!" have different MD5 hashes but the same content.
SimHash is an algorithm for fuzzy duplicate detection. Similar texts produce similar hashes that differ in only a few bits. Google has used it to deduplicate web pages.
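To make the contrast concrete, here is a toy pure-Python SimHash written only for illustration (the article's own code relies on the simhash package instead). The tokenization and the use of MD5 as the per-token hash are assumptions of this sketch:

```python
import hashlib
import re


def tokens(text: str) -> list[str]:
    """Lowercase the text and keep only word characters."""
    return re.sub(r'[^\w\s]', ' ', text.lower()).split()


def simhash64(text: str) -> int:
    """Toy 64-bit SimHash: each token votes +1 or -1 on every bit position."""
    votes = [0] * 64
    for token in tokens(text):
        digest = hashlib.md5(token.encode('utf-8')).digest()
        h = int.from_bytes(digest[:8], 'big')
        for bit in range(64):
            votes[bit] += 1 if (h >> bit) & 1 else -1
    return sum(1 << bit for bit in range(64) if votes[bit] > 0)


def hamming(a: int, b: int) -> int:
    """Number of bit positions in which two hashes differ."""
    return bin(a ^ b).count('1')


a, b = 'Hello world', 'Hello world!'
md5_a = hashlib.md5(a.encode()).hexdigest()
md5_b = hashlib.md5(b.encode()).hexdigest()
print(md5_a == md5_b)                       # False: one character changes the digest
print(hamming(simhash64(a), simhash64(b)))  # 0: the texts are identical after normalization
```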
import re

from simhash import Simhash


def text_to_simhash(text: str) -> int:
    """Normalize the text and compute its SimHash."""
    # Strip everything except words: emoji, punctuation, extra whitespace
    normalized = re.sub(r'[^\w\s]', ' ', text.lower())
    normalized = re.sub(r'\s+', ' ', normalized).strip()
    # Split into tokens (words plus bigrams for better precision)
    words = normalized.split()
    tokens = words + [f'{a} {b}' for a, b in zip(words, words[1:])]
    return Simhash(tokens).value


def simhash_distance(h1: int, h2: int) -> int:
    """Hamming distance between two SimHash values."""
    xor = h1 ^ h2
    return bin(xor).count('1')


class DuplicateDetector:
    def __init__(self, threshold: int = 5):
        # threshold: maximum Hamming distance that still counts as a duplicate.
        # 0 = exact match; as a rough guide, 5 means the texts are about
        # 90% similar and 10 about 70%
        self.threshold = threshold
        self.seen_hashes: list[tuple[int, int]] = []  # (message_id, simhash)

    def is_duplicate(self, message_id: int, text: str) -> bool:
        h = text_to_simhash(text)
        for seen_id, seen_hash in self.seen_hashes:
            if seen_id == message_id:
                continue
            if simhash_distance(h, seen_hash) <= self.threshold:
                return True
        self.seen_hashes.append((message_id, h))
        return False
With a large database, the linear scan over seen_hashes gets slow, because every new message is compared with all previous ones. Beyond a few thousand records, use LSH (Locality-Sensitive Hashing) over the SimHash values, or store the hashes in PostgreSQL and search by bit distance.
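One common way to avoid the full scan relies on the pigeonhole principle: if two 64-bit hashes differ in at most k bits and each hash is split into k + 1 bands, at least one band must match exactly. A minimal in-memory sketch of that idea; BandedSimhashIndex is a hypothetical helper, not part of the simhash package:

```python
from collections import defaultdict


class BandedSimhashIndex:
    """Bucket 64-bit hashes by band so only likely neighbours get compared."""

    def __init__(self, threshold: int = 5, bits: int = 64):
        self.threshold = threshold
        bands = threshold + 1
        base, extra = divmod(bits, bands)
        # (shift, width) per band; the first `extra` bands are one bit wider.
        self.slices = []
        shift = 0
        for i in range(bands):
            width = base + (1 if i < extra else 0)
            self.slices.append((shift, width))
            shift += width
        self.buckets = defaultdict(list)

    def _keys(self, h: int):
        for i, (shift, width) in enumerate(self.slices):
            yield i, (h >> shift) & ((1 << width) - 1)

    def find_near(self, h: int):
        """Return a stored hash within the threshold, or None."""
        for key in self._keys(h):
            for candidate in self.buckets.get(key, ()):
                if bin(candidate ^ h).count('1') <= self.threshold:
                    return candidate
        return None

    def add(self, h: int) -> None:
        for key in self._keys(h):
            self.buckets[key].append(h)


index = BandedSimhashIndex(threshold=5)
original = 0xF0F0F0F0F0F0F0F0
index.add(original)
print(index.find_near(original ^ 0b11111) == original)  # True: only 5 bits differ
print(index.find_near(original ^ (2**64 - 1)))          # None: every bit differs
```

Each lookup only touches the buckets that share a band with the query, so the cost depends on bucket sizes rather than on the total number of stored hashes.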
Layer 5: Filtering Advertising Integrations
Native ads are harder to detect because they are written like regular posts. Several signals work in combination:
# Patterns are shown in English; adapt them to the language of your channels.
AD_PATTERNS = [
    # Calls to action and UTM patterns
    r'click\s+on\s+the\s+link',
    r'click\s+here',
    r'utm_source',
    r'utm_medium',
    r'utm_campaign',
    # Partner markers
    r'advertising',
    r'advertising\s+post',
    r'paid\s+advertisement',
    r'partner\s+material',
    r'promo',
    # Referral patterns
    r'promocode\s+[A-Z0-9]{3,}',
    r'discount\s+\d+%\s+with\s+promo\s*code',
    # Typical phrasing
    r'subscribe\s+to\s+the\s+channel',
    r'subscribe\s+to\s+@\w+',
    r'go\s+to\s+@\w+',
]

_ad_re = re.compile('|'.join(AD_PATTERNS), re.IGNORECASE | re.UNICODE)


def looks_like_ad(message) -> bool:
    text = message.text or ''

    # Direct markers
    if _ad_re.search(text):
        return True

    # Buttons with UTM or referral links
    markup = message.reply_markup
    for row in getattr(markup, 'rows', None) or []:
        for button in getattr(row, 'buttons', None) or []:
            url = getattr(button, 'url', '') or ''
            if 'utm_' in url or any(
                p in url for p in ['ref=', 'promo=', 'aff=']
            ):
                return True

    return False
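Substring checks such as 'ref=' in url also fire on unrelated parameters like pref= or href=. A slightly stricter sketch parses the query string instead; the set of parameter names below is an assumption and should be extended from real data:

```python
from urllib.parse import parse_qs, urlsplit

# Hypothetical set of query parameters that usually indicate referral links.
TRACKING_PARAMS = {'ref', 'promo', 'aff'}


def has_tracking_params(url: str) -> bool:
    """Return True if the URL carries UTM or referral query parameters."""
    query = parse_qs(urlsplit(url).query, keep_blank_values=True)
    return any(
        name.lower().startswith('utm_') or name.lower() in TRACKING_PARAMS
        for name in query
    )


print(has_tracking_params('https://example.com/?utm_source=tg&utm_medium=post'))  # True
print(has_tracking_params('https://example.com/article?pref=dark'))               # False
```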
Putting it together: the full filtering pipeline
Combine all layers into a single pipeline that logs the reason for each rejection.
import asyncio
import logging
from dataclasses import dataclass
from datetime import datetime

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('tg_parser')


@dataclass
class ParsedMessage:
    message_id: int
    channel: str
    text: str
    date: datetime
    views: int | None
    simhash: int


class TelegramParser:
    def __init__(self, channels: list[str], limit_per_channel: int = 1000):
        self.channels = channels
        self.limit = limit_per_channel
        self.dedup = DuplicateDetector(threshold=5)
        self.stats = {
            'total': 0,
            'passed': 0,
            'filtered': {},
        }

    def _record_filtered(self, reason: str):
        self.stats['filtered'][reason] = self.stats['filtered'].get(reason, 0) + 1

    def filter_message(self, message, channel: str) -> ParsedMessage | None:
        self.stats['total'] += 1
        # message.text also covers media captions in Telethon
        text = message.text or ''

        # Structural junk (cheapest check, runs first)
        is_garbage, reason = is_structural_garbage(message)
        if is_garbage:
            self._record_filtered(reason)
            return None

        # Ads
        if looks_like_ad(message):
            self._record_filtered('ad')
            return None

        # Job postings
        if is_vacancy(text):
            self._record_filtered('vacancy')
            return None

        # Prohibited topics
        forbidden = get_forbidden_category(text)
        if forbidden:
            self._record_filtered(f'forbidden_{forbidden}')
            return None

        # Duplicates (most expensive check, runs last)
        if self.dedup.is_duplicate(message.id, text):
            self._record_filtered('duplicate')
            return None

        self.stats['passed'] += 1
        return ParsedMessage(
            message_id=message.id,
            channel=channel,
            text=text,
            date=message.date,
            views=getattr(message, 'views', None),
            simhash=text_to_simhash(text),
        )

    async def parse_channel(self, channel: str) -> list[ParsedMessage]:
        results = []
        logger.info(f'Parsing {channel}...')
        try:
            async for message in client.iter_messages(channel, limit=self.limit):
                parsed = self.filter_message(message, channel)
                if parsed:
                    results.append(parsed)
                # Short pause between messages for flood control
                await asyncio.sleep(0.05)
        except Exception as e:
            logger.error(f'Error while parsing {channel}: {e}')
        return results

    async def run(self) -> list[ParsedMessage]:
        all_messages = []
        for channel in self.channels:
            msgs = await self.parse_channel(channel)
            all_messages.extend(msgs)
            # Pause between channels
            await asyncio.sleep(2)
        logger.info(f'Total: {self.stats["total"]} messages, passed filters: {self.stats["passed"]}')
        logger.info(f'Filtered out: {self.stats["filtered"]}')
        return all_messages
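Flood control and Telegram limits
Telegram rate-limits requests. With aggressive parsing the account gets a FloodWaitError, a temporary block.
from telethon.errors import FloodWaitError


async def safe_iter_messages(channel, limit=1000, pause=0.1, max_retries=5):
    """iter_messages with automatic retry on FloodWait."""
    retries = 0
    fetched = 0
    offset_id = 0  # 0 means "start from the newest message"
    while fetched < limit:
        try:
            # The error is raised by the request itself, so the try block
            # has to wrap the whole iteration, not just the yield.
            async for message in client.iter_messages(
                channel, limit=limit - fetched, offset_id=offset_id
            ):
                yield message
                fetched += 1
                offset_id = message.id  # after a retry, resume below this ID
                await asyncio.sleep(pause)
            return  # history exhausted or limit reached
        except FloodWaitError as e:
            retries += 1
            if retries > max_retries:
                logger.error('Too many FloodWait errors, aborting channel')
                return
            wait_seconds = e.seconds + 5
            logger.warning(f'FloodWait: waiting {wait_seconds} seconds...')
            await asyncio.sleep(wait_seconds)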
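The same wait-and-retry logic is useful for single requests as well, for example when resolving a channel before iterating over it. A minimal sketch under stated assumptions: FloodWait below is a stand-in class with a seconds attribute so the example runs without Telethon (real code would catch telethon.errors.FloodWaitError), and call_with_flood_retry is a hypothetical helper:

```python
import asyncio


class FloodWait(Exception):
    """Stand-in for telethon.errors.FloodWaitError (exposes .seconds)."""

    def __init__(self, seconds: int):
        super().__init__(f'wait {seconds}s')
        self.seconds = seconds


async def call_with_flood_retry(make_request, max_retries=5, extra=5,
                                sleep=asyncio.sleep):
    """Await make_request(), sleeping and retrying when asked to wait."""
    for attempt in range(max_retries + 1):
        try:
            return await make_request()
        except FloodWait as e:
            if attempt == max_retries:
                raise  # give up and let the caller decide
            await sleep(e.seconds + extra)


async def demo():
    calls, waits = [], []

    async def flaky_request():
        calls.append(1)
        if len(calls) < 3:
            raise FloodWait(10)
        return 'ok'

    async def fake_sleep(seconds):
        waits.append(seconds)  # record the wait instead of actually sleeping

    result = await call_with_flood_retry(flaky_request, sleep=fake_sleep)
    return result, len(calls), waits


print(asyncio.run(demo()))  # ('ok', 3, [15, 15])
```

Injecting the sleep function keeps the helper testable without real delays.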
Telethon can also work in takeout session mode, where some limits are more lenient. This is useful for bulk data export:
async with client.takeout(channels=True) as takeout:
    async for message in takeout.iter_messages(channel, wait_time=0.5):
        # process the message
        pass
Storage: SQLite for local development
import aiosqlite

CREATE_TABLE = """
CREATE TABLE IF NOT EXISTS messages (
    id INTEGER PRIMARY KEY,
    message_id INTEGER NOT NULL,
    channel TEXT NOT NULL,
    text TEXT NOT NULL,
    date TEXT NOT NULL,
    views INTEGER,
    simhash INTEGER NOT NULL,
    created_at TEXT DEFAULT (datetime('now')),
    UNIQUE(channel, message_id)
);
CREATE INDEX IF NOT EXISTS idx_simhash ON messages(simhash);
CREATE INDEX IF NOT EXISTS idx_channel_date ON messages(channel, date);
"""


def to_signed64(value: int) -> int:
    # SQLite INTEGER is signed 64-bit, while a 64-bit SimHash is unsigned
    return value - (1 << 64) if value >= (1 << 63) else value


async def save_messages(messages: list[ParsedMessage], db_path: str = 'parser.db'):
    async with aiosqlite.connect(db_path) as db:
        await db.executescript(CREATE_TABLE)
        await db.executemany(
            """
            INSERT OR IGNORE INTO messages
                (message_id, channel, text, date, views, simhash)
            VALUES (?, ?, ?, ?, ?, ?)
            """,
            [
                (m.message_id, m.channel, m.text,
                 m.date.isoformat(), m.views, to_signed64(m.simhash))
                for m in messages
            ],
        )
        await db.commit()
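SQLite stores signed 64-bit integers, while a 64-bit SimHash is unsigned, so values with the top bit set need converting on the way in and back on the way out. A minimal sketch of that round trip and of reloading stored hashes at startup; it uses the synchronous sqlite3 module and a one-column table purely to stay self-contained, and the helper names are hypothetical:

```python
import sqlite3

MASK64 = (1 << 64) - 1


def to_signed64(value: int) -> int:
    """Map an unsigned 64-bit value into the signed range SQLite can store."""
    return value - (1 << 64) if value >= (1 << 63) else value


def to_unsigned64(value: int) -> int:
    """Inverse of to_signed64; apply it before computing Hamming distances."""
    return value & MASK64


def load_seen_hashes(conn: sqlite3.Connection) -> list[int]:
    """Read all stored hashes back as unsigned integers."""
    rows = conn.execute('SELECT simhash FROM messages ORDER BY rowid').fetchall()
    return [to_unsigned64(row[0]) for row in rows]


conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE messages (simhash INTEGER NOT NULL)')
hashes = [0x0123456789ABCDEF, 0xFEDCBA9876543210]  # the second has the top bit set
conn.executemany(
    'INSERT INTO messages (simhash) VALUES (?)',
    [(to_signed64(h),) for h in hashes],
)
print(load_seen_hashes(conn) == hashes)  # True
```

Loading these values into the duplicate detector at startup is what lets deduplication survive restarts.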
For production, use PostgreSQL and keep the simhash column there. Duplicate search then becomes a SQL query instead of an in-memory loop. Note that a regular index on simhash does not speed up a Hamming-distance filter, so on large tables this query still scans every row:
-- Find all messages with a similar SimHash (distance ≤ 5 bits)
-- PostgreSQL has no built-in bit distance for integers, so we define a function
-- (bit_count requires PostgreSQL 14 or later)
CREATE OR REPLACE FUNCTION hamming_distance(a BIGINT, b BIGINT)
RETURNS INTEGER AS $$
BEGIN
    RETURN bit_count((a # b)::bit(64));
END;
$$ LANGUAGE plpgsql IMMUTABLE;

-- Find potential duplicates for a given SimHash
SELECT channel, text, date
FROM messages
WHERE hamming_distance(simhash, $1) <= 5
  AND message_id != $2
LIMIT 20;
Real-time parsing: listening for new messages
In addition to retrospective parsing with iter_messages, you can subscribe to new posts through events:
from telethon import events

WATCH_CHANNELS = ['https://t.me/channel1', 'https://t.me/channel2']

parser = TelegramParser(channels=WATCH_CHANNELS)


@client.on(events.NewMessage(chats=WATCH_CHANNELS))
async def handle_new_message(event):
    channel = event.chat.username or str(event.chat_id)
    parsed = parser.filter_message(event.message, channel)
    if parsed:
        await save_messages([parsed])
        logger.info(f'New message from {channel}: {parsed.text[:80]}...')


async def main():
    await client.start()
    logger.info('Listening for new messages...')
    await client.run_until_disconnected()


with client:
    client.loop.run_until_complete(main())
Be careful with slowmode_enabled: if slow mode is enabled in a group, some NewMessage events may not arrive. The fix is to call client.catch_up() periodically:
async def periodic_catchup():
    while True:
        await client.catch_up()
        await asyncio.sleep(30)
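The loop above still has to be started next to the main listener and stopped when the listener exits. A generic asyncio sketch of that wiring, with stub coroutines standing in for client.run_until_disconnected() and client.catch_up(); run_with_periodic is a hypothetical helper:

```python
import asyncio
import contextlib


async def run_with_periodic(main_coro, periodic_fn, interval: float):
    """Run main_coro while calling periodic_fn() every `interval` seconds."""

    async def ticker():
        while True:
            await periodic_fn()
            await asyncio.sleep(interval)

    task = asyncio.create_task(ticker())
    try:
        return await main_coro
    finally:
        # Stop the background loop once the main coroutine finishes or fails.
        task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await task


async def demo():
    ticks = []

    async def fake_catch_up():      # stands in for client.catch_up()
        ticks.append(1)

    async def fake_listener():      # stands in for run_until_disconnected()
        await asyncio.sleep(0.05)
        return 'disconnected'

    result = await run_with_periodic(fake_listener(), fake_catch_up, interval=0.01)
    return result, len(ticks)


result, tick_count = asyncio.run(demo())
print(result, tick_count >= 1)  # disconnected True
```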
Vibe coding: what to delegate to Claude Code
A full parser is a good candidate for vibe coding. A few things Claude Code and Codex do well:
**Extending the filter dictionaries:**
I have a list of job postings from Telegram channels.
Analyze 50 examples and write regex patterns
that cover all the cases but do not give false positives
on ordinary technical content. The result is a list of Python strings for re.compile.
**Classifying topics through the Claude API:**
# For hard cases: classification through the API instead of dictionaries
import json

import anthropic

claude = anthropic.Anthropic()


def classify_message(text: str) -> dict:
    response = claude.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=100,
        messages=[{
            "role": "user",
            "content": f"""Classify this message from a Telegram channel.
Reply with JSON ONLY: {{"is_ad": bool, "is_vacancy": bool, "category": str, "confidence": float}}
Categories: tech_news, tutorial, opinion, tool_review, other
Message: {text[:500]}"""
        }]
    )
    return json.loads(response.content[0].text)
This gets expensive at high volume, so use it only for borderline cases where the dictionaries give uncertain results.
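Model replies are not guaranteed to be bare JSON: they may come wrapped in a Markdown code fence or carry extra text. A defensive parsing sketch that could sit between the API call and json.loads; parse_classification and its fallback value are assumptions of this example:

```python
import json
import re

FALLBACK = {'is_ad': False, 'is_vacancy': False, 'category': 'other', 'confidence': 0.0}


def parse_classification(raw: str) -> dict:
    """Extract the first JSON object from a model reply, or return a fallback."""
    match = re.search(r'\{.*\}', raw, re.DOTALL)
    if not match:
        return dict(FALLBACK)
    try:
        data = json.loads(match.group(0))
    except json.JSONDecodeError:
        return dict(FALLBACK)
    # Fill in any keys the model left out.
    return {**FALLBACK, **data}


fence = '`' * 3  # built at runtime to keep this example readable
fenced_reply = f'{fence}json\n{{"is_ad": true, "category": "other", "confidence": 0.9}}\n{fence}'
print(parse_classification(fenced_reply)['is_ad'])       # True
print(parse_classification('no json here')['category'])  # other
```

Returning a low-confidence fallback instead of raising lets the pipeline keep running when a single reply is malformed.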
Common Mistakes and How to Avoid Them
| Mistake | Consequence | Fix |
|---|---|---|
| Parsing with your main account | Ban, loss of access | A separate account for parsing |
| No delay between requests | FloodWaitError, temporary ban | asyncio.sleep(0.05–0.5) between messages |
| Keeping simhashes only in memory | Duplicates get through again after a restart | Save simhashes to the DB and load them at startup |
| Comparing strings with == for deduplication | Obvious variations slip through | Normalize + SimHash |
| Parsing private channels without joining | ChannelPrivateError | The account must be a member |
| Ignoring message.action | Service messages clutter the database | Always check is not None |
| Fixed date limit | New messages are missed on re-runs | Store last_parsed_id and use min_id |
Incremental parsing: don't collect the same thing twice
Parsing from scratch on every restart is wasteful. The right approach is to remember the last processed message_id and resume from it on the next run:
async def get_last_message_id(db_path: str, channel: str) -> int:
    async with aiosqlite.connect(db_path) as db:
        cursor = await db.execute(
            'SELECT MAX(message_id) FROM messages WHERE channel = ?',
            (channel,)
        )
        row = await cursor.fetchone()
        return row[0] or 0


async def parse_channel_incremental(channel: str, db_path: str):
    min_id = await get_last_message_id(db_path, channel)
    results = []
    async for message in client.iter_messages(channel, min_id=min_id):
        parsed = parser.filter_message(message, channel)
        if parsed:
            results.append(parsed)
        await asyncio.sleep(0.05)
    if results:
        await save_messages(results, db_path)
    logger.info(f'{channel}: +{len(results)} new messages (with ID > {min_id})')
Checklist for a finished parser
Setup:
- A separate Telegram account (not your main one)
- api_id and api_hash in .env, not in code
- Session file outside the repository (in .gitignore)
Filters:
- Structural: reject empty, service, and too-short messages
- Forwards: message.fwd_from is not None
- Ads: patterns + checking buttons for UTM links
- Job postings: regex for HR keywords
- Prohibited topics: gambling, crypto scams, adult, loans
- Deduplication: SimHash with a 5-bit threshold
Flood control:
- asyncio.sleep between messages (0.05–0.5 seconds)
- asyncio.sleep between channels (2–5 seconds)
- FloodWaitError handling that waits e.seconds
Storage:
- UNIQUE(channel, message_id): no re-saving
- SimHash stored in the DB: deduplication across sessions
- Incremental parsing through min_id
Monitoring:
- Logging of rejection reasons for each filter
- Final statistics: total / passed / by category
Conclusion
A good Telegram parser is not just iter_messages plus "save to the database". It is a multi-layer filtering system in which each layer removes its own type of junk at minimal cost.
The order of layers matters: cheap structural checks first (no text, forward, service), then patterns (ads, job postings, prohibited topics), then the expensive SimHash deduplication. This minimizes load, since most junk is dropped by the first checks.
Start small: take two or three channels, run the parser, and look at the statistics of filtered messages. Tune the thresholds to your data. There is no universal config; everything depends on the channels' subject matter and your requirements for data cleanliness.
FAQ
**Is it legal to parse Telegram channels?** Public channels are technically open to everyone. Telegram restricts mass collection of users from groups and messaging them; that is clearly against the rules. Parsing content from public channels for analysis or aggregation is a gray area: Telegram does not encourage it, but there is no explicit ban on reading open content. Users' personal data is a separate matter: Russia's Federal Law 152-FZ applies there.
**Can I parse through a bot, without a user account?**
A bot can only read messages sent to chats where it is a member. iter_messages for other public channels is not available through the Bot API. MTProto via Telethon or Pyrogram with a user account is the only option.
**How do I parse private channels?**
The account must be a member. For private channels with an invite link, join first: in Telethon, await client(ImportChatInviteRequest(invite_hash)) from telethon.tl.functions.messages, where invite_hash is the part of the invite link after t.me/+.
**Why SimHash and not MD5 or SHA256?** MD5 and SHA256 change completely with any change to the text. "Hello world" and "Hello world!" produce different hashes. SimHash gives similar values for similar texts, which is what you need to deduplicate content with small variations.
**How do I scale to hundreds of channels?** Multiple user accounts with channels distributed between them. Celery or asyncio.Queue for the task queue. PostgreSQL with table partitioning by date. Redis to keep the SimHash index in memory with fast lookup.
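A minimal sketch of the asyncio.Queue option: a fixed pool of workers pulls channels from a shared queue, which caps concurrency without an external broker. The parse_channel_stub coroutine is a placeholder for real parsing, and running one worker per account is an assumption of this example:

```python
import asyncio


async def parse_channel_stub(account: str, channel: str) -> str:
    """Placeholder for real parsing; it only simulates some I/O."""
    await asyncio.sleep(0.01)
    return f'{account}:{channel}'


async def worker(account: str, queue: asyncio.Queue, results: list) -> None:
    while True:
        channel = await queue.get()
        try:
            results.append(await parse_channel_stub(account, channel))
        finally:
            queue.task_done()


async def parse_all(accounts: list[str], channels: list[str]) -> list[str]:
    queue: asyncio.Queue = asyncio.Queue()
    for channel in channels:
        queue.put_nowait(channel)

    results: list[str] = []
    workers = [asyncio.create_task(worker(acc, queue, results)) for acc in accounts]
    await queue.join()   # wait until every channel has been processed
    for task in workers:
        task.cancel()    # workers loop forever, so stop them explicitly
    await asyncio.gather(*workers, return_exceptions=True)
    return results


done = asyncio.run(parse_all(['acc1', 'acc2'], [f'channel_{i}' for i in range(5)]))
print(len(done))  # 5
```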