How do you run it? I don't think users of this resource will have any trouble with that.
Building the databases takes a while, but it works. The logic can be refined further, so there is something to build on. Put together in under a couple of hours total with DeepSeek :)
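Still, for reference, a minimal launch sequence (saving the script as scraper.py is my assumption; the environment variables and flags are the ones main() actually reads):
Bash:
pip install telethon tqdm
export TELEGRAM_API_ID=1234567
export TELEGRAM_API_HASH=<your_api_hash>
python scraper.py                       # interactive: pick chats, scrape, split by stories
python scraper.py --check-stories       # re-check users from without_stories.db
python scraper.py --check-no-stories    # re-check users from with_stories.db
python scraper.py --export-csv out.csv  # dump with_stories.db to CSV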
Python:
import os
import asyncio
import sqlite3
import argparse
import logging
from datetime import datetime
from tqdm import tqdm
from telethon import TelegramClient, functions, types
from telethon.errors import ChannelPrivateError, ChatAdminRequiredError
from telethon.tl.functions.channels import GetFullChannelRequest
from telethon.tl.functions.messages import GetFullChatRequest
from telethon.tl.functions.stories import GetPeerMaxIDsRequest
class StatisticsCollector:
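    """Plain counter container; generate_report() renders all of it as the final summary."""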
def __init__(self):
self.start_time = datetime.now()
self.chats_processed = 0
self.chats_skipped = 0
self.users_collected = 0
self.users_with_stories = 0
self.users_without_stories = 0
self.users_moved_to_with_stories = 0
self.users_moved_to_without_stories = 0
self.messages_processed = 0
self.reactions_collected = 0
self.comments_collected = 0
self.batches_processed = 0
self.errors_occurred = 0
self.channel_stats = {}
self.chat_stats = {}
    def record_chat_processed(self, title, chat_type):
        self.chats_processed += 1
        if chat_type == 'Channel':
            self.channel_stats[title] = self.channel_stats.get(title, 0) + 1
        else:
            self.chat_stats[title] = self.chat_stats.get(title, 0) + 1
def record_chat_skipped(self):
self.chats_skipped += 1
def record_user_collected(self):
self.users_collected += 1
def record_message_processed(self):
self.messages_processed += 1
def record_reaction_collected(self):
self.reactions_collected += 1
def record_comment_collected(self):
self.comments_collected += 1
def record_batch_processed(self):
self.batches_processed += 1
def record_error(self):
self.errors_occurred += 1
def record_user_with_stories(self):
self.users_with_stories += 1
def record_user_without_stories(self):
self.users_without_stories += 1
def record_user_moved_to_with_stories(self):
self.users_moved_to_with_stories += 1
def record_user_moved_to_without_stories(self):
self.users_moved_to_without_stories += 1
def get_execution_time(self):
return datetime.now() - self.start_time
def generate_report(self):
report = "\n" + "=" * 50 + "\n"
report += " TELEGRAM USER SCRAPER STATISTICS\n"
report += "=" * 50 + "\n"
report += f"\nExecution Time: {self.get_execution_time()}\n"
report += "\n=== CHAT PROCESSING ===\n"
report += f"Total Chats Processed: {self.chats_processed}\n"
report += f"Chats Skipped: {self.chats_skipped}\n"
if self.channel_stats:
report += f"\nChannels Processed ({len(self.channel_stats)}):\n"
for channel, count in self.channel_stats.items():
report += f" - {channel}: {count} operations\n"
if self.chat_stats:
report += f"\nChats Processed ({len(self.chat_stats)}):\n"
for chat, count in self.chat_stats.items():
report += f" - {chat}: {count} operations\n"
report += "\n=== USER COLLECTION ===\n"
report += f"Total Users Collected: {self.users_collected}\n"
report += f"Users with Stories: {self.users_with_stories}\n"
report += f"Users without Stories: {self.users_without_stories}\n"
if self.users_moved_to_with_stories:
report += f"Users Moved to 'with stories': {self.users_moved_to_with_stories}\n"
if self.users_moved_to_without_stories:
report += f"Users Moved to 'without stories': {self.users_moved_to_without_stories}\n"
report += "\n=== CONTENT PROCESSING ===\n"
report += f"Messages Processed: {self.messages_processed}\n"
report += f"Reactions Collected: {self.reactions_collected}\n"
report += f"Comments Collected: {self.comments_collected}\n"
report += f"Batches Processed: {self.batches_processed}\n"
report += "\n=== ERRORS ===\n"
report += f"Total Errors Occurred: {self.errors_occurred}\n"
report += "\n" + "=" * 50 + "\n"
return report
class TelegramUserScraper:
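    """Scrapes users from the selected chats/channels into an in-memory SQLite table,
    then splits them into on-disk with_stories/without_stories databases."""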
def __init__(self, api_id, api_hash, session_name='session', debug=False):
self.api_id = api_id
self.api_hash = api_hash
self.debug = debug
self.client = TelegramClient(session_name, api_id, api_hash)
self.temp_db = sqlite3.connect(':memory:')
self.stats = StatisticsCollector()
self._init_temp_db()
        # Configure logging
logging.basicConfig(
level=logging.DEBUG if debug else logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
self.logger = logging.getLogger('TelegramScraper')
    def _init_temp_db(self):
        cursor = self.temp_db.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS temp_users (
                user_id     INTEGER,
                group_id    INTEGER,
                username    TEXT,
                first_name  TEXT,
                last_name   TEXT,
                access_hash INTEGER,
                name        TEXT,
                group_title TEXT,
                PRIMARY KEY (user_id, group_id)
            )
        ''')
        self.temp_db.commit()
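    # Every on-disk database created below reuses this exact schema; the composite
    # (user_id, group_id) key stores one row per user per group, so the same user
    # seen in several groups appears once per group.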
async def connect(self):
await self.client.start()
self.logger.info("Client created successfully")
async def get_chats(self):
self.logger.debug("Fetching dialogs...")
dialogs = await self.client.get_dialogs()
valid_chats = []
for dialog in dialogs:
entity = dialog.entity
if isinstance(entity, (types.Channel, types.Chat)):
chat_info = {
'id': entity.id,
'title': entity.title,
'type': 'Channel' if isinstance(entity, types.Channel) else 'Chat'
}
valid_chats.append(chat_info)
self.logger.debug(f"Found chat: {entity.title} ({entity.id}, type: {type(entity).__name__}")
self.logger.info(f"Found {len(valid_chats)} valid chats/channels")
return valid_chats
    async def _get_chat_info(self, chat_id, is_channel):
        self.logger.debug(f"Getting info for chat {chat_id} (is_channel: {is_channel})")
        if is_channel:
            full_info = await self.client(GetFullChannelRequest(chat_id))
            # participants_hidden only exists on newer API layers, so read it defensively
            participants_hidden = getattr(full_info.full_chat, 'participants_hidden', False)
            self.logger.debug(f"Channel info: participants_hidden={participants_hidden}, "
                              f"can_view_participants={full_info.full_chat.can_view_participants}")
            return {
                'participants_hidden': participants_hidden,
                'can_view_participants': full_info.full_chat.can_view_participants
            }
        else:
            full_info = await self.client(GetFullChatRequest(chat_id))
            # basic chats have no participants_hidden flag at all, so default to visible
            participants_hidden = getattr(full_info.full_chat, 'participants_hidden', False)
            self.logger.debug(f"Chat info: participants_hidden={participants_hidden}")
            return {
                'participants_hidden': participants_hidden
            }
async def scrape_users(self, chat_ids):
self.logger.info(f"Starting scraping for {len(chat_ids)} chats")
for chat_id in tqdm(chat_ids, desc="Processing chats"):
try:
self.logger.debug(f"Processing chat {chat_id}")
entity = await self.client.get_entity(chat_id)
is_channel = isinstance(entity, types.Channel)
chat_info = await self._get_chat_info(chat_id, is_channel)
if is_channel:
self.logger.debug("Processing as channel")
await self._process_channel(entity, chat_info)
else:
self.logger.debug("Processing as chat")
await self._process_chat(entity, chat_info)
                # Record the successfully processed chat
self.stats.record_chat_processed(entity.title, 'Channel' if is_channel else 'Chat')
except (ChannelPrivateError, ChatAdminRequiredError, ValueError) as e:
self.logger.error(f"Error processing chat {chat_id}: {str(e)}")
self.stats.record_error()
self.stats.record_chat_skipped()
continue
self.logger.info("Finished scraping chats")
        # Summarize the collected users
cursor = self.temp_db.cursor()
cursor.execute("SELECT COUNT(*) FROM temp_users")
total_users = cursor.fetchone()[0]
self.logger.info(f"Total unique users collected: {total_users}")
async def _process_channel(self, channel, chat_info):
        # Always collect participants when possible
if not chat_info['participants_hidden'] and chat_info['can_view_participants']:
self.logger.info(f"Channel {channel.title} has visible participants, collecting directly")
try:
await self._collect_participants(channel)
except Exception as e:
self.logger.error(f"Error collecting participants: {str(e)}")
self.stats.record_error()
        # Also always harvest users from messages, reactions and comments
self.logger.info(f"Processing messages for channel: {channel.title}")
await self._collect_from_messages(channel)
async def _process_chat(self, chat, chat_info):
        # Always collect participants when possible
if not chat_info['participants_hidden']:
self.logger.info(f"Chat {chat.title} has visible participants, collecting directly")
try:
await self._collect_participants(chat)
except Exception as e:
self.logger.error(f"Error collecting participants: {str(e)}")
self.stats.record_error()
        # Also always harvest users from messages and reactions
self.logger.info(f"Processing messages for chat: {chat.title}")
await self._collect_from_messages(chat)
async def _collect_participants(self, entity):
try:
self.logger.debug(f"Collecting participants for {entity.title}")
participants = await self.client.get_participants(entity)
self.logger.info(f"Found {len(participants)} participants for {entity.title}")
for user in participants:
self._save_user_to_temp(user, entity)
self.stats.record_user_collected()
except Exception as e:
self.logger.error(f"Error collecting participants for {entity.title}: {str(e)}")
self.stats.record_error()
            # No automatic fallback to message scraping here; it runs afterwards anyway
async def _collect_from_messages(self, entity):
try:
self.logger.debug(f"Collecting messages for {entity.title}")
message_count = 0
            async for message in self.client.iter_messages(entity, limit=10000):
if message:
message_count += 1
self.stats.record_message_processed()
await self._process_message(message, entity)
                    # Log every 100th message in debug mode
if self.debug and message_count % 100 == 0:
self.logger.debug(f"Processed {message_count} messages for {entity.title}")
self.logger.info(f"Processed {message_count} messages for {entity.title}")
except Exception as e:
self.logger.error(f"Error collecting messages for {entity.title}: {str(e)}")
self.stats.record_error()
async def _process_message(self, message, entity):
try:
if message.sender and isinstance(message.sender, types.User):
self._save_user_to_temp(message.sender, entity)
self.stats.record_user_collected()
if message.reactions:
await self._collect_reactions(message, entity)
if isinstance(entity, types.Channel):
await self._collect_comments(message, entity)
except Exception as e:
self.logger.error(f"Error processing message {message.id}: {str(e)}")
self.stats.record_error()
async def _collect_reactions(self, message, entity):
try:
self.logger.debug(f"Collecting reactions for message {message.id}")
            reaction_count = 0
            offset = None  # None = start from the first page
            limit = 100  # maximum number of items per request
            total_reactions = 0
            while True:
                # Fetch the reaction list with pagination
                result = await self.client(functions.messages.GetMessageReactionsListRequest(
                    peer=entity,
                    id=message.id,
                    limit=limit,
                    reaction=None,  # all reaction types
                    offset=offset
                ))
                # Process the reactions we received
                for reaction in result.reactions:
                    if isinstance(reaction.peer_id, types.PeerUser):
                        user_id = reaction.peer_id.user_id
                        try:
                            # Resolve the full user object
                            user = await self.client.get_entity(types.PeerUser(user_id))
                            if isinstance(user, types.User):
                                reaction_count += 1
                                self._save_user_to_temp(user, entity)
                                self.stats.record_user_collected()
                                self.stats.record_reaction_collected()
                        except Exception as e:
                            self.logger.error(f"Error getting user {user_id}: {str(e)}")
                            self.stats.record_error()
                total_reactions += len(result.reactions)
                # Stop if there are no more pages
                if not result.next_offset:
                    break
                # Set the offset for the next page
                offset = result.next_offset
                # Short pause between requests to be gentle on rate limits
                await asyncio.sleep(1)
if reaction_count > 0:
self.logger.info(
f"Found {reaction_count} reactions for message {message.id} (total items: {total_reactions})")
except Exception as e:
self.logger.error(f"Error collecting reactions for message {message.id}: {str(e)}")
self.stats.record_error()
async def _collect_comments(self, message, entity):
try:
self.logger.debug(f"Collecting comments for message {message.id}")
comment_count = 0
async for comment in self.client.iter_messages(entity, reply_to=message.id):
if comment.sender and isinstance(comment.sender, types.User):
comment_count += 1
self._save_user_to_temp(comment.sender, entity)
self.stats.record_user_collected()
self.stats.record_comment_collected()
if comment_count > 0:
self.logger.debug(f"Found {comment_count} comments for message {message.id}")
except Exception as e:
self.logger.error(f"Error collecting comments for message {message.id}: {str(e)}")
self.stats.record_error()
def _save_user_to_temp(self, user, entity):
try:
cursor = self.temp_db.cursor()
name = f"{user.first_name or ''} {user.last_name or ''}".strip()
            # Check whether the user already exists
cursor.execute("SELECT 1 FROM temp_users WHERE user_id = ? AND group_id = ?",
(user.id, entity.id))
exists = cursor.fetchone()
if not exists:
cursor.execute('''
INSERT INTO temp_users
(user_id, group_id, username, first_name, last_name, access_hash, name, group_title)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', (
user.id,
entity.id,
user.username,
user.first_name,
user.last_name,
user.access_hash,
name,
entity.title
))
self.temp_db.commit()
if self.debug:
self.logger.debug(f"Saved new user: {user.id} in group {entity.title}")
else:
if self.debug:
self.logger.debug(f"User already exists: {user.id} in group {entity.title}")
except sqlite3.Error as e:
self.logger.error(f"Database error saving user {user.id}: {str(e)}")
self.stats.record_error()
async def filter_users_by_stories(self, with_stories_db, without_stories_db):
self.logger.info("Filtering users by stories presence")
cursor = self.temp_db.cursor()
cursor.execute("SELECT * FROM temp_users")
users = cursor.fetchall()
self.logger.info(f"Total users collected: {len(users)}")
        # Create the tables even if no users were collected
self._ensure_table_exists(with_stories_db)
self._ensure_table_exists(without_stories_db)
        # Gather the users for a batched stories check
user_peers = []
valid_users = []
for user in users:
try:
                if user[5]:  # only users with an access_hash can be checked
peer = types.InputPeerUser(user_id=user[0], access_hash=user[5])
user_peers.append(peer)
valid_users.append(user)
except Exception as e:
self.logger.error(f"Error creating peer for user {user[0]}: {str(e)}")
self.stats.record_error()
continue
self.logger.info(f"Checking stories for {len(valid_users)} users with access_hash")
        # Check stories in batches
batch_size = 100
has_stories_map = {}
for i in range(0, len(user_peers), batch_size):
batch = user_peers[i:i + batch_size]
self.logger.debug(f"Checking batch {i // batch_size + 1}/{(len(user_peers) - 1) // batch_size + 1}")
self.stats.record_batch_processed()
try:
max_ids = await self.client(GetPeerMaxIDsRequest(id=batch))
for j, max_id in enumerate(max_ids):
user_id = batch[j].user_id
has_stories = max_id > 0
has_stories_map[user_id] = has_stories
if self.debug:
self.logger.debug(f"User {user_id} has stories: {has_stories} (max_id: {max_id})")
except Exception as e:
self.logger.error(f"Error checking stories batch: {str(e)}")
self.stats.record_error()
                # On an error, mark the whole batch as having no stories
for peer in batch:
has_stories_map[peer.user_id] = False
        # Save the users into the matching databases
with_stories_count = 0
without_stories_count = 0
for user in tqdm(valid_users, desc="Saving users"):
user_id = user[0]
has_stories = has_stories_map.get(user_id, False)
self._save_to_permanent_db(user, has_stories, with_stories_db, without_stories_db)
if has_stories:
with_stories_count += 1
self.stats.record_user_with_stories()
else:
without_stories_count += 1
self.stats.record_user_without_stories()
self.logger.info(f"Users with stories: {with_stories_count}")
self.logger.info(f"Users without stories: {without_stories_count}")
    def _ensure_table_exists(self, db_path):
        try:
            conn = sqlite3.connect(db_path)
            cursor = conn.cursor()
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS users (
                    user_id     INTEGER,
                    group_id    INTEGER,
                    username    TEXT,
                    first_name  TEXT,
                    last_name   TEXT,
                    access_hash INTEGER,
                    name        TEXT,
                    group_title TEXT,
                    PRIMARY KEY (user_id, group_id)
                )
            ''')
conn.commit()
conn.close()
self.logger.debug(f"Ensured table exists in {db_path}")
except Exception as e:
self.logger.error(f"Error ensuring table exists: {str(e)}")
self.stats.record_error()
def _save_to_permanent_db(self, user_data, has_stories, with_stories_db, without_stories_db):
try:
conn = sqlite3.connect(with_stories_db if has_stories else without_stories_db)
cursor = conn.cursor()
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS users (
                    user_id     INTEGER,
                    group_id    INTEGER,
                    username    TEXT,
                    first_name  TEXT,
                    last_name   TEXT,
                    access_hash INTEGER,
                    name        TEXT,
                    group_title TEXT,
                    PRIMARY KEY (user_id, group_id)
                )
            ''')
            cursor.execute('''
                INSERT OR IGNORE INTO users
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            ''', user_data)
conn.commit()
conn.close()
if self.debug:
status = "with stories" if has_stories else "without stories"
self.logger.debug(f"Saved user {user_data[0]} to {status} database")
except sqlite3.Error as e:
self.logger.error(f"Database error saving user {user_data[0]}: {str(e)}")
self.stats.record_error()
async def check_stories_appeared(self, without_stories_db, with_stories_db):
self.logger.info("Checking for new stories")
self._ensure_table_exists(without_stories_db)
self._ensure_table_exists(with_stories_db)
conn = sqlite3.connect(without_stories_db)
cursor = conn.cursor()
cursor.execute("SELECT * FROM users")
users = cursor.fetchall()
conn.close()
self.logger.info(f"Found {len(users)} users without stories")
        # Gather the users for a batched stories check
user_peers = []
valid_users = []
for user in users:
try:
                if user[5]:  # only users with an access_hash can be checked
peer = types.InputPeerUser(user_id=user[0], access_hash=user[5])
user_peers.append(peer)
valid_users.append(user)
except Exception as e:
self.logger.error(f"Error creating peer for user {user[0]}: {str(e)}")
self.stats.record_error()
continue
self.logger.info(f"Checking {len(valid_users)} users for new stories")
        # Check stories in batches
batch_size = 100
has_stories_map = {}
for i in range(0, len(user_peers), batch_size):
batch = user_peers[i:i + batch_size]
self.logger.debug(f"Checking batch {i // batch_size + 1}/{(len(user_peers) - 1) // batch_size + 1}")
self.stats.record_batch_processed()
try:
max_ids = await self.client(GetPeerMaxIDsRequest(id=batch))
for j, max_id in enumerate(max_ids):
user_id = batch[j].user_id
has_stories = max_id > 0
has_stories_map[user_id] = has_stories
if self.debug and has_stories:
self.logger.debug(f"User {user_id} now has stories (max_id: {max_id})")
except Exception as e:
self.logger.error(f"Error checking stories batch: {str(e)}")
self.stats.record_error()
                # On an error, mark the whole batch as having no stories
for peer in batch:
has_stories_map[peer.user_id] = False
        # Move users that now have stories
moved_count = 0
for user in tqdm(valid_users, desc="Moving users with new stories"):
user_id = user[0]
if has_stories_map.get(user_id, False):
self._move_user(user, without_stories_db, with_stories_db)
moved_count += 1
self.stats.record_user_moved_to_with_stories()
if self.debug:
self.logger.debug(f"Moved user {user_id} to with_stories database")
self.logger.info(f"Moved {moved_count} users to with_stories database")
async def check_stories_disappeared(self, with_stories_db, without_stories_db):
self.logger.info("Checking for disappeared stories")
self._ensure_table_exists(with_stories_db)
self._ensure_table_exists(without_stories_db)
conn = sqlite3.connect(with_stories_db)
cursor = conn.cursor()
cursor.execute("SELECT * FROM users")
users = cursor.fetchall()
conn.close()
self.logger.info(f"Found {len(users)} users with stories")
        # Gather the users for a batched stories check
user_peers = []
valid_users = []
for user in users:
try:
                if user[5]:  # only users with an access_hash can be checked
peer = types.InputPeerUser(user_id=user[0], access_hash=user[5])
user_peers.append(peer)
valid_users.append(user)
except Exception as e:
self.logger.error(f"Error creating peer for user {user[0]}: {str(e)}")
self.stats.record_error()
continue
self.logger.info(f"Checking {len(valid_users)} users for disappeared stories")
        # Check stories in batches
batch_size = 100
has_stories_map = {}
for i in range(0, len(user_peers), batch_size):
batch = user_peers[i:i + batch_size]
self.logger.debug(f"Checking batch {i // batch_size + 1}/{(len(user_peers) - 1) // batch_size + 1}")
self.stats.record_batch_processed()
try:
max_ids = await self.client(GetPeerMaxIDsRequest(id=batch))
for j, max_id in enumerate(max_ids):
user_id = batch[j].user_id
has_stories = max_id > 0
has_stories_map[user_id] = has_stories
if self.debug and not has_stories:
self.logger.debug(f"User {user_id} no longer has stories (max_id: {max_id})")
except Exception as e:
self.logger.error(f"Error checking stories batch: {str(e)}")
self.stats.record_error()
                # On an error, mark the whole batch as having no stories
for peer in batch:
has_stories_map[peer.user_id] = False
        # Move users whose stories are gone
moved_count = 0
for user in tqdm(valid_users, desc="Moving users without stories"):
user_id = user[0]
if not has_stories_map.get(user_id, True):
self._move_user(user, with_stories_db, without_stories_db)
moved_count += 1
self.stats.record_user_moved_to_without_stories()
if self.debug:
self.logger.debug(f"Moved user {user_id} to without_stories database")
self.logger.info(f"Moved {moved_count} users to without_stories database")
    def _move_user(self, user_data, from_db, to_db):
        try:
            # Delete from the source database
            conn_from = sqlite3.connect(from_db)
            cursor_from = conn_from.cursor()
            cursor_from.execute("DELETE FROM users WHERE user_id=? AND group_id=?",
                                (user_data[0], user_data[1]))
            conn_from.commit()
            conn_from.close()
            # Insert into the target database
            conn_to = sqlite3.connect(to_db)
            cursor_to = conn_to.cursor()
            cursor_to.execute('''
                CREATE TABLE IF NOT EXISTS users (
                    user_id     INTEGER,
                    group_id    INTEGER,
                    username    TEXT,
                    first_name  TEXT,
                    last_name   TEXT,
                    access_hash INTEGER,
                    name        TEXT,
                    group_title TEXT,
                    PRIMARY KEY (user_id, group_id)
                )
            ''')
            cursor_to.execute('''
                INSERT OR IGNORE INTO users
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            ''', user_data)
conn_to.commit()
conn_to.close()
if self.debug:
self.logger.debug(f"Moved user {user_data[0]} from {from_db} to {to_db}")
except Exception as e:
self.logger.error(f"Error moving user {user_data[0]}: {str(e)}")
self.stats.record_error()
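    # Note: the move is not transactional -- if the insert into to_db fails after the
    # delete from from_db has committed, the row is lost; inserting first and deleting
    # only on success would be the safer order.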
def export_to_csv(self, db_path, csv_path):
try:
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
            # Make sure the table exists even on a fresh database
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS users (
                    user_id     INTEGER,
                    group_id    INTEGER,
                    username    TEXT,
                    first_name  TEXT,
                    last_name   TEXT,
                    access_hash INTEGER,
                    name        TEXT,
                    group_title TEXT,
                    PRIMARY KEY (user_id, group_id)
                )
            ''')
cursor.execute("SELECT * FROM users")
users = cursor.fetchall()
            # Naive CSV writer: fields are not quoted/escaped, so names containing
            # commas or newlines will break the columns (csv.writer would be more robust)
            with open(csv_path, 'w', encoding='utf-8') as f:
                f.write("user_id,group_id,username,first_name,last_name,access_hash,name,group_title\n")
                for user in users:
                    f.write(','.join(str(field) if field is not None else '' for field in user) + '\n')
self.logger.info(f"Exported {len(users)} users to {csv_path}")
conn.close()
return len(users)
except Exception as e:
self.logger.error(f"Error exporting to CSV: {str(e)}")
self.stats.record_error()
return 0
def parse_chat_selection(input_str, max_index):
"""Разбирает строку выбора чатов с поддержкой диапазонов и ключевых слов"""
# Удаляем пробелы и переводим в нижний регистр
input_str = input_str.replace(" ", "").lower()
# Обработка специальных ключевых слов
if input_str == "all":
return list(range(0, max_index + 1))
selected_indices = set()
parts = input_str.split(',')
for part in parts:
# Обработка диапазонов (например, 1-5)
if '-' in part:
try:
start, end = map(int, part.split('-'))
# Корректируем диапазон в соответствии с доступными индексами
start = max(0, min(start, max_index))
end = max(0, min(end, max_index))
if start > end:
start, end = end, start
selected_indices.update(range(start, end + 1))
except ValueError:
print(f"Неверный диапазон: {part}. Пропускаю.")
# Обработка отдельных чисел
else:
try:
index = int(part)
if 0 <= index <= max_index:
selected_indices.add(index)
else:
print(f"Индекс {index} вне диапазона (0-{max_index}). Пропускаю.")
except ValueError:
print(f"Неверный формат: {part}. Пропускаю.")
return sorted(selected_indices)
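# A few examples of what the selector accepts (with max_index=9):
#   parse_chat_selection("0,2,5", 9)   -> [0, 2, 5]
#   parse_chat_selection("1-3,7-9", 9) -> [1, 2, 3, 7, 8, 9]
#   parse_chat_selection("all", 9)     -> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]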
async def main():
parser = argparse.ArgumentParser(description='Telegram User Scraper')
parser.add_argument('--api-id', type=int, default=os.getenv('TELEGRAM_API_ID'))
parser.add_argument('--api-hash', type=str, default=os.getenv('TELEGRAM_API_HASH'))
parser.add_argument('--check-stories', action='store_true', help='Check for new stories')
parser.add_argument('--check-no-stories', action='store_true', help='Check for disappeared stories')
parser.add_argument('--export-csv', type=str, help='Export database to CSV')
parser.add_argument('--debug', action='store_true', help='Enable debug mode with detailed logging')
args = parser.parse_args()
if not args.api_id or not args.api_hash:
print("Please provide API credentials")
return
scraper = TelegramUserScraper(args.api_id, args.api_hash, debug=args.debug)
await scraper.connect()
with_stories_db = 'with_stories.db'
without_stories_db = 'without_stories.db'
export_count = 0
try:
if args.check_stories:
await scraper.check_stories_appeared(without_stories_db, with_stories_db)
elif args.check_no_stories:
await scraper.check_stories_disappeared(with_stories_db, without_stories_db)
elif args.export_csv:
export_count = scraper.export_to_csv(with_stories_db, args.export_csv)
else:
chats = await scraper.get_chats()
max_index = len(chats) - 1
if max_index < 0:
print("No chats found. Exiting.")
return
print("Available chats/channels:")
for idx, chat in enumerate(chats):
print(f"{idx}. {chat['title']} ({chat['type']})")
            # How to pick chats
            print("\nSelect chats/channels to process:")
            print(" - Enter numbers separated by commas (e.g. 0,2,5)")
            print(" - Use ranges (e.g. 1-3,7-9)")
            print(" - Enter 'all' to select every chat")
            print(f"Available numbers: 0-{max_index}")
            selection = input("Enter chat numbers: ")
            # Parse the user's selection
            selected_indices = parse_chat_selection(selection, max_index)
            if not selected_indices:
                print("No chats selected. Exiting.")
                return
            print(f"Selected: {len(selected_indices)} chats")
selected_chats = [chats[idx]['id'] for idx in selected_indices]
await scraper.scrape_users(selected_chats)
await scraper.filter_users_by_stories(with_stories_db, without_stories_db)
export = input("Export users with stories to CSV? (y/n): ")
if export.lower() == 'y':
csv_name = input("Enter CSV filename: ")
export_count = scraper.export_to_csv(with_stories_db, csv_name)
finally:
        # Always print the statistics at the end
print(scraper.stats.generate_report())
        # If we exported to CSV, add that to the report
if export_count:
print(f"Successfully exported {export_count} users to CSV")
if __name__ == '__main__':
    asyncio.run(main())
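A couple of practical notes: on the first run client.start() will ask for your phone number and login code in the console, after which Telethon caches the session in session.session next to the script, so later runs log in silently. Treat that file like a password. And if Telegram starts throwing FloodWaitError on large chats, that's the rate limiter at work, not a bug in the script.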