Refactor code structure for improved readability and maintainability

This commit is contained in:
MaddoScientisto 2026-02-09 23:46:11 +01:00
commit e078cada3b
11 changed files with 1640 additions and 1184 deletions

6
modules/__init__.py Normal file
View file

@ -0,0 +1,6 @@
"""
Twitch Archive Modules
Refactored components for the Twitch Archive system.
"""
__version__ = '2.0.0'

158
modules/config.py Normal file
View file

@ -0,0 +1,158 @@
"""
Configuration management for Twitch Archive.
Handles loading global and per-streamer configuration files.
"""
import json
import pathlib
from typing import Dict, Any
from colorama import Fore, Style
from .constants import DEFAULT_CONFIG
class ConfigManager:
"""
Manages global and per-streamer configurations.
Loads global defaults from config/global.json and merges with per-streamer
configs from config/streamers/*.json.
"""
def __init__(self):
"""Initialize the configuration manager."""
self.config_dir = pathlib.Path(__file__).parent.parent / "config"
self.streamers_dir = self.config_dir / "streamers"
self.global_config = self._load_global_config()
def _load_global_config(self) -> Dict[str, Any]:
"""
Load global configuration from config/global.json.
Returns:
dict: Global configuration with defaults
"""
global_file = self.config_dir / "global.json"
# Start with DEFAULT_CONFIG as ultimate fallback
config = DEFAULT_CONFIG.copy()
# Try to load global config
if global_file.exists():
try:
with open(global_file, 'r', encoding='utf-8') as f:
user_config = json.load(f)
# Filter out comment fields and schema references
user_config = {k: v for k, v in user_config.items()
if not k.startswith('_') and k != '$schema'}
config.update(user_config)
print(f'{Fore.GREEN}✓ Global configuration loaded from config/global.json{Style.RESET_ALL}')
except json.JSONDecodeError as e:
print(f'{Fore.YELLOW}⚠ Warning: Invalid JSON in config/global.json: {e}{Style.RESET_ALL}')
print(f'{Fore.YELLOW} Using default configuration{Style.RESET_ALL}')
except Exception as e:
print(f'{Fore.YELLOW}⚠ Warning: Could not load config/global.json: {e}{Style.RESET_ALL}')
else:
print(f'{Fore.YELLOW}⚠ Warning: config/global.json not found{Style.RESET_ALL}')
print(f'{Fore.CYAN} → Create config/global.json with default settings{Style.RESET_ALL}')
return config
def load_streamer_config(self, username: str) -> Dict[str, Any]:
"""
Load configuration for a specific streamer.
Merges global config with streamer-specific overrides.
Args:
username: Twitch username
Returns:
dict: Complete configuration for the streamer
"""
# Start with global config
config = self.global_config.copy()
# Load streamer-specific config
streamer_file = self.streamers_dir / f"{username}.json"
if streamer_file.exists():
try:
with open(streamer_file, 'r', encoding='utf-8') as f:
streamer_config = json.load(f)
# Filter out comments and schema references
streamer_config = {k: v for k, v in streamer_config.items()
if not k.startswith('_') and k != '$schema'}
# Merge streamer config (overrides global)
config.update(streamer_config)
print(f'{Fore.GREEN}✓ Loaded config for {username}{Style.RESET_ALL}')
except json.JSONDecodeError as e:
print(f'{Fore.YELLOW}⚠ Warning: Invalid JSON in {streamer_file}: {e}{Style.RESET_ALL}')
except Exception as e:
print(f'{Fore.YELLOW}⚠ Warning: Could not load {streamer_file}: {e}{Style.RESET_ALL}')
else:
# Create default config for new streamer
print(f'{Fore.CYAN}→ Creating default config for new streamer: {username}{Style.RESET_ALL}')
self.create_default_streamer_config(username)
config['username'] = username
config['enabled'] = True
# Ensure username is set
config['username'] = username
return config
def create_default_streamer_config(self, username: str) -> None:
"""
Create a default configuration file for a new streamer.
Args:
username: Twitch username
"""
# Ensure streamers directory exists
self.streamers_dir.mkdir(parents=True, exist_ok=True)
streamer_file = self.streamers_dir / f"{username}.json"
default_config = {
"$schema": "../streamer.schema.json",
"username": username,
"enabled": True
}
try:
with open(streamer_file, 'w', encoding='utf-8') as f:
json.dump(default_config, f, indent=2)
print(f'{Fore.GREEN}✓ Created config file: config/streamers/{username}.json{Style.RESET_ALL}')
print(f'{Fore.CYAN} → Edit the file to add custom settings or overrides{Style.RESET_ALL}')
except Exception as e:
print(f'{Fore.RED}✗ Could not create config file for {username}: {e}{Style.RESET_ALL}')
def get_all_enabled_streamers(self) -> list:
"""
Get list of all enabled streamers.
Returns:
list: List of usernames configured and enabled
"""
if not self.streamers_dir.exists():
return []
enabled_streamers = []
for config_file in self.streamers_dir.glob("*.json"):
try:
with open(config_file, 'r', encoding='utf-8') as f:
config = json.load(f)
# Filter comments and schema references
config = {k: v for k, v in config.items()
if not k.startswith('_') and k != '$schema'}
if config.get('enabled', False):
username = config.get('username') or config_file.stem
enabled_streamers.append(username)
except Exception as e:
print(f'{Fore.YELLOW}⚠ Warning: Could not read {config_file}: {e}{Style.RESET_ALL}')
return enabled_streamers

46
modules/constants.py Normal file
View file

@ -0,0 +1,46 @@
"""
Constants and default configuration values for Twitch Archive.
"""
# API Endpoints
TWITCH_OAUTH_URL = "https://id.twitch.tv/oauth2/token"
TWITCH_API_URL = "https://api.twitch.tv/helix"
TWITCH_GQL_URL = "https://gql.twitch.tv/gql"
TWITCH_GQL_CLIENT_ID = "kimne78kx3ncx6brgo4mv6wki5h1ko"
# File prefixes for different content types
PREFIX_LIVE = "LIVE_"
PREFIX_VOD = "VOD_"
PREFIX_CHAT = "CHAT_"
PREFIX_METADATA = "METADA_" # Note: keeping original typo for compatibility
# Default configuration values
DEFAULT_CONFIG = {
'username': 'your_twitch_username',
'quality': 'best',
'root_path': 'archive',
'rclone_path': 'remote:path/to/streams',
'refresh': 60.0,
'streamlink_ttvlol': False,
'notifications': False,
'downloadMETADATA': True,
'downloadVOD': True,
'downloadCHAT': True,
'downloadLiveCHAT': True,
'vodTimeout': 300,
'uploadCloud': True,
'deleteFiles': False,
'onlyRaw': False,
'cleanRaw': True,
'hls_segments': 3,
'hls_segmentsVOD': 10,
# FFmpeg 8.0+ Enhancement Options
'ffmpeg_hwaccel': 'auto', # Hardware acceleration: 'auto', 'nvenc', 'qsv', 'amf', 'vaapi', 'none'
'ffmpeg_threads': 0, # Thread count (0 = auto-detect)
'ffmpeg_audio_codec': 'aac', # Audio codec for audio-only streams
'ffmpeg_audio_samplerate': 48000, # Audio sample rate (48000 recommended for broadcasts)
'ffmpeg_audio_bitrate': '192k', # Audio bitrate
'ffmpeg_error_recovery': True, # Enable error recovery for corrupted streams
'ffmpeg_faststart': True, # Enable faststart for MP4 (better streaming compatibility)
'ffmpeg_progress': False # Show encoding progress
}

281
modules/downloader.py Normal file
View file

@ -0,0 +1,281 @@
"""
VOD and chat downloading functionality using TwitchDownloaderCLI.
"""
import os
import subprocess
from typing import Dict, Any, Optional
from colorama import Fore, Style
from .utils import get_bin_path
class ContentDownloader:
"""Handles VOD and chat downloading using TwitchDownloaderCLI."""
def __init__(self, twitch_downloader_path: str, ffmpeg_path: str, config: dict):
"""
Initialize the content downloader.
Args:
twitch_downloader_path: Path to TwitchDownloaderCLI executable
ffmpeg_path: Path to FFmpeg executable
config: Configuration dictionary
"""
self.twitch_downloader_path = twitch_downloader_path
self.ffmpeg_path = ffmpeg_path
self.quality = config.get('quality', 'best')
self.hls_segments_vod = config.get('hls_segmentsVOD', 10)
self.download_vod = config.get('downloadVOD', True)
self.download_chat = config.get('downloadCHAT', True)
self.download_live_chat = config.get('downloadLiveCHAT', True)
def download_vod(self, vod_info: Dict[str, Any], output_path: str) -> bool:
"""
Download VOD using TwitchDownloaderCLI.
Args:
vod_info: VOD metadata from Twitch API
output_path: Path where the VOD will be saved
Returns:
bool: True if download succeeded, False otherwise
"""
if not self.download_vod:
return False
print(f'\n{Fore.CYAN}Downloading VOD: {vod_info["title"]}{Style.RESET_ALL}')
# Extract numeric VOD ID
vod_id = vod_info["id"]
if isinstance(vod_id, str) and vod_id.startswith('v'):
vod_id = vod_id[1:]
vod_url = f"https://www.twitch.tv/videos/{vod_id}"
print(f'{Fore.YELLOW}VOD URL: {vod_url}{Style.RESET_ALL}')
bin_path = get_bin_path()
cmd = [
self.twitch_downloader_path,
'videodownload',
'-u', vod_url,
'-q', self.quality,
'-t', str(self.hls_segments_vod),
'--ffmpeg-path', self.ffmpeg_path,
'--temp-path', os.path.join(bin_path, 'temp'),
'--collision', 'Rename',
'-o', output_path
]
try:
result = subprocess.call(cmd)
if result == 0:
print(f'{Fore.GREEN}✓ VOD downloaded{Style.RESET_ALL}')
return True
else:
print(f'{Fore.RED}✗ VOD download failed with exit code: {result}{Style.RESET_ALL}')
return False
except Exception as e:
print(f'{Fore.RED}✗ VOD download failed: {str(e)}{Style.RESET_ALL}')
return False
def download_chat_json(self, vod_id: str, json_path: str) -> bool:
"""
Download chat JSON for a VOD.
Args:
vod_id: VOD ID
json_path: Path to save chat JSON
Returns:
bool: True if succeeded, False otherwise
"""
# Remove 'v' prefix if present
if isinstance(vod_id, str) and vod_id.startswith('v'):
vod_id = vod_id[1:]
print(f'{Fore.YELLOW}Downloading chat JSON for VOD {vod_id}...{Style.RESET_ALL}')
try:
result = subprocess.call([
self.twitch_downloader_path, 'chatdownload',
'--id', vod_id,
'--embed-images',
'--collision', 'Rename',
'-o', json_path
])
if result != 0:
print(f'{Fore.RED}✗ Chat JSON download failed with exit code: {result}{Style.RESET_ALL}')
return False
if not os.path.exists(json_path):
print(f'{Fore.RED}✗ Chat JSON file was not created{Style.RESET_ALL}')
return False
print(f'{Fore.GREEN}✓ Chat JSON downloaded{Style.RESET_ALL}')
return True
except Exception as e:
print(f'{Fore.RED}✗ Chat download failed: {str(e)}{Style.RESET_ALL}')
return False
def render_chat(self, json_path: str, video_path: str, output_args: str) -> bool:
"""
Render chat JSON as a video.
Args:
json_path: Path to chat JSON file
video_path: Path to save rendered chat video
output_args: FFmpeg output arguments for encoding
Returns:
bool: True if succeeded, False otherwise
"""
if not os.path.exists(json_path):
print(f'{Fore.RED}✗ Chat JSON file not found: {json_path}{Style.RESET_ALL}')
return False
bin_path = get_bin_path()
# Chat rendering settings
chat_settings = [
'--background-color', '#FF111111',
'-w', '500',
'-h', '1080',
'--outline',
'-f', 'Arial',
'--font-size', '22',
'--update-rate', '1.0',
'--offline',
'--output-args', output_args,
'--ffmpeg-path', self.ffmpeg_path,
'--temp-path', os.path.join(bin_path, 'temp'),
'--collision', 'Rename'
]
try:
print(f'{Fore.YELLOW}Rendering chat video...{Style.RESET_ALL}')
# Debug output
full_cmd = [self.twitch_downloader_path, 'chatrender', '-i', json_path, '-o', video_path] + chat_settings
print(f'{Fore.CYAN}DEBUG - Chat render command:{Style.RESET_ALL}')
print(f'{Fore.CYAN} Output args passed: {repr(output_args)}{Style.RESET_ALL}')
result = subprocess.call(full_cmd)
if result != 0:
print(f'{Fore.RED}✗ Chat render failed with exit code: {result}{Style.RESET_ALL}')
return False
print(f'{Fore.GREEN}✓ Chat rendered{Style.RESET_ALL}')
return True
except Exception as e:
print(f'{Fore.RED}✗ Chat rendering failed: {str(e)}{Style.RESET_ALL}')
return False
def download_and_render_chat(self, vod_info: Dict[str, Any], json_path: str,
video_path: str, output_args: str) -> bool:
"""
Download chat logs and render them as video.
Args:
vod_info: VOD metadata from Twitch API
json_path: Path to save chat JSON
video_path: Path to save rendered chat video
output_args: FFmpeg output arguments for encoding
Returns:
bool: True if succeeded, False otherwise
"""
if not self.download_chat:
return False
print(f'\n{Fore.CYAN}Downloading chat: {vod_info["title"]}{Style.RESET_ALL}')
# Extract numeric VOD ID
vod_id = vod_info["id"]
# Download chat JSON
if not self.download_chat_json(vod_id, json_path):
return False
# Render chat video
return self.render_chat(json_path, video_path, output_args)
def start_live_chat_download(self, vod_id: str, json_path: str) -> Optional[subprocess.Popen]:
"""
Start downloading live chat in the background while stream is recording.
Args:
vod_id: The VOD/stream ID to download chat from
json_path: Path to save chat JSON
Returns:
subprocess.Popen: The process handle, or None if failed to start
"""
if not self.download_live_chat:
return None
print(f'\n{Fore.CYAN}Starting live chat download...{Style.RESET_ALL}')
# Remove 'v' prefix if present
if isinstance(vod_id, str) and vod_id.startswith('v'):
vod_id = vod_id[1:]
try:
cmd = [
self.twitch_downloader_path, 'chatdownload',
'--id', vod_id,
'--embed-images',
'--collision', 'Rename',
'-o', json_path
]
print(f'{Fore.YELLOW}Live chat download started in background for VOD {vod_id}{Style.RESET_ALL}')
process = subprocess.Popen(
cmd,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
return process
except Exception as e:
print(f'{Fore.RED}✗ Failed to start live chat download: {str(e)}{Style.RESET_ALL}')
return None
def wait_for_chat_download(self, process: Optional[subprocess.Popen],
json_path: str, timeout: int = 300) -> bool:
"""
Wait for live chat download process to complete.
Args:
process: The chat download process handle
json_path: Path where chat JSON should be saved
timeout: Maximum time to wait in seconds
Returns:
bool: True if chat download succeeded, False otherwise
"""
if process is None:
return False
try:
print(f'{Fore.YELLOW}Waiting for live chat download to complete...{Style.RESET_ALL}')
return_code = process.wait(timeout=timeout)
if return_code == 0 and os.path.exists(json_path):
print(f'{Fore.GREEN}✓ Live chat JSON downloaded{Style.RESET_ALL}')
return True
else:
print(f'{Fore.RED}✗ Live chat download failed (exit code: {return_code}){Style.RESET_ALL}')
return False
except subprocess.TimeoutExpired:
print(f'{Fore.YELLOW}⚠ Live chat download timed out, terminating...{Style.RESET_ALL}')
process.terminate()
return False
except Exception as e:
print(f'{Fore.RED}✗ Error waiting for chat download: {str(e)}{Style.RESET_ALL}')
return False

237
modules/file_manager.py Normal file
View file

@ -0,0 +1,237 @@
"""
Cloud storage and file management for Twitch Archive.
"""
import os
import json
import pathlib
import subprocess
from typing import List
from colorama import Fore, Style
from .constants import PREFIX_LIVE, PREFIX_VOD, PREFIX_CHAT, PREFIX_METADATA
from .utils import get_bin_path
class FileManager:
"""Handles file operations, cloud uploads, and cleanup."""
def __init__(self, root_path: str, username: str, config: dict):
"""
Initialize the file manager.
Args:
root_path: Root directory for archives
username: Twitch username
config: Configuration dictionary
"""
self.root_path = pathlib.Path(root_path)
self.username = username
self.upload_cloud = config.get('uploadCloud', True)
self.delete_files = config.get('deleteFiles', False)
self.clean_raw = config.get('cleanRaw', True)
self.download_vod = config.get('downloadVOD', True)
self.download_chat = config.get('downloadCHAT', True)
self.download_metadata = config.get('downloadMETADATA', True)
self.rclone_path = config.get('rclone_path', 'remote:path')
# Initialize paths
self.raw_path = self.root_path / username / "video" / "raw"
self.video_path = self.root_path / username / "video"
self.chat_json_path = self.root_path / username / "chat" / "json"
self.chat_mp4_path = self.root_path / username / "chat"
self.metadata_path = self.root_path / username / "metadata"
self.log_file = self.root_path / ".log"
def initialize_directories(self) -> None:
"""Create all necessary directory structures."""
for path in [self.raw_path, self.video_path, self.chat_json_path,
self.chat_mp4_path, self.metadata_path]:
path.mkdir(parents=True, exist_ok=True)
# Create log file if it doesn't exist
if not self.log_file.exists():
self.log_file.touch()
def is_stream_processed(self, stream_id: str) -> bool:
"""
Check if a stream has already been processed.
Args:
stream_id: Unique identifier for the stream
Returns:
bool: True if already processed, False otherwise
"""
with open(self.log_file, 'r', encoding='utf-8') as f:
return stream_id in f.read()
def mark_stream_processed(self, stream_id: str) -> None:
"""Add stream to log file to prevent re-processing."""
with open(self.log_file, 'a', encoding='utf-8') as f:
f.write(f"{stream_id}\n")
def save_metadata(self, vod_info: dict, filename_base: str) -> None:
"""
Save VOD metadata to JSON file.
Args:
vod_info: VOD metadata from Twitch API
filename_base: Base filename (without extension)
"""
if not self.download_metadata:
return
metadata_path = self.metadata_path / f"{PREFIX_METADATA}{filename_base}.json"
with open(metadata_path, 'w', encoding='utf-8') as f:
json.dump(vod_info, f, ensure_ascii=False, indent=4)
print(f'{Fore.GREEN}✓ Metadata saved{Style.RESET_ALL}')
def clean_raw_file(self, raw_path: str) -> None:
"""
Delete raw .ts file if configured.
Args:
raw_path: Path to raw file
"""
if self.clean_raw and os.path.exists(raw_path):
print(f'{Fore.YELLOW}Deleting raw .ts file...{Style.RESET_ALL}')
os.remove(raw_path)
def upload_to_cloud(self, filename_base: str, notification_callback=None) -> bool:
"""
Upload archived files to cloud storage using rclone.
Args:
filename_base: Base filename (without prefixes/extensions)
notification_callback: Optional callback to send notifications
Returns:
bool: True if upload succeeded or is disabled, False if failed
"""
if not self.upload_cloud:
return True
print(f'\n{Fore.CYAN}Uploading to cloud storage...{Style.RESET_ALL}')
if notification_callback:
notification_callback(f'☁ Uploading - {filename_base}', 'Uploading files to cloud storage')
# Create list of files to upload
bin_path = get_bin_path()
upload_list_path = os.path.join(bin_path, 'temp', 'upload.txt')
# Ensure temp directory exists
os.makedirs(os.path.dirname(upload_list_path), exist_ok=True)
files_to_upload = [
f"{PREFIX_LIVE}{filename_base}.ts",
f"{PREFIX_LIVE}{filename_base}.mp4",
f"{PREFIX_LIVE}{filename_base}.mp3",
f"{PREFIX_VOD}{filename_base}.ts",
f"{PREFIX_VOD}{filename_base}.mp4",
f"{PREFIX_VOD}{filename_base}.mp3",
f"{PREFIX_METADATA}{filename_base}.json",
f"{PREFIX_CHAT}{filename_base}.json",
f"{PREFIX_CHAT}{filename_base}.mp4"
]
with open(upload_list_path, 'w') as f:
f.write('\n'.join(files_to_upload))
# Run rclone
try:
result = subprocess.call([
'rclone', 'copy',
str(self.root_path.resolve()),
self.rclone_path,
'--include-from', upload_list_path
])
# Clean up upload list
if os.path.exists(upload_list_path):
os.remove(upload_list_path)
if result == 0:
print(f'{Fore.GREEN}✓ Upload complete{Style.RESET_ALL}')
if notification_callback:
notification_callback(f'✓ Upload Success - {filename_base}', 'All files uploaded successfully')
return True
else:
print(f'{Fore.RED}✗ Upload failed (exit code: {result}){Style.RESET_ALL}')
print(f'{Fore.YELLOW}Files preserved locally due to upload failure{Style.RESET_ALL}')
if notification_callback:
notification_callback(f'✗ Upload Failed - {filename_base}',
f'Upload failed with code {result}. Files preserved locally.')
return False
except Exception as e:
print(f'{Fore.RED}✗ Upload error: {str(e)}{Style.RESET_ALL}')
return False
def delete_local_files(self, filename_base: str, live_raw_path: str,
live_proc_path: str, notification_callback=None) -> None:
"""
Delete local archive files after successful upload.
Args:
filename_base: Base filename (without prefixes/extensions)
live_raw_path: Path to live raw file
live_proc_path: Path to live processed file
notification_callback: Optional callback to send notifications
"""
print(f'\n{Fore.RED}{"=" * 60}{Style.RESET_ALL}')
print(f'{Fore.RED}⚠ DELETING LOCAL FILES{Style.RESET_ALL}')
print(f'{Fore.RED}{"=" * 60}{Style.RESET_ALL}\n')
if notification_callback:
notification_callback(f'🗑 Deleting - {filename_base}',
'Deleting local files after successful upload')
files_to_delete: List[str] = []
# Live files
if not self.clean_raw and os.path.exists(live_raw_path):
files_to_delete.append(live_raw_path)
if os.path.exists(live_proc_path):
files_to_delete.append(live_proc_path)
# VOD files
if self.download_vod:
vod_raw = self.raw_path / f"{PREFIX_VOD}{filename_base}.ts"
vod_mp4 = self.video_path / f"{PREFIX_VOD}{filename_base}.mp4"
vod_mp3 = self.video_path / f"{PREFIX_VOD}{filename_base}.mp3"
if not self.clean_raw and vod_raw.exists():
files_to_delete.append(str(vod_raw))
if vod_mp4.exists():
files_to_delete.append(str(vod_mp4))
if vod_mp3.exists():
files_to_delete.append(str(vod_mp3))
# Chat files
if self.download_chat:
chat_json = self.chat_json_path / f"{PREFIX_CHAT}{filename_base}.json"
chat_mp4 = self.chat_mp4_path / f"{PREFIX_CHAT}{filename_base}.mp4"
if chat_json.exists():
files_to_delete.append(str(chat_json))
if chat_mp4.exists():
files_to_delete.append(str(chat_mp4))
# Metadata files
if self.download_metadata:
metadata = self.metadata_path / f"{PREFIX_METADATA}{filename_base}.json"
if metadata.exists():
files_to_delete.append(str(metadata))
# Delete all files
for filepath in files_to_delete:
try:
print(f'{Fore.RED} Deleting: {os.path.basename(filepath)}{Style.RESET_ALL}')
os.remove(filepath)
except Exception as e:
print(f'{Fore.YELLOW} ⚠ Failed to delete {filepath}: {e}{Style.RESET_ALL}')
print(f'{Fore.RED}\n✓ Cleanup complete{Style.RESET_ALL}')

68
modules/notifications.py Normal file
View file

@ -0,0 +1,68 @@
"""
Email notification functionality for Twitch Archive.
"""
import os
import socket
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from colorama import Fore, Style
class NotificationManager:
"""Handles email notifications via Gmail SMTP."""
def __init__(self, enabled: bool = False, username: str = ""):
"""
Initialize the notification manager.
Args:
enabled: Whether notifications are enabled
username: Streamer username for notification subject
"""
self.enabled = enabled
self.username = username
def send(self, subject: str, content: str) -> None:
"""
Send email notification via Gmail SMTP.
Only sends if notifications are enabled in configuration.
Requires SENDER, RECEIVER, and PASSWD in .env file.
Args:
subject: Email subject line
content: Email body content
"""
if not self.enabled:
return
try:
sender = os.getenv("SENDER")
receiver = os.getenv("RECEIVER")
password = os.getenv("PASSWD")
if not all([sender, receiver, password]):
print(f'{Fore.YELLOW}⚠ Notification skipped: Missing email credentials in .env{Style.RESET_ALL}')
return
# Construct email
msg = MIMEMultipart()
msg['From'] = sender
msg['To'] = receiver
msg['Subject'] = f"{self.username} - {subject}"
body = f"Stream: {self.username}\n\n{content}"
msg.attach(MIMEText(body, 'plain'))
# Send via Gmail SMTP
with smtplib.SMTP('smtp.gmail.com', 587) as server:
server.starttls()
server.login(sender, password)
server.sendmail(sender, receiver, msg.as_string())
except socket.error as e:
print(f'{Fore.YELLOW}⚠ Notification failed: {str(e)}{Style.RESET_ALL}')
except Exception as e:
print(f'{Fore.YELLOW}⚠ Notification error: {str(e)}{Style.RESET_ALL}')

171
modules/processor.py Normal file
View file

@ -0,0 +1,171 @@
"""
Video/audio processing functionality using FFmpeg.
"""
import os
import subprocess
from colorama import Fore, Style
from .utils import detect_hardware_acceleration, get_hwaccel_encoder
class StreamProcessor:
"""Handles video/audio processing using FFmpeg."""
def __init__(self, os_type: str, ffmpeg_path: str, config: dict):
"""
Initialize the stream processor.
Args:
os_type: Operating system type ('windows' or 'linux')
ffmpeg_path: Path to FFmpeg executable
config: Configuration dictionary with FFmpeg settings
"""
self.os_type = os_type
self.ffmpeg_path = ffmpeg_path
self.quality = config.get('quality', 'best')
self.only_raw = config.get('onlyRaw', False)
self.ffmpeg_threads = config.get('ffmpeg_threads', 0)
self.ffmpeg_audio_codec = config.get('ffmpeg_audio_codec', 'aac')
self.ffmpeg_audio_samplerate = config.get('ffmpeg_audio_samplerate', 48000)
self.ffmpeg_audio_bitrate = config.get('ffmpeg_audio_bitrate', '192k')
self.ffmpeg_error_recovery = config.get('ffmpeg_error_recovery', True)
self.ffmpeg_faststart = config.get('ffmpeg_faststart', True)
self.ffmpeg_progress = config.get('ffmpeg_progress', False)
self.hwaccel_type = detect_hardware_acceleration(
config.get('ffmpeg_hwaccel', 'auto'),
os_type
)
def process_raw_stream(self, raw_path: str, output_path: str) -> None:
"""
Process raw .ts file into mp4/mp3 using ffmpeg.
Args:
raw_path: Path to the raw .ts file
output_path: Path for the processed output file
"""
if not os.path.exists(raw_path):
print(f'{Fore.YELLOW}⚠ Raw file not found, skipping processing{Style.RESET_ALL}')
return
if self.only_raw:
print(f'{Fore.CYAN}Keeping raw .ts file (onlyRaw mode){Style.RESET_ALL}')
return
print(f'{Fore.YELLOW}Processing raw stream file...{Style.RESET_ALL}')
# Build ffmpeg command based on quality
if self.quality == 'audio_only':
self._process_audio(raw_path, output_path)
else:
self._process_video(raw_path, output_path)
print(f'{Fore.GREEN}✓ Stream processed successfully{Style.RESET_ALL}')
def _process_audio(self, raw_path: str, output_path: str) -> None:
"""Process audio-only stream."""
# Audio-only conversion with modern AAC encoding
cmd = [
self.ffmpeg_path,
'-i', raw_path,
'-vn', # No video
'-c:a', self.ffmpeg_audio_codec,
'-ar', str(self.ffmpeg_audio_samplerate),
'-ac', '2', # Stereo
'-b:a', self.ffmpeg_audio_bitrate,
]
# Add threading for faster encoding
if self.ffmpeg_threads > 0:
cmd.extend(['-threads', str(self.ffmpeg_threads)])
# Add faststart for better streaming compatibility
if self.ffmpeg_faststart and output_path.endswith(('.mp4', '.m4a')):
cmd.extend(['-movflags', '+faststart'])
cmd.append(output_path)
# Run FFmpeg
if self.ffmpeg_progress:
subprocess.call(cmd)
else:
subprocess.call(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
def _process_video(self, raw_path: str, output_path: str) -> None:
"""Process video stream."""
cmd = [
self.ffmpeg_path,
'-y', # Overwrite output file
]
# Add hardware acceleration if enabled
if self.hwaccel_type and self.hwaccel_type != 'none':
print(f'{Fore.CYAN}Using hardware acceleration: {self.hwaccel_type}{Style.RESET_ALL}')
cmd.extend(['-hwaccel', 'auto'])
cmd.extend([
'-i', raw_path,
'-analyzeduration', '2147483647',
'-probesize', '2147483647',
])
# Threading support
if self.ffmpeg_threads >= 0:
cmd.extend(['-threads', str(self.ffmpeg_threads)])
# Error recovery options for corrupted streams
if self.ffmpeg_error_recovery:
cmd.extend([
'-fflags', '+genpts',
'-avoid_negative_ts', 'make_zero',
'-err_detect', 'ignore_err'
])
# Stream copy (fast, no re-encoding)
cmd.extend([
'-c:v', 'copy',
'-c:a', 'copy',
'-start_at_zero',
'-copyts',
])
# Add faststart for MP4 files
if self.ffmpeg_faststart and output_path.endswith('.mp4'):
cmd.extend(['-movflags', '+faststart'])
cmd.append(output_path)
# Run FFmpeg
if self.ffmpeg_progress:
subprocess.call(cmd)
else:
subprocess.call(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
def build_chat_output_args(self) -> str:
"""
Build FFmpeg output arguments for chat rendering with hardware acceleration support.
Returns:
str: Complete FFmpeg output arguments string with "{save_path}" placeholder
"""
if self.hwaccel_type and self.hwaccel_type != 'none':
encoder = get_hwaccel_encoder(self.hwaccel_type)
print(f'{Fore.CYAN}Using hardware-accelerated encoder for chat: {encoder}{Style.RESET_ALL}')
if 'nvenc' in encoder:
result = f'-c:v {encoder} -preset p4 -cq 18 -pix_fmt yuv420p "{{save_path}}"'
elif 'qsv' in encoder:
result = f'-c:v {encoder} -global_quality 18 -pix_fmt yuv420p "{{save_path}}"'
elif 'amf' in encoder:
result = f'-c:v {encoder} -qp_i 18 -pix_fmt yuv420p "{{save_path}}"'
elif 'vaapi' in encoder:
result = f'-c:v {encoder} -qp 18 -pix_fmt yuv420p "{{save_path}}"'
else:
result = f'-c:v libx264 -preset veryfast -crf 18 -pix_fmt yuv420p "{{save_path}}"'
else:
# Default software encoding
result = f'-c:v libx264 -preset veryfast -crf 18 -pix_fmt yuv420p "{{save_path}}"'
print(f'{Fore.CYAN}DEBUG - Generated output_args: {result}{Style.RESET_ALL}')
return result

105
modules/recorder.py Normal file
View file

@ -0,0 +1,105 @@
"""
Live stream recording functionality for Twitch Archive.
"""
import os
import subprocess
from typing import Dict, Any, Optional
from colorama import Fore, Style
class StreamRecorder:
"""Handles live stream recording using streamlink."""
def __init__(self, username: str, quality: str, refresh: float,
hls_segments: int, streamlink_ttvlol: bool, shutdown_callback=None):
"""
Initialize the stream recorder.
Args:
username: Twitch username to record
quality: Stream quality (e.g., 'best', '1080p60', 'audio_only')
refresh: Retry interval in seconds
hls_segments: Number of parallel HLS segments to download
streamlink_ttvlol: Enable ttv-lol proxy (deprecated)
shutdown_callback: Callable to check if shutdown was requested
"""
self.username = username
self.quality = quality
self.refresh = refresh
self.hls_segments = hls_segments
self.streamlink_ttvlol = streamlink_ttvlol
self.shutdown_callback = shutdown_callback
self.current_process = None
def record(self, stream_info: Dict[str, Any], output_path: str) -> bool:
"""
Record a live Twitch stream using streamlink.
Args:
stream_info: Stream metadata from Twitch API
output_path: Path where the raw .ts file will be saved
Returns:
bool: True if recording completed normally, False if interrupted
"""
print(f'\n{Fore.CYAN}{"=" * 60}{Style.RESET_ALL}')
print(f'{Fore.GREEN}🔴 STREAM STARTED: {stream_info["title"]}{Style.RESET_ALL}')
print(f'{Fore.CYAN}{"=" * 60}{Style.RESET_ALL}\n')
# Build streamlink command
cmd = [
'streamlink',
f'twitch.tv/{self.username}',
self.quality,
'--hls-live-restart',
'--retry-streams', str(int(self.refresh)),
'--force',
'-o', output_path
]
# Add segment threads for faster downloads (requires streamlink 5.0+)
if self.hls_segments > 1:
cmd.extend(['--stream-segment-threads', str(self.hls_segments)])
# Add ad-blocking if enabled (deprecated warning)
if self.streamlink_ttvlol:
print(f'{Fore.YELLOW}⚠ Warning: ttv-lol proxy option is deprecated in newer streamlink versions{Style.RESET_ALL}')
print(f'{Fore.YELLOW} Consider disabling streamlink_ttvlol in config or using alternative methods{Style.RESET_ALL}')
# Add authentication if available
oauth_token = os.getenv("OAUTH-PRIVATE-TOKEN", "")
if oauth_token and oauth_token != "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx":
cmd.extend(['--twitch-api-header', f'Authorization=OAuth {oauth_token}'])
# Show command being executed (hide OAuth token for security)
cmd_display = [c if 'OAuth' not in str(c) else 'Authorization=OAuth [HIDDEN]' for c in cmd]
print(f'{Fore.CYAN}Command: {" ".join(cmd_display)}{Style.RESET_ALL}')
# Record the stream (this blocks until stream ends)
print(f'{Fore.YELLOW}Recording stream...{Style.RESET_ALL}')
try:
self.current_process = subprocess.Popen(cmd)
return_code = self.current_process.wait()
self.current_process = None
if self.shutdown_callback and self.shutdown_callback():
print(f'{Fore.YELLOW}✓ Recording stopped by user{Style.RESET_ALL}')
# Return True so processing continues - we still want to process what was recorded
return True
print(f'{Fore.GREEN}✓ Stream recording complete{Style.RESET_ALL}')
return True
except Exception as e:
self.current_process = None
print(f'{Fore.RED}✗ Recording error: {str(e)}{Style.RESET_ALL}')
return False
def stop(self) -> None:
"""Stop the current recording process."""
if self.current_process:
try:
self.current_process.terminate()
print(f'{Fore.YELLOW}Stopping recording process...{Style.RESET_ALL}')
except Exception:
pass

140
modules/stream_monitor.py Normal file
View file

@ -0,0 +1,140 @@
"""
Stream monitoring and API interaction for Twitch Archive.
"""
import os
import sys
from typing import Dict, Optional, Any
import requests
from colorama import Fore, Style
from .constants import TWITCH_OAUTH_URL, TWITCH_API_URL, TWITCH_GQL_URL, TWITCH_GQL_CLIENT_ID
class StreamMonitor:
"""Handles Twitch API interactions for monitoring stream status."""
def __init__(self, username: str):
"""
Initialize the stream monitor.
Args:
username: Twitch username to monitor
"""
self.username = username
self._oauth_token = None
def get_oauth_token(self) -> str:
"""
Get OAuth token from Twitch API.
Uses CLIENT-ID and CLIENT-SECRET from environment variables.
Returns:
str: OAuth access token
Raises:
SystemExit: If authentication fails
"""
if self._oauth_token:
return self._oauth_token
try:
url = f"{TWITCH_OAUTH_URL}?client_id={os.getenv('CLIENT-ID')}&client_secret={os.getenv('CLIENT-SECRET')}&grant_type=client_credentials"
response = requests.post(url, timeout=15)
response.raise_for_status()
self._oauth_token = response.json()['access_token']
return self._oauth_token
except requests.exceptions.RequestException as e:
print(f'{Fore.RED}✗ ERROR: Failed to authenticate with Twitch API{Style.RESET_ALL}')
print(f'{Fore.YELLOW} {str(e)}{Style.RESET_ALL}')
print(f'{Fore.CYAN} → Check your CLIENT-ID and CLIENT-SECRET in the .env file{Style.RESET_ALL}')
sys.exit(1)
except KeyError:
print(f'{Fore.RED}✗ ERROR: Invalid response from Twitch API{Style.RESET_ALL}')
print(f'{Fore.CYAN} → Verify your CLIENT-ID and CLIENT-SECRET are correct{Style.RESET_ALL}')
sys.exit(1)
def validate_username(self) -> bool:
"""
Validate that the configured Twitch username exists.
Returns:
bool: True if username exists, False otherwise
Raises:
SystemExit: If username is invalid or doesn't exist
"""
try:
url = f'{TWITCH_API_URL}/users?login={self.username}'
headers = {
"Authorization": f"Bearer {self.get_oauth_token()}",
"Client-ID": os.getenv('CLIENT-ID')
}
response = requests.get(url, headers=headers, timeout=15)
response.raise_for_status()
data = response.json()
if not data.get('data'):
print(f'{Fore.RED}✗ ERROR: Twitch user "{self.username}" not found{Style.RESET_ALL}')
print(f'{Fore.CYAN} → Check the username in your config file{Style.RESET_ALL}')
sys.exit(1)
print(f'{Fore.GREEN}✓ Username "{self.username}" validated{Style.RESET_ALL}')
return True
except requests.exceptions.RequestException as e:
print(f'{Fore.RED}✗ ERROR: Could not validate username{Style.RESET_ALL}')
print(f'{Fore.YELLOW} {str(e)}{Style.RESET_ALL}')
sys.exit(1)
def check_stream_status(self) -> Optional[Dict[str, Any]]:
"""
Check if the configured user is currently live.
Returns:
dict: Stream information if live, None if offline
Raises:
SystemExit: If API request fails
"""
query = f'query{{user(login: "{self.username}") {{stream{{archiveVideo{{id}}title createdAt}}}}}}'
try:
response = requests.post(
TWITCH_GQL_URL,
json={'query': query},
headers={"Client-ID": TWITCH_GQL_CLIENT_ID},
timeout=15
)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
print(f'{Fore.RED}✗ ERROR: Failed to check stream status{Style.RESET_ALL}')
print(f'{Fore.YELLOW} {str(e)}{Style.RESET_ALL}')
sys.exit(1)
def get_latest_vod(self) -> Optional[Dict[str, Any]]:
"""
Get the most recent VOD for the configured user.
Returns:
dict: VOD information, or None if no VODs found
"""
query = f'query {{user(login: "{self.username}") {{videos(first: 1) {{edges {{node {{id title description recordedAt lengthSeconds animatedPreviewURL previewThumbnailURL(height: 1280, width: 720) thumbnailURLs(height: 1280, width: 720)}}}}}}}}}}'
try:
response = requests.post(
TWITCH_GQL_URL,
json={'query': query},
headers={"Client-ID": TWITCH_GQL_CLIENT_ID},
timeout=15
)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
print(f'{Fore.YELLOW}⚠ Warning: Could not fetch latest VOD{Style.RESET_ALL}')
print(f'{Fore.YELLOW} {str(e)}{Style.RESET_ALL}')
return None

231
modules/utils.py Normal file
View file

@ -0,0 +1,231 @@
"""
Utility functions and helpers for Twitch Archive.
"""
import os
import sys
import pathlib
import subprocess
from typing import Optional
from colorama import Fore, Style
def detect_operating_system() -> str:
"""
Detect the current operating system.
Returns:
str: 'windows' or 'linux'
Raises:
SystemExit: If OS is not supported
"""
if sys.platform.startswith('win32'):
return 'windows'
elif sys.platform.startswith('linux'):
return 'linux'
else:
print(f'{Fore.RED}✗ ERROR: Unsupported operating system: {sys.platform}{Style.RESET_ALL}')
print(f'{Fore.YELLOW} This script only supports Windows and Linux{Style.RESET_ALL}')
sys.exit(1)
def get_bin_path() -> str:
"""Get the path to the bin directory containing external tools."""
return str(pathlib.Path(__file__).parent.parent.resolve() / "bin")
def get_ffmpeg_executable(os_type: str) -> str:
"""
Get the platform-specific ffmpeg executable path.
Args:
os_type: Operating system type ('windows' or 'linux')
Returns:
str: Path to ffmpeg executable
"""
bin_path = get_bin_path()
if os_type == 'windows':
return os.path.join(bin_path, 'ffmpeg.exe')
return os.path.join(bin_path, 'ffmpeg')
def get_twitch_downloader_executable(os_type: str) -> str:
"""
Get the platform-specific TwitchDownloaderCLI executable path.
Args:
os_type: Operating system type ('windows' or 'linux')
Returns:
str: Path to TwitchDownloaderCLI executable
"""
bin_path = get_bin_path()
if os_type == 'windows':
return os.path.join(bin_path, 'TwitchDownloaderCLI.exe')
return os.path.join(bin_path, 'TwitchDownloaderCLI')
def get_unique_filename(filepath: str) -> str:
"""
Generate a unique filename by appending a counter if file already exists.
Args:
filepath: The desired file path
Returns:
str: A unique file path (original or with _N suffix)
Example:
If 'video.mp4' exists, returns 'video_1.mp4'
If 'video_1.mp4' also exists, returns 'video_2.mp4'
"""
if not os.path.exists(filepath):
return filepath
# Split into components
directory = os.path.dirname(filepath)
filename = os.path.basename(filepath)
name, ext = os.path.splitext(filename)
# Find next available counter
counter = 1
while True:
new_filepath = os.path.join(directory, f"{name}_{counter}{ext}")
if not os.path.exists(new_filepath):
return new_filepath
counter += 1
def verify_streamlink() -> bool:
"""
Verify that streamlink is available.
Returns:
bool: True if streamlink is available, False otherwise
"""
try:
result = subprocess.run(['streamlink', '--version'],
capture_output=True,
text=True,
timeout=5)
if result.returncode == 0:
version = result.stdout.strip().split()[1] if len(result.stdout.split()) > 1 else 'unknown'
print(f'{Fore.GREEN}✓ Streamlink v{version} found{Style.RESET_ALL}')
return True
else:
raise FileNotFoundError()
except (FileNotFoundError, subprocess.TimeoutExpired, IndexError):
print(f'{Fore.RED}✗ ERROR: Streamlink not found{Style.RESET_ALL}')
print(f'{Fore.CYAN} → Install streamlink: pip install streamlink{Style.RESET_ALL}')
print(f'{Fore.CYAN} → Or download from: https://streamlink.github.io/{Style.RESET_ALL}')
return False
def verify_ffmpeg(os_type: str) -> bool:
"""
Verify that ffmpeg is available.
Args:
os_type: Operating system type ('windows' or 'linux')
Returns:
bool: True if ffmpeg is available, False otherwise
"""
try:
ffmpeg_path = get_ffmpeg_executable(os_type)
if os.path.exists(ffmpeg_path):
print(f'{Fore.GREEN}✓ FFmpeg found at {ffmpeg_path}{Style.RESET_ALL}')
return True
else:
print(f'{Fore.YELLOW}⚠ Warning: FFmpeg not found at {ffmpeg_path}{Style.RESET_ALL}')
print(f'{Fore.YELLOW} → Download FFmpeg and place it in the bin/ folder{Style.RESET_ALL}')
return False
except Exception as e:
print(f'{Fore.YELLOW}⚠ Warning: Could not verify FFmpeg: {e}{Style.RESET_ALL}')
return False
def verify_twitch_downloader(os_type: str) -> bool:
"""
Verify that TwitchDownloaderCLI is available.
Args:
os_type: Operating system type ('windows' or 'linux')
Returns:
bool: True if TwitchDownloaderCLI is available, False otherwise
"""
try:
downloader_path = get_twitch_downloader_executable(os_type)
if os.path.exists(downloader_path):
print(f'{Fore.GREEN}✓ TwitchDownloaderCLI found{Style.RESET_ALL}')
return True
else:
print(f'{Fore.YELLOW}⚠ Warning: TwitchDownloaderCLI not found at {downloader_path}{Style.RESET_ALL}')
print(f'{Fore.YELLOW} → Download from: https://github.com/lay295/TwitchDownloader/releases{Style.RESET_ALL}')
return False
except Exception as e:
print(f'{Fore.YELLOW}⚠ Warning: Could not verify TwitchDownloaderCLI: {e}{Style.RESET_ALL}')
return False
def detect_hardware_acceleration(hwaccel_config: str, os_type: str) -> Optional[str]:
"""
Detect available hardware acceleration based on config and system.
Args:
hwaccel_config: Hardware acceleration configuration ('auto', 'nvenc', 'qsv', 'amf', 'vaapi', 'none')
os_type: Operating system type ('windows' or 'linux')
Returns:
str: Hardware acceleration type or None
"""
# If user explicitly set to 'none', disable hardware acceleration
if hwaccel_config == 'none':
return 'none'
# If user specified a particular type, use it
if hwaccel_config in ['nvenc', 'qsv', 'amf', 'vaapi']:
return hwaccel_config
# Auto-detect: try to determine available hardware
if hwaccel_config == 'auto':
# On Windows, NVIDIA is most common
if os_type == 'windows':
# Could check for nvidia-smi, but just return 'auto' for ffmpeg to decide
return 'auto'
else:
# On Linux, VAAPI is common for Intel/AMD, or NVENC for NVIDIA
# Let ffmpeg auto-detect
return 'auto'
return None
def get_hwaccel_encoder(hwaccel_type: str) -> str:
"""
Get the appropriate hardware-accelerated encoder for the given acceleration type.
Args:
hwaccel_type: Type of hardware acceleration ('nvenc', 'qsv', 'amf', 'vaapi', 'auto', 'none')
Returns:
str: FFmpeg encoder name (e.g., 'h264_nvenc', 'libx264')
"""
encoder_map = {
'nvenc': 'h264_nvenc', # NVIDIA
'qsv': 'h264_qsv', # Intel Quick Sync
'amf': 'h264_amf', # AMD
'vaapi': 'h264_vaapi', # Linux VA-API
}
if hwaccel_type in encoder_map:
return encoder_map[hwaccel_type]
elif hwaccel_type == 'auto':
# Try NVENC first (most common), fall back to libx264
# In real usage, auto will attempt to use what's available
return 'h264_nvenc'
else:
return 'libx264' # Software encoding fallback