TwitchDownloader/twitch-archive.py

2193 lines
No EOL
95 KiB
Python

"""
Twitch Archive - Automated Twitch Stream Recording & Archiving System
This script monitors a Twitch channel and automatically:
- Records live streams as they happen
- Downloads VODs (Video on Demand) after the stream ends
- Downloads and renders chat logs
- Saves stream metadata
- Uploads everything to cloud storage (optional)
Requirements:
- Python 3.7+
- External tools: streamlink, ffmpeg, TwitchDownloaderCLI, rclone (optional)
- Configuration: config/global.json plus config/streamers/<username>.json, or a legacy config.json (copy from config.sample.json)
- Environment file: .env (for API credentials)
"""
# Standard library imports
import os
import sys
import time
import json
import socket
import smtplib
import pathlib
import subprocess
import getopt
import signal
from typing import Dict, Optional, Any
from datetime import datetime, timedelta
# Third-party imports
import requests
from colorama import Fore, Style
from pytz import timezone
from dotenv import load_dotenv, find_dotenv
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
# ============================================================================
# CONSTANTS - Configuration defaults and magic values
# ============================================================================
# Twitch service endpoints
TWITCH_OAUTH_URL = 'https://id.twitch.tv/oauth2/token'  # token endpoint (client-credentials grant)
TWITCH_API_URL = 'https://api.twitch.tv/helix'  # base URL for authenticated REST calls
TWITCH_GQL_URL = 'https://gql.twitch.tv/gql'  # GraphQL endpoint used for stream/VOD lookups
# Client-ID header value sent with every GraphQL request made by this script
TWITCH_GQL_CLIENT_ID = 'kimne78kx3ncx6brgo4mv6wki5h1ko'
# Filename prefixes, one per archived content type
PREFIX_LIVE, PREFIX_VOD, PREFIX_CHAT = 'LIVE_', 'VOD_', 'CHAT_'
# NOTE: 'METADA_' is a historical misspelling, kept on purpose for compatibility
PREFIX_METADATA = 'METADA_'
# Built-in settings; values loaded from the JSON config files are layered on top
DEFAULT_CONFIG = {
    # --- Channel and storage ---
    "username": "your_twitch_username",
    "quality": "best",                       # passed to streamlink / TwitchDownloaderCLI
    "root_path": "archive",                  # local archive root directory
    "rclone_path": "remote:path/to/streams",
    "refresh": 60.0,                         # seconds between live checks
    # --- Feature switches ---
    "streamlink_ttvlol": False,
    "notifications": False,                  # e-mail notifications
    "downloadMETADATA": True,
    "downloadVOD": True,
    "downloadCHAT": True,
    "downloadLiveCHAT": True,
    "vodTimeout": 300,                       # presumably seconds - TODO confirm against its consumer
    "uploadCloud": True,
    "deleteFiles": False,
    "onlyRaw": False,                        # keep the raw .ts and skip ffmpeg processing
    "cleanRaw": True,
    # --- Download parallelism ---
    "hls_segments": 3,                       # streamlink --stream-segment-threads
    "hls_segmentsVOD": 10,                   # TwitchDownloaderCLI videodownload -t
    # --- FFmpeg 8.0+ enhancement options ---
    "ffmpeg_hwaccel": "auto",                # 'auto', 'nvenc', 'qsv', 'amf', 'vaapi' or 'none'
    "ffmpeg_threads": 0,                     # 0 = let ffmpeg decide
    "ffmpeg_audio_codec": "aac",             # codec for audio-only streams
    "ffmpeg_audio_samplerate": 48000,        # Hz; 48000 recommended for broadcasts
    "ffmpeg_audio_bitrate": "192k",
    "ffmpeg_error_recovery": True,           # tolerate damaged streams
    "ffmpeg_faststart": True,                # MP4 faststart for better streaming compatibility
    "ffmpeg_progress": False,                # show ffmpeg output while encoding
}
# ============================================================================
# MAIN CLASS
# ============================================================================
class ConfigManager:
    """
    Manages global and per-streamer configurations.

    Global settings come from config/global.json layered over DEFAULT_CONFIG;
    each streamer may override them in config/streamers/<username>.json.
    In every file, keys starting with '_' (comments) and the '$schema' key
    are ignored.
    """

    def __init__(self):
        """Locate the config directories next to this file and load the global config."""
        self.config_dir = pathlib.Path(__file__).parent / "config"
        self.streamers_dir = self.config_dir / "streamers"
        self.global_config = self._load_global_config()

    @staticmethod
    def _read_settings(path) -> Dict[str, Any]:
        """
        Read one JSON config file and drop comment/schema keys.

        Args:
            path: Location of the JSON file
        Returns:
            dict: File content without '_'-prefixed keys and without '$schema'
        Raises:
            json.JSONDecodeError: If the file is not valid JSON
            ValueError: If the top-level JSON value is not an object
            OSError: If the file cannot be read
        """
        with open(path, 'r', encoding='utf-8') as f:
            raw = json.load(f)
        if not isinstance(raw, dict):
            # A list/number/string root cannot carry settings; say so explicitly
            raise ValueError(
                f'top-level JSON value must be an object, got {type(raw).__name__}')
        return {k: v for k, v in raw.items()
                if not k.startswith('_') and k != '$schema'}

    def _load_global_config(self) -> Dict[str, Any]:
        """
        Load global configuration from config/global.json.

        Returns:
            dict: DEFAULT_CONFIG overridden by whatever global.json provides.
                If the file is missing or unreadable the defaults are returned.
        """
        global_file = self.config_dir / "global.json"
        # Start with DEFAULT_CONFIG as ultimate fallback
        config = DEFAULT_CONFIG.copy()
        if global_file.exists():
            try:
                config.update(self._read_settings(global_file))
                print(f'{Fore.GREEN}✓ Global configuration loaded from config/global.json{Style.RESET_ALL}')
            except json.JSONDecodeError as e:
                print(f'{Fore.YELLOW}⚠ Warning: Invalid JSON in config/global.json: {e}{Style.RESET_ALL}')
                print(f'{Fore.YELLOW} Using default configuration{Style.RESET_ALL}')
            except Exception as e:
                print(f'{Fore.YELLOW}⚠ Warning: Could not load config/global.json: {e}{Style.RESET_ALL}')
        else:
            print(f'{Fore.YELLOW}⚠ Warning: config/global.json not found{Style.RESET_ALL}')
            print(f'{Fore.CYAN} → Create config/global.json with default settings{Style.RESET_ALL}')
        return config

    def load_streamer_config(self, username: str) -> Dict[str, Any]:
        """
        Load configuration for a specific streamer.

        Merges the global config with streamer-specific overrides. When the
        streamer has no file yet, a minimal one is created on disk and the
        streamer is treated as enabled.

        Args:
            username: Twitch username
        Returns:
            dict: Complete configuration for the streamer; 'username' is
                always set to the requested name.
        """
        # Copy so per-streamer overrides never leak into the shared global config
        config = self.global_config.copy()
        streamer_file = self.streamers_dir / f"{username}.json"
        if streamer_file.exists():
            try:
                # Streamer values override global ones
                config.update(self._read_settings(streamer_file))
                print(f'{Fore.GREEN}✓ Loaded config for {username}{Style.RESET_ALL}')
            except json.JSONDecodeError as e:
                print(f'{Fore.YELLOW}⚠ Warning: Invalid JSON in {streamer_file}: {e}{Style.RESET_ALL}')
            except Exception as e:
                print(f'{Fore.YELLOW}⚠ Warning: Could not load {streamer_file}: {e}{Style.RESET_ALL}')
        else:
            # Create default config for new streamer
            print(f'{Fore.CYAN}→ Creating default config for new streamer: {username}{Style.RESET_ALL}')
            self.create_default_streamer_config(username)
            config['enabled'] = True
        # The requested name wins over any 'username' stored in the file
        config['username'] = username
        return config

    def create_default_streamer_config(self, username: str) -> None:
        """
        Create a default configuration file for a new streamer.

        Failures are reported on stdout and otherwise ignored.

        Args:
            username: Twitch username
        """
        # Ensure streamers directory exists
        self.streamers_dir.mkdir(parents=True, exist_ok=True)
        streamer_file = self.streamers_dir / f"{username}.json"
        default_config = {
            "$schema": "./streamer.schema.json",
            "username": username,
            "enabled": True
        }
        try:
            with open(streamer_file, 'w', encoding='utf-8') as f:
                json.dump(default_config, f, indent=2)
            print(f'{Fore.GREEN}✓ Created config file: config/streamers/{username}.json{Style.RESET_ALL}')
            print(f'{Fore.CYAN} → Edit the file to add custom settings or overrides{Style.RESET_ALL}')
        except Exception as e:
            print(f'{Fore.RED}✗ Could not create config file for {username}: {e}{Style.RESET_ALL}')

    def get_all_enabled_streamers(self) -> list:
        """
        Get list of all enabled streamers.

        A streamer counts as enabled only when its file sets a truthy
        'enabled' value. Files that cannot be read are reported and skipped.

        Returns:
            list: Usernames (the file's 'username', else the file stem),
                ordered by config file path so the result is deterministic.
        """
        if not self.streamers_dir.exists():
            return []
        enabled_streamers = []
        # glob() order is filesystem dependent; sort for a stable result
        for config_file in sorted(self.streamers_dir.glob("*.json")):
            try:
                config = self._read_settings(config_file)
            except Exception as e:
                print(f'{Fore.YELLOW}⚠ Warning: Could not read {config_file}: {e}{Style.RESET_ALL}')
                continue
            if config.get('enabled', False):
                enabled_streamers.append(config.get('username') or config_file.stem)
        return enabled_streamers
class TwitchArchive:
"""
Main class for the Twitch Archive system.
Handles monitoring a Twitch channel, recording live streams, and downloading
VODs, chat logs, and metadata. Can optionally upload to cloud storage.
"""
def __init__(self, config: Optional[Dict[str, Any]] = None):
    """
    Set up a TwitchArchive instance from a configuration mapping.

    Args:
        config: Settings to expose as instance attributes. When None, the
            legacy config.json is loaded through load_config() instead.
    """
    if config is not None:
        # New mode: every key of the supplied mapping becomes an attribute
        for option in config:
            setattr(self, option, config[option])
    else:
        # Legacy mode: read config.json
        self.load_config()
    # Runtime state, identical for both configuration sources
    self.os = self._detect_operating_system()
    self.paths_initialized = False
    self.shutdown_requested = False
    self.current_process = None
    self.current_stream_data = {}
def load_config(self) -> None:
    """
    Populate instance attributes from the legacy config.json file.

    DEFAULT_CONFIG is the baseline; keys found in config.json (located next
    to this file) override it, except keys starting with '_', which are
    treated as comments. A missing or unreadable file only produces a
    warning and leaves the defaults in place.
    """
    settings = DEFAULT_CONFIG.copy()
    config_file = os.path.join(os.path.dirname(__file__), 'config.json')
    if not os.path.exists(config_file):
        print(f'{Fore.YELLOW}⚠ Warning: config.json not found{Style.RESET_ALL}')
        print(f'{Fore.CYAN} → Copy config.sample.json to config.json and edit it with your settings{Style.RESET_ALL}')
    else:
        try:
            with open(config_file, 'r', encoding='utf-8') as handle:
                loaded = json.load(handle)
            # Skip comment fields before merging; user values take precedence
            overrides = {}
            for key, value in loaded.items():
                if not key.startswith('_'):
                    overrides[key] = value
            settings.update(overrides)
            print(f'{Fore.GREEN}✓ Configuration loaded from config.json{Style.RESET_ALL}')
        except json.JSONDecodeError as e:
            print(f'{Fore.YELLOW}⚠ Warning: Invalid JSON in config.json: {e}{Style.RESET_ALL}')
            print(f'{Fore.YELLOW} Using default configuration{Style.RESET_ALL}')
        except Exception as e:
            print(f'{Fore.YELLOW}⚠ Warning: Could not load config.json: {e}{Style.RESET_ALL}')
            print(f'{Fore.YELLOW} Using default configuration{Style.RESET_ALL}')
    # Expose every setting as an attribute of the instance
    for key, value in settings.items():
        setattr(self, key, value)
def _detect_operating_system(self) -> str:
    """
    Map sys.platform to the OS label used by this script.

    Returns:
        str: 'windows' or 'linux'
    Raises:
        SystemExit: On any other platform (after printing an error)
    """
    known_platforms = (('win32', 'windows'), ('linux', 'linux'))
    for prefix, label in known_platforms:
        if sys.platform.startswith(prefix):
            return label
    print(f'{Fore.RED}✗ ERROR: Unsupported operating system: {sys.platform}{Style.RESET_ALL}')
    print(f'{Fore.YELLOW} This script only supports Windows and Linux{Style.RESET_ALL}')
    sys.exit(1)
def _initialize_paths(self) -> None:
    """
    Compute and create the archive directory tree for the configured streamer.

    Sets these attributes (all absolute paths) and creates the directories:
    - raw_path:      root_path/username/video/raw   (raw .ts files)
    - video_path:    root_path/username/video       (processed videos)
    - chatJSON_path: root_path/username/chat/json   (chat JSON files)
    - chatMP4_path:  root_path/username/chat        (rendered chat videos)
    - metadata_path: root_path/username/metadata    (stream metadata)
    Also makes sure root_path/.log exists and sets paths_initialized to True.
    """
    streamer_root = pathlib.Path(self.root_path, self.username)
    layout = (
        ('raw_path', ('video', 'raw')),
        ('video_path', ('video',)),
        ('chatJSON_path', ('chat', 'json')),
        ('chatMP4_path', ('chat',)),
        ('metadata_path', ('metadata',)),
    )
    for attribute, parts in layout:
        directory = streamer_root.joinpath(*parts).absolute()
        setattr(self, attribute, directory)
        directory.mkdir(parents=True, exist_ok=True)
    # The log of processed streams lives directly under root_path
    processed_log = pathlib.Path(self.root_path, ".log")
    if not processed_log.exists():
        processed_log.touch()
    self.paths_initialized = True
def _load_environment_variables(self) -> None:
    """
    Load credentials from the .env file into the process environment.

    Variables read elsewhere in this script:
    - CLIENT-ID / CLIENT-SECRET: Twitch API credentials (required)
    - OAUTH-PRIVATE-TOKEN: optional personal token passed to streamlink
    - SENDER / RECEIVER / PASSWD: e-mail notification settings (optional)

    Raises:
        SystemExit: If load_dotenv() reports failure for the discovered file
    """
    dotenv_location = find_dotenv()
    if load_dotenv(dotenv_location):
        return
    print(f'{Fore.RED}✗ ERROR: .env file not found{Style.RESET_ALL}')
    print(f'{Fore.CYAN} → Create a .env file with your Twitch API credentials{Style.RESET_ALL}')
    print(f'{Fore.CYAN} → Required: CLIENT-ID, CLIENT-SECRET{Style.RESET_ALL}')
    sys.exit(1)
def _print_configuration_summary(self) -> None:
"""
Print a summary of the current configuration to the console.
Shows the streamer, quality, resolved storage path and refresh rate,
then the state of each feature toggle, then a warning about file
deletion (or a confirmation that files are kept locally).
"""
# Header banner
print(f'\n{Fore.CYAN}{"=" * 60}{Style.RESET_ALL}')
print(f'{Fore.CYAN}TWITCH ARCHIVE - Configuration Summary{Style.RESET_ALL}')
print(f'{Fore.CYAN}{"=" * 60}{Style.RESET_ALL}\n')
# Basic settings (storage is shown as a fully resolved path)
print(f'Streamer: {Fore.GREEN}{self.username}{Style.RESET_ALL}')
print(f'Quality: {Fore.GREEN}{self.quality}{Style.RESET_ALL}')
print(f'Storage: {Fore.GREEN}{pathlib.Path(self.root_path).resolve()}{Style.RESET_ALL}')
print(f'Refresh rate: {Fore.GREEN}{self.refresh}s{Style.RESET_ALL}\n')
# Feature toggles, one "label: Enabled/Disabled" line each
self._print_toggle('Email notifications', self.notifications)
self._print_toggle('Metadata download', self.downloadMETADATA)
self._print_toggle('VOD download', self.downloadVOD)
self._print_toggle('Chat download & render', self.downloadCHAT)
self._print_toggle('Cloud upload', self.uploadCloud)
# Warning messages: deleting files is risky, doubly so without cloud upload
if self.deleteFiles:
print(f'\n{Fore.RED}⚠ WARNING: Files will be DELETED after processing{Style.RESET_ALL}')
if not self.uploadCloud:
print(f'{Fore.RED}⚠ CRITICAL: Files will be deleted WITHOUT cloud backup!{Style.RESET_ALL}')
print(f'{Fore.YELLOW} Press CTRL+C to stop and change configuration{Style.RESET_ALL}')
else:
print(f'\n{Fore.GREEN}✓ Files will be preserved locally{Style.RESET_ALL}')
# Footer banner
print(f'\n{Fore.CYAN}{"=" * 60}{Style.RESET_ALL}\n')
def _print_toggle(self, label: str, value: bool) -> None:
    """Print '<label>: Enabled' (green) or '<label>: Disabled' (red) for one setting."""
    if value:
        status = f'{Fore.GREEN}Enabled{Style.RESET_ALL}'
    else:
        status = f'{Fore.RED}Disabled{Style.RESET_ALL}'
    print(f'{label}: {status}')
def run(self) -> None:
    """
    Start the archiver: prepare everything, then enter the monitoring loop.

    Startup steps run in a fixed order because later ones depend on earlier
    ones (credentials before API calls, paths before the summary).
    """
    startup_steps = (
        self._load_environment_variables,   # .env credentials
        self._validate_username,            # the channel must exist
        self._initialize_paths,             # archive directory tree
        self._verify_dependencies,          # streamlink / ffmpeg / TwitchDownloaderCLI
        self._print_configuration_summary,
    )
    for step in startup_steps:
        step()
    # Announce monitoring on the console and, if enabled, by e-mail
    print(f"Monitoring {Fore.GREEN}{self.username}{Style.RESET_ALL} every {Fore.GREEN}{self.refresh}s{Style.RESET_ALL}")
    self.send_notification(
        "TWITCH ARCHIVE STARTED",
        f"Monitoring {self.username} every {self.refresh} seconds.",
    )
    # Hand over to the main monitoring loop
    self.loopcheck()
def _get_oauth_token(self) -> str:
    """
    Get an app access token from Twitch (client-credentials grant).

    Uses CLIENT-ID and CLIENT-SECRET from the environment. The credentials
    are sent in the POST body rather than the query string, so the secret
    never becomes part of the request URL (which requests includes in the
    error messages printed below).

    Returns:
        str: OAuth access token
    Raises:
        SystemExit: If credentials are missing, the request fails, or the
            reply does not contain an access token
    """
    client_id = os.getenv('CLIENT-ID')
    client_secret = os.getenv('CLIENT-SECRET')
    if not client_id or not client_secret:
        # Without this check the literal string "None" would be sent to Twitch
        print(f'{Fore.RED}✗ ERROR: CLIENT-ID and/or CLIENT-SECRET is not set{Style.RESET_ALL}')
        print(f'{Fore.CYAN} → Check your CLIENT-ID and CLIENT-SECRET in the .env file{Style.RESET_ALL}')
        sys.exit(1)
    try:
        response = requests.post(
            TWITCH_OAUTH_URL,
            data={
                'client_id': client_id,
                'client_secret': client_secret,
                'grant_type': 'client_credentials',
            },
            timeout=15,
        )
        response.raise_for_status()
        return response.json()['access_token']
    except requests.exceptions.RequestException as e:
        print(f'{Fore.RED}✗ ERROR: Failed to authenticate with Twitch API{Style.RESET_ALL}')
        print(f'{Fore.YELLOW} {str(e)}{Style.RESET_ALL}')
        print(f'{Fore.CYAN} → Check your CLIENT-ID and CLIENT-SECRET in the .env file{Style.RESET_ALL}')
        sys.exit(1)
    except (KeyError, TypeError, ValueError):
        # Reply was not JSON, not an object, or lacks 'access_token'
        print(f'{Fore.RED}✗ ERROR: Invalid response from Twitch API{Style.RESET_ALL}')
        print(f'{Fore.CYAN} → Verify your CLIENT-ID and CLIENT-SECRET are correct{Style.RESET_ALL}')
        sys.exit(1)
def _validate_username(self) -> None:
    """
    Confirm through the Twitch users endpoint that the configured channel exists.

    Raises:
        SystemExit: If the lookup request fails or returns no user
    """
    try:
        lookup_url = f'{TWITCH_API_URL}/users?login={self.username}'
        # A fresh app token is requested for this single call
        auth_headers = {
            "Authorization": f"Bearer {self._get_oauth_token()}",
            "Client-ID": os.getenv('CLIENT-ID')
        }
        reply = requests.get(lookup_url, headers=auth_headers, timeout=15)
        reply.raise_for_status()
        matches = reply.json().get('data')
        if not matches:
            # An empty or missing 'data' entry means no such login
            print(f'{Fore.RED}✗ ERROR: Twitch user "{self.username}" not found{Style.RESET_ALL}')
            print(f'{Fore.CYAN} → Check the username in your config.json file{Style.RESET_ALL}')
            sys.exit(1)
        print(f'{Fore.GREEN}✓ Username "{self.username}" validated{Style.RESET_ALL}')
    except requests.exceptions.RequestException as e:
        print(f'{Fore.RED}✗ ERROR: Could not validate username{Style.RESET_ALL}')
        print(f'{Fore.YELLOW} {str(e)}{Style.RESET_ALL}')
        sys.exit(1)
def _verify_dependencies(self) -> None:
    """
    Check that the external tools this script shells out to are present.

    streamlink is mandatory; ffmpeg and TwitchDownloaderCLI only produce
    warnings when they are missing from the bin/ directory.

    Raises:
        SystemExit: If `streamlink --version` cannot be run successfully
    """
    # --- streamlink (looked up on PATH) ---
    try:
        probe = subprocess.run(
            ['streamlink', '--version'],
            capture_output=True,
            text=True,
            timeout=5,
        )
        if probe.returncode != 0:
            # Treat a failing binary like a missing one
            raise FileNotFoundError()
        # The second word of the output is reported as the version
        words = probe.stdout.split()
        version = words[1] if len(words) > 1 else 'unknown'
        print(f'{Fore.GREEN}✓ Streamlink v{version} found{Style.RESET_ALL}')
    except (FileNotFoundError, subprocess.TimeoutExpired, IndexError):
        print(f'{Fore.RED}✗ ERROR: Streamlink not found{Style.RESET_ALL}')
        print(f'{Fore.CYAN} → Install streamlink: pip install streamlink{Style.RESET_ALL}')
        print(f'{Fore.CYAN} → Or download from: https://streamlink.github.io/{Style.RESET_ALL}')
        sys.exit(1)
    # --- ffmpeg (bundled in bin/) ---
    try:
        ffmpeg_path = self._get_ffmpeg_executable()
        if not os.path.exists(ffmpeg_path):
            print(f'{Fore.YELLOW}⚠ Warning: FFmpeg not found at {ffmpeg_path}{Style.RESET_ALL}')
            print(f'{Fore.YELLOW} → Download FFmpeg and place it in the bin/ folder{Style.RESET_ALL}')
        else:
            print(f'{Fore.GREEN}✓ FFmpeg found at {ffmpeg_path}{Style.RESET_ALL}')
    except Exception as e:
        print(f'{Fore.YELLOW}⚠ Warning: Could not verify FFmpeg: {e}{Style.RESET_ALL}')
    # --- TwitchDownloaderCLI (only needed for VOD or chat downloads) ---
    if not (self.downloadVOD or self.downloadCHAT):
        return
    try:
        downloader_path = self._get_twitch_downloader_executable()
        if not os.path.exists(downloader_path):
            print(f'{Fore.YELLOW}⚠ Warning: TwitchDownloaderCLI not found at {downloader_path}{Style.RESET_ALL}')
            print(f'{Fore.YELLOW} → Download from: https://github.com/lay295/TwitchDownloader/releases{Style.RESET_ALL}')
        else:
            print(f'{Fore.GREEN}✓ TwitchDownloaderCLI found{Style.RESET_ALL}')
    except Exception as e:
        print(f'{Fore.YELLOW}⚠ Warning: Could not verify TwitchDownloaderCLI: {e}{Style.RESET_ALL}')
def _check_stream_status(self) -> Optional[Dict[str, Any]]:
    """
    Ask the Twitch GraphQL endpoint for the configured user's current stream.

    Returns:
        dict: The decoded JSON reply. The query requests the stream's
            archiveVideo id, title and createdAt; how an offline channel is
            represented is left to the caller to interpret.
    Raises:
        SystemExit: If the request fails
    """
    payload = {
        'query': f'query{{user(login: "{self.username}") {{stream{{archiveVideo{{id}}title createdAt}}}}}}'
    }
    gql_headers = {"Client-ID": TWITCH_GQL_CLIENT_ID}
    try:
        reply = requests.post(TWITCH_GQL_URL, json=payload, headers=gql_headers, timeout=15)
        reply.raise_for_status()
        return reply.json()
    except requests.exceptions.RequestException as e:
        print(f'{Fore.RED}✗ ERROR: Failed to check stream status{Style.RESET_ALL}')
        print(f'{Fore.YELLOW} {str(e)}{Style.RESET_ALL}')
        sys.exit(1)
def _get_latest_vod(self) -> Optional[Dict[str, Any]]:
    """
    Fetch the first entry of the configured user's video list via GraphQL.

    Returns:
        dict: The decoded JSON reply (id, title, description, recordedAt,
            lengthSeconds and thumbnail URLs of one video), or None if the
            request failed.
    """
    # NOTE(review): the thumbnail arguments read height=1280, width=720 - possibly swapped; confirm
    payload = {
        'query': f'query {{user(login: "{self.username}") {{videos(first: 1) {{edges {{node {{id title description recordedAt lengthSeconds animatedPreviewURL previewThumbnailURL(height: 1280, width: 720) thumbnailURLs(height: 1280, width: 720)}}}}}}}}}}'
    }
    gql_headers = {"Client-ID": TWITCH_GQL_CLIENT_ID}
    try:
        reply = requests.post(TWITCH_GQL_URL, json=payload, headers=gql_headers, timeout=15)
        reply.raise_for_status()
        return reply.json()
    except requests.exceptions.RequestException as e:
        # Unlike the live check, a failed VOD lookup is not fatal
        print(f'{Fore.YELLOW}⚠ Warning: Could not fetch latest VOD{Style.RESET_ALL}')
        print(f'{Fore.YELLOW} {str(e)}{Style.RESET_ALL}')
        return None
def _get_unique_filename(self, filepath: str) -> str:
    """
    Return a path that does not exist yet, derived from *filepath*.

    Args:
        filepath: The desired file path
    Returns:
        str: *filepath* itself when it is free, otherwise the same name with
            the lowest free numeric suffix before the extension.
    Example:
        'video.mp4' taken               -> 'video_1.mp4'
        'video.mp4', 'video_1.mp4' taken -> 'video_2.mp4'
    """
    folder, base_name = os.path.split(filepath)
    stem, extension = os.path.splitext(base_name)
    candidate = filepath
    suffix = 0
    # Probe video.ext, video_1.ext, video_2.ext, ... until one is unused
    while os.path.exists(candidate):
        suffix += 1
        candidate = os.path.join(folder, f"{stem}_{suffix}{extension}")
    return candidate
def send_notification(self, subject: str, content: str) -> None:
    """
    Send email notification via Gmail SMTP.

    Only sends if notifications are enabled in configuration.
    Requires SENDER, RECEIVER, and PASSWD in .env file. Failures are
    reported on stdout and never raised, so a mail problem cannot stop the
    archiver.

    Args:
        subject: Email subject line (prefixed with the streamer name)
        content: Email body content
    """
    if not self.notifications:
        return
    try:
        sender = os.getenv("SENDER")
        receiver = os.getenv("RECEIVER")
        password = os.getenv("PASSWD")
        if not all([sender, receiver, password]):
            print(f'{Fore.YELLOW}⚠ Notification skipped: Missing email credentials in .env{Style.RESET_ALL}')
            return
        # Construct email
        msg = MIMEMultipart()
        msg['From'] = sender
        msg['To'] = receiver
        msg['Subject'] = f"{self.username} - {subject}"
        body = f"Stream: {self.username}\n\n{content}"
        msg.attach(MIMEText(body, 'plain'))
        # Send via Gmail SMTP. The timeout keeps an unreachable mail server
        # from blocking the monitoring loop forever (smtplib has none by default).
        with smtplib.SMTP('smtp.gmail.com', 587, timeout=30) as server:
            server.starttls()
            server.login(sender, password)
            server.sendmail(sender, receiver, msg.as_string())
    except socket.error as e:
        # Network-level problems, including the timeout above
        print(f'{Fore.YELLOW}⚠ Notification failed: {str(e)}{Style.RESET_ALL}')
    except Exception as e:
        print(f'{Fore.YELLOW}⚠ Notification error: {str(e)}{Style.RESET_ALL}')
def _is_stream_already_processed(self, stream_id: str) -> bool:
    """
    Check if a stream has already been processed.

    The log at root_path/.log holds one stream id per line (see
    _mark_stream_as_processed). An id counts as processed only when a whole
    line equals it, so id "123" is not confused with a logged "51234".

    Args:
        stream_id: Unique identifier for the stream
    Returns:
        bool: True if already processed, False otherwise (including when
            the log file does not exist yet)
    """
    log_file = pathlib.Path(self.root_path, ".log")
    wanted = str(stream_id).strip()
    try:
        with open(log_file, 'r', encoding='utf-8') as f:
            # Stream the file line by line instead of loading it whole
            return any(line.strip() == wanted for line in f)
    except FileNotFoundError:
        # Nothing has been logged yet
        return False
def _mark_stream_as_processed(self, stream_id: str) -> None:
    """Append *stream_id* as a new line to root_path/.log so it is not processed again."""
    entry = f"{stream_id}\n"
    with pathlib.Path(self.root_path, ".log").open('a', encoding='utf-8') as log:
        log.write(entry)
def _get_bin_path(self) -> str:
    """Return, as a string, the resolved 'bin' directory located next to this file."""
    script_dir = pathlib.Path(__file__).parent.resolve()
    return str(script_dir.joinpath("bin"))
def _get_ffmpeg_executable(self) -> str:
    """Return the ffmpeg path inside bin/ ('ffmpeg.exe' on Windows, 'ffmpeg' otherwise)."""
    bin_dir = self._get_bin_path()
    binary_name = 'ffmpeg.exe' if self.os == 'windows' else 'ffmpeg'
    return os.path.join(bin_dir, binary_name)
def _get_twitch_downloader_executable(self) -> str:
    """Return the TwitchDownloaderCLI path inside bin/ (with '.exe' on Windows)."""
    bin_dir = self._get_bin_path()
    binary_name = 'TwitchDownloaderCLI.exe' if self.os == 'windows' else 'TwitchDownloaderCLI'
    return os.path.join(bin_dir, binary_name)
def _detect_hardware_acceleration(self) -> Optional[str]:
    """
    Resolve the 'ffmpeg_hwaccel' setting (default 'auto') to an acceleration label.

    Returns:
        str: 'none', 'nvenc', 'qsv', 'amf' or 'vaapi' when set explicitly;
            'auto' when the setting is 'auto' (no probing is done - on both
            Windows and Linux the choice is left to ffmpeg);
            None for any unrecognised value.
    """
    requested = getattr(self, 'ffmpeg_hwaccel', 'auto')
    # Explicit choices (including the opt-out) are passed through unchanged
    if requested in ('none', 'nvenc', 'qsv', 'amf', 'vaapi'):
        return requested
    if requested == 'auto':
        return 'auto'
    return None
def _record_livestream(self, stream_info: Dict[str, Any], output_path: str) -> bool:
    """
    Record the live stream with streamlink, blocking until streamlink exits.

    Args:
        stream_info: Stream metadata; only its "title" is used (for display)
        output_path: Path where the raw .ts file will be saved
    Returns:
        bool: True once streamlink has exited - also when the user requested
            a shutdown, so that whatever was recorded still gets processed.
            False only if starting or waiting for streamlink raised.
    """
    separator = f'{Fore.CYAN}{"=" * 60}{Style.RESET_ALL}'
    print(f'\n{separator}')
    print(f'{Fore.GREEN}🔴 STREAM STARTED: {stream_info["title"]}{Style.RESET_ALL}')
    print(f'{separator}\n')
    # Base streamlink invocation
    cmd = ['streamlink', f'twitch.tv/{self.username}', self.quality]
    cmd += ['--hls-live-restart']
    cmd += ['--retry-streams', str(int(self.refresh))]
    cmd += ['--force']
    cmd += ['-o', output_path]
    # Parallel segment downloads (requires streamlink 5.0+)
    if self.hls_segments > 1:
        cmd += ['--stream-segment-threads', str(self.hls_segments)]
    # The former --twitch-proxy-playlist ad-blocking option no longer exists in
    # streamlink, so enabling streamlink_ttvlol only produces a warning.
    if self.streamlink_ttvlol:
        print(f'{Fore.YELLOW}⚠ Warning: ttv-lol proxy option is deprecated in newer streamlink versions{Style.RESET_ALL}')
        print(f'{Fore.YELLOW} Consider disabling streamlink_ttvlol in config or using alternative methods{Style.RESET_ALL}')
    # Personal OAuth token, unless unset or still the placeholder value
    oauth_token = os.getenv("OAUTH-PRIVATE-TOKEN", "")
    if oauth_token and oauth_token != "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx":
        cmd += ['--twitch-api-header', f'Authorization=OAuth {oauth_token}']
    # Echo the command with any argument containing 'OAuth' masked
    shown = []
    for part in cmd:
        shown.append('Authorization=OAuth [HIDDEN]' if 'OAuth' in str(part) else part)
    print(f'{Fore.CYAN}Command: {" ".join(shown)}{Style.RESET_ALL}')
    print(f'{Fore.YELLOW}Recording stream...{Style.RESET_ALL}')
    try:
        # current_process is published so other code can reach the running recorder
        self.current_process = subprocess.Popen(cmd)
        self.current_process.wait()
        self.current_process = None
        if self.shutdown_requested:
            outcome = f'{Fore.YELLOW}✓ Recording stopped by user{Style.RESET_ALL}'
        else:
            outcome = f'{Fore.GREEN}✓ Stream recording complete{Style.RESET_ALL}'
        print(outcome)
        return True
    except Exception as e:
        self.current_process = None
        print(f'{Fore.RED}✗ Recording error: {str(e)}{Style.RESET_ALL}')
        return False
def _process_raw_stream(self, raw_path: str, output_path: str) -> None:
    """
    Process raw .ts file into mp4/mp3 using ffmpeg.

    Audio-only recordings (quality == 'audio_only') are re-encoded with the
    configured audio settings; everything else is remuxed with stream copy.
    Nothing is done when the raw file is missing or onlyRaw is set. An
    existing output file is overwritten in both modes, and a non-zero ffmpeg
    exit code is reported as a failure.

    Args:
        raw_path: Path to the raw .ts file
        output_path: Path for the processed output file
    """
    if not os.path.exists(raw_path):
        print(f'{Fore.YELLOW}⚠ Raw file not found, skipping processing{Style.RESET_ALL}')
        return
    if self.onlyRaw:
        print(f'{Fore.CYAN}Keeping raw .ts file (onlyRaw mode){Style.RESET_ALL}')
        return
    print(f'{Fore.YELLOW}Processing raw stream file...{Style.RESET_ALL}')
    # Build ffmpeg command based on quality
    if self.quality == 'audio_only':
        # Audio-only conversion with modern AAC encoding
        cmd = [
            self._get_ffmpeg_executable(),
            '-y',  # Overwrite output file; without it ffmpeg stops at an (often hidden) prompt
            '-i', raw_path,
            '-vn',  # No video
            '-c:a', self.ffmpeg_audio_codec,  # Audio codec (AAC recommended)
            '-ar', str(self.ffmpeg_audio_samplerate),  # Audio sample rate
            '-ac', '2',  # Audio channels (stereo)
            '-b:a', self.ffmpeg_audio_bitrate,  # Audio bitrate
        ]
        # Add threading for faster encoding
        if self.ffmpeg_threads > 0:
            cmd.extend(['-threads', str(self.ffmpeg_threads)])
        # Add faststart for better streaming compatibility (MP4/M4A)
        if self.ffmpeg_faststart and output_path.endswith(('.mp4', '.m4a')):
            cmd.extend(['-movflags', '+faststart'])
        cmd.append(output_path)
    else:
        # Video conversion with hardware acceleration support
        cmd = [
            self._get_ffmpeg_executable(),
            '-y',  # Overwrite output file
        ]
        # Add hardware acceleration if enabled; ffmpeg is always asked to pick
        # the method itself, whatever label the configuration names
        hwaccel_type = self._detect_hardware_acceleration()
        if hwaccel_type and hwaccel_type != 'none':
            print(f'{Fore.CYAN}Using hardware acceleration: {hwaccel_type}{Style.RESET_ALL}')
            cmd.extend(['-hwaccel', 'auto'])
        # NOTE(review): -analyzeduration/-probesize (and -fflags/-err_detect below)
        # follow '-i', so ffmpeg treats them as output options - confirm whether
        # they were meant to precede the input.
        cmd.extend([
            '-i', raw_path,
            '-analyzeduration', '2147483647',
            '-probesize', '2147483647',
        ])
        # Threading support
        if self.ffmpeg_threads >= 0:
            cmd.extend(['-threads', str(self.ffmpeg_threads)])
        # Error recovery options for corrupted streams
        if self.ffmpeg_error_recovery:
            cmd.extend([
                '-fflags', '+genpts',  # Generate missing timestamps
                '-avoid_negative_ts', 'make_zero',  # Handle timestamp issues
                '-err_detect', 'ignore_err'  # More tolerant of errors
            ])
        # Stream copy (fast, no re-encoding)
        cmd.extend([
            '-c:v', 'copy',  # Copy video codec
            '-c:a', 'copy',  # Copy audio codec
            '-start_at_zero',
            '-copyts',
        ])
        # Add faststart for MP4 files
        if self.ffmpeg_faststart and output_path.endswith('.mp4'):
            cmd.extend(['-movflags', '+faststart'])
        cmd.append(output_path)
    # Run ffmpeg with optional progress output
    if self.ffmpeg_progress:
        return_code = subprocess.call(cmd)
    else:
        return_code = subprocess.call(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
    if return_code != 0:
        # Do not claim success when ffmpeg reported an error
        print(f'{Fore.RED}✗ Stream processing failed with exit code: {return_code}{Style.RESET_ALL}')
        return
    print(f'{Fore.GREEN}✓ Stream processed successfully{Style.RESET_ALL}')
def _download_vod(self, vod_info: Dict[str, Any], output_path: str) -> bool:
    """
    Download a VOD with TwitchDownloaderCLI's 'videodownload' command.

    Args:
        vod_info: VOD metadata; its "title" and "id" entries are used
        output_path: Path where the VOD will be saved
    Returns:
        bool: True if the downloader exited with code 0; False if VOD
            downloads are disabled, the exit code was non-zero, or the
            downloader could not be run (the latter also sends a notification)
    """
    if not self.downloadVOD:
        return False
    print(f'\n{Fore.CYAN}Downloading VOD: {vod_info["title"]}{Style.RESET_ALL}')
    # The video URL needs the bare number; ids may arrive as "v123456789"
    vod_id = vod_info["id"]
    has_v_prefix = isinstance(vod_id, str) and vod_id.startswith('v')
    if has_v_prefix:
        vod_id = vod_id[1:]
    vod_url = f"https://www.twitch.tv/videos/{vod_id}"
    print(f'{Fore.YELLOW}VOD URL: {vod_url}{Style.RESET_ALL}')
    temp_dir = os.path.join(self._get_bin_path(), 'temp')
    cmd = [self._get_twitch_downloader_executable(), 'videodownload']
    cmd += ['-u', vod_url]
    cmd += ['-q', self.quality]
    cmd += ['-t', str(self.hls_segmentsVOD)]
    cmd += ['--ffmpeg-path', self._get_ffmpeg_executable()]
    cmd += ['--temp-path', temp_dir]
    cmd += ['--collision', 'Rename']
    cmd += ['-o', output_path]
    try:
        exit_code = subprocess.call(cmd)
        if exit_code != 0:
            print(f'{Fore.RED}✗ VOD download failed with exit code: {exit_code}{Style.RESET_ALL}')
            return False
        print(f'{Fore.GREEN}✓ VOD downloaded{Style.RESET_ALL}')
        return True
    except Exception as e:
        print(f'{Fore.RED}✗ VOD download failed: {str(e)}{Style.RESET_ALL}')
        self.send_notification('VOD Download Error', f'Failed to download VOD: {str(e)}')
        return False
def _download_and_render_chat(self, vod_info: Dict[str, Any], json_path: str, video_path: str) -> bool:
    """
    Download a VOD's chat as JSON, then render it to a video.

    Both steps run TwitchDownloaderCLI ('chatdownload', then 'chatrender').

    Args:
        vod_info: VOD metadata; its "title" and "id" entries are used
        json_path: Path to save chat JSON
        video_path: Path to save rendered chat video
    Returns:
        bool: True only if both steps exit with code 0 and the JSON file
            exists afterwards; False otherwise or when chat download is
            disabled. An exception additionally sends a notification.
    """
    if not self.downloadCHAT:
        return False
    print(f'\n{Fore.CYAN}Downloading chat: {vod_info["title"]}{Style.RESET_ALL}')
    # Strip the optional 'v' prefix to get the bare VOD number
    vod_id = vod_info["id"]
    if isinstance(vod_id, str) and vod_id.startswith('v'):
        vod_id = vod_id[1:]
    temp_dir = os.path.join(self._get_bin_path(), 'temp')
    downloader = self._get_twitch_downloader_executable()
    # Appearance and tooling options appended to the render command
    render_options = [
        '--background-color', '#FF111111',
        '-w', '500',
        '-h', '1080',
        '--outline',
        '-f', 'Arial',
        '--font-size', '22',
        '--update-rate', '1.0',
        '--offline',
        '--ffmpeg-path', self._get_ffmpeg_executable(),
        '--temp-path', temp_dir,
        '--collision', 'Rename'
    ]
    download_cmd = [
        downloader, 'chatdownload',
        '--id', vod_id,
        '--embed-images',
        '--collision', 'Rename',
        '-o', json_path
    ]
    render_cmd = [downloader, 'chatrender', '-i', json_path, '-o', video_path] + render_options
    try:
        # Step 1: chat JSON
        print(f'{Fore.YELLOW}Downloading chat JSON for VOD {vod_id}...{Style.RESET_ALL}')
        exit_code = subprocess.call(download_cmd)
        if exit_code != 0:
            print(f'{Fore.RED}✗ Chat JSON download failed with exit code: {exit_code}{Style.RESET_ALL}')
            return False
        # A zero exit code alone is not trusted: the file must exist
        if not os.path.exists(json_path):
            print(f'{Fore.RED}✗ Chat JSON file was not created{Style.RESET_ALL}')
            return False
        print(f'{Fore.GREEN}✓ Chat JSON downloaded{Style.RESET_ALL}')
        # Step 2: chat video
        print(f'{Fore.YELLOW}Rendering chat video...{Style.RESET_ALL}')
        exit_code = subprocess.call(render_cmd)
        if exit_code != 0:
            print(f'{Fore.RED}✗ Chat render failed with exit code: {exit_code}{Style.RESET_ALL}')
            return False
        print(f'{Fore.GREEN}✓ Chat rendered{Style.RESET_ALL}')
        return True
    except Exception as e:
        print(f'{Fore.RED}✗ Chat processing failed: {str(e)}{Style.RESET_ALL}')
        self.send_notification('Chat Download Error',
                               f'Failed to download/render chat: {str(e)}')
        return False
def _download_live_chat(self, vod_id: str, json_path: str) -> Optional[subprocess.Popen]:
"""
Start downloading live chat in the background while stream is recording.
Args:
vod_id: The VOD/stream ID to download chat from
json_path: Path to save chat JSON
Returns:
subprocess.Popen: The process handle, or None if failed to start
"""
if not self.downloadLiveCHAT:
return None
print(f'\n{Fore.CYAN}Starting live chat download...{Style.RESET_ALL}')
# Remove 'v' prefix if present
if isinstance(vod_id, str) and vod_id.startswith('v'):
vod_id = vod_id[1:]
downloader = self._get_twitch_downloader_executable()
try:
# Start chat download as background process
cmd = [
downloader, 'chatdownload',
'--id', vod_id,
'--embed-images',
'--collision', 'Rename',
'-o', json_path
]
print(f'{Fore.YELLOW}Live chat download started in background for VOD {vod_id}{Style.RESET_ALL}')
process = subprocess.Popen(
cmd,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
return process
except Exception as e:
print(f'{Fore.RED}✗ Failed to start live chat download: {str(e)}{Style.RESET_ALL}')
return None
def _wait_for_chat_download(self, process: Optional[subprocess.Popen], json_path: str) -> bool:
"""
Download chat logs and render them as video.
Args:
vod_info: VOD metadata from Twitch API
json_path: Path to save chat JSON
video_path: Path to save rendered chat video
Returns:
bool: True if succeeded, False otherwise
"""
if not self.downloadCHAT:
return False
print(f'\n{Fore.CYAN}Downloading chat: {vod_info["title"]}{Style.RESET_ALL}')
# Extract numeric VOD ID
vod_id = vod_info["id"]
if isinstance(vod_id, str) and vod_id.startswith('v'):
vod_id = vod_id[1:]
bin_path = self._get_bin_path()
downloader = self._get_twitch_downloader_executable()
# Chat rendering settings
chat_settings = [
'--background-color', '#FF111111',
'-w', '500',
'-h', '1080',
'--outline',
'-f', 'Arial',
'--font-size', '22',
'--update-rate', '1.0',
'--offline',
'--ffmpeg-path', self._get_ffmpeg_executable(),
'--temp-path', os.path.join(bin_path, 'temp'),
'--collision', 'Rename'
]
try:
# Download chat JSON
print(f'{Fore.YELLOW}Downloading chat JSON for VOD {vod_id}...{Style.RESET_ALL}')
result = subprocess.call([
downloader, 'chatdownload',
'--id', vod_id,
'--embed-images',
'--collision', 'Rename',
'-o', json_path
])
if result != 0:
print(f'{Fore.RED}✗ Chat JSON download failed with exit code: {result}{Style.RESET_ALL}')
return False
# Verify JSON file was created
if not os.path.exists(json_path):
print(f'{Fore.RED}✗ Chat JSON file was not created{Style.RESET_ALL}')
return False
print(f'{Fore.GREEN}✓ Chat JSON downloaded{Style.RESET_ALL}')
# Render chat video
print(f'{Fore.YELLOW}Rendering chat video...{Style.RESET_ALL}')
result = subprocess.call([
downloader, 'chatrender',
'-i', json_path,
'-o', video_path
] + chat_settings)
if result != 0:
print(f'{Fore.RED}✗ Chat render failed with exit code: {result}{Style.RESET_ALL}')
return False
print(f'{Fore.GREEN}✓ Chat rendered{Style.RESET_ALL}')
return True
except Exception as e:
print(f'{Fore.RED}✗ Chat processing failed: {str(e)}{Style.RESET_ALL}')
self.send_notification('Chat Download Error',
f'Failed to download/render chat: {str(e)}')
return False
def _wait_for_chat_download(self, process: Optional[subprocess.Popen], json_path: str) -> bool:
"""
Wait for live chat download process to complete.
Args:
process: The chat download process handle
json_path: Path where chat JSON should be saved
Returns:
bool: True if chat download succeeded, False otherwise
"""
if process is None:
return False
try:
print(f'{Fore.YELLOW}Waiting for live chat download to complete...{Style.RESET_ALL}')
return_code = process.wait(timeout=300) # 5 minute timeout
if return_code == 0 and os.path.exists(json_path):
print(f'{Fore.GREEN}✓ Live chat JSON downloaded{Style.RESET_ALL}')
return True
else:
print(f'{Fore.RED}✗ Live chat download failed (exit code: {return_code}){Style.RESET_ALL}')
return False
except subprocess.TimeoutExpired:
print(f'{Fore.YELLOW}⚠ Live chat download timed out, terminating...{Style.RESET_ALL}')
process.terminate()
return False
except Exception as e:
print(f'{Fore.RED}✗ Error waiting for chat download: {str(e)}{Style.RESET_ALL}')
return False
def _render_chat(self, json_path: str, video_path: str) -> bool:
    """
    Turn an already-downloaded chat JSON file into a chat video.

    Invokes the TwitchDownloader executable's ``chatrender`` command with a
    fixed set of rendering options.

    Args:
        json_path: Path to chat JSON file
        video_path: Path to save rendered chat video

    Returns:
        bool: True if the renderer exited with code 0, False if the JSON
        file is missing, the renderer failed, or an exception occurred
    """
    if not os.path.exists(json_path):
        print(f'{Fore.RED}✗ Chat JSON file not found: {json_path}{Style.RESET_ALL}')
        return False

    temp_dir = os.path.join(self._get_bin_path(), 'temp')
    cli = self._get_twitch_downloader_executable()

    # Full command line: input/output first, then the rendering options.
    render_cmd = [
        cli, 'chatrender',
        '-i', json_path,
        '-o', video_path,
        '--background-color', '#FF111111',
        '-w', '500',
        '-h', '1080',
        '--outline',
        '-f', 'Arial',
        '--font-size', '22',
        '--update-rate', '1.0',
        '--offline',
        '--ffmpeg-path', self._get_ffmpeg_executable(),
        '--temp-path', temp_dir,
        '--collision', 'Rename',
    ]

    try:
        print(f'{Fore.YELLOW}Rendering chat video...{Style.RESET_ALL}')
        exit_code = subprocess.call(render_cmd)
        if exit_code == 0:
            print(f'{Fore.GREEN}✓ Chat rendered{Style.RESET_ALL}')
            return True
        print(f'{Fore.RED}✗ Chat render failed with exit code: {exit_code}{Style.RESET_ALL}')
        return False
    except Exception as err:
        print(f'{Fore.RED}✗ Chat rendering failed: {str(err)}{Style.RESET_ALL}')
        return False
def _save_metadata(self, vod_info: Dict[str, Any], filename_base: str) -> None:
"""
Save VOD metadata to JSON file.
Args:
vod_info: VOD metadata from Twitch API
filename_base: Base filename (without extension)
"""
if not self.downloadMETADATA:
return
metadata_path = os.path.join(self.metadata_path, f"{PREFIX_METADATA}{filename_base}.json")
with open(metadata_path, 'w', encoding='utf-8') as f:
json.dump(vod_info, f, ensure_ascii=False, indent=4)
print(f'{Fore.GREEN}✓ Metadata saved{Style.RESET_ALL}')
def _signal_handler(self, signum, frame):
"""Handle interrupt signals gracefully."""
if not self.shutdown_requested:
print(f'\n{Fore.YELLOW}{"=" * 60}{Style.RESET_ALL}')
print(f'{Fore.YELLOW}⚠ Shutdown requested. Stopping downloads and finalizing...{Style.RESET_ALL}')
print(f'{Fore.YELLOW}{"=" * 60}{Style.RESET_ALL}\n')
self.shutdown_requested = True
# Stop current subprocess if running
if self.current_process:
try:
self.current_process.terminate()
print(f'{Fore.YELLOW}Stopping current download process...{Style.RESET_ALL}')
except Exception:
pass
def _interruptible_sleep(self, seconds: float) -> bool:
"""
Sleep for the specified duration, but check for shutdown periodically.
Args:
seconds: Number of seconds to sleep
Returns:
bool: True if sleep completed, False if interrupted by shutdown
"""
start_time = time.time()
while time.time() - start_time < seconds:
if self.shutdown_requested:
return False
time.sleep(min(1.0, seconds - (time.time() - start_time)))
return True
def loopcheck(self) -> None:
    """
    Main monitoring loop.

    Continuously checks if the streamer is live, and when they are:
    1. Records the live stream
    2. Downloads the VOD
    3. Downloads and renders chat
    4. Uploads everything to cloud storage (if enabled)
    5. Optionally deletes local files after upload

    The loop runs until ``self.shutdown_requested`` becomes true. A
    ``KeyboardInterrupt`` ends the loop; any other exception is printed,
    reported via ``send_notification`` and followed by a retry after
    ``self.refresh`` seconds.
    """
    # Set up signal handlers for graceful shutdown
    signal.signal(signal.SIGINT, self._signal_handler)
    # Register SIGTERM only when the signal module exposes it
    if hasattr(signal, 'SIGTERM'):
        signal.signal(signal.SIGTERM, self._signal_handler)
    while not self.shutdown_requested:
        try:
            # Check stream status
            response = self._check_stream_status()
            # `is_live` is the stream object itself (or None), not a boolean
            is_live = response['data']['user']['stream']
            # Stream is offline
            if is_live is None:
                print(f'{Fore.CYAN}{self.username} is offline. Checking again in {self.refresh}s...{Style.RESET_ALL}', end='\r')
                if self.shutdown_requested:
                    break
                self._interruptible_sleep(self.refresh)
                continue
            # Stream is live but not ready yet
            if not is_live.get('title'):
                print(f'{Fore.YELLOW}⚠ Stream detected but no title yet. Waiting...{Style.RESET_ALL}')
                if self.shutdown_requested:
                    break
                self._interruptible_sleep(self.refresh)
                continue
            # Stream is live and ready!
            print(f'\n{Fore.GREEN}{self.username} is LIVE!{Style.RESET_ALL}')
            print(f'{Fore.CYAN}Title: {is_live["title"]}{Style.RESET_ALL}')
            # Create unique stream identifier based on stream start time
            stream_id = f"{is_live['createdAt']} - {self.username} - {is_live['title']}"
            # Parse stream start time (UTC string -> naive local datetime)
            live_date = datetime.strptime(
                is_live["createdAt"], '%Y-%m-%dT%H:%M:%SZ'
            ).replace(tzinfo=timezone('UTC')).astimezone(tz=None).replace(tzinfo=None)
            # Use CURRENT time for filename to ensure each recording is unique
            # This allows recording a live stream multiple times (e.g., if script restarts)
            current_time = datetime.now()
            filename_base = current_time.strftime('%Y%m%d_%Hh%Mm%Ss')
            # Check if we've already recorded this stream session
            # (recording starts in both cases; only the message differs)
            if self._is_stream_already_processed(stream_id):
                print(f'{Fore.YELLOW}⚠ Stream was previously recorded, but it\'s still live!{Style.RESET_ALL}')
                print(f'{Fore.GREEN}✓ Starting new recording with timestamp: {filename_base}{Style.RESET_ALL}')
            else:
                # First time seeing this stream - mark it
                self._mark_stream_as_processed(stream_id)
                print(f'{Fore.GREEN}✓ New stream detected - starting recording{Style.RESET_ALL}')
            # Determine file paths
            live_raw_path = os.path.join(self.raw_path, f"{PREFIX_LIVE}{filename_base}.ts")
            live_proc_ext = '.mp3' if self.quality == 'audio_only' else '.mp4'
            live_proc_path = os.path.join(self.video_path, f"{PREFIX_LIVE}{filename_base}{live_proc_ext}")
            # Ensure unique filenames
            live_raw_path = self._get_unique_filename(live_raw_path)
            live_proc_path = self._get_unique_filename(live_proc_path)
            # Re-derive the base name from the (possibly adjusted) raw path so
            # the VOD/chat/metadata files below share it
            filename_base = os.path.splitext(os.path.basename(live_raw_path))[0].replace(PREFIX_LIVE, "")
            print(f'{Fore.CYAN}Output path: {live_raw_path}{Style.RESET_ALL}')
            # Send notification
            self.send_notification(f'🔴 Stream Started - {filename_base}',
                                   f'Title: {is_live["title"]}')
            # Store current stream data for potential graceful shutdown
            self.current_stream_data = {
                'filename_base': filename_base,
                'live_raw_path': live_raw_path,
                'live_proc_path': live_proc_path
            }
            # Start live chat download if enabled and VOD ID is available
            live_chat_process = None
            chat_json_path = os.path.join(self.chatJSON_path, f"{PREFIX_CHAT}{filename_base}.json")
            if self.downloadLiveCHAT and is_live.get('archiveVideo') and is_live['archiveVideo'].get('id'):
                live_vod_id = is_live['archiveVideo']['id']
                print(f'{Fore.CYAN}Live VOD ID detected: {live_vod_id}{Style.RESET_ALL}')
                live_chat_process = self._download_live_chat(live_vod_id, chat_json_path)
            elif self.downloadLiveCHAT:
                print(f'{Fore.YELLOW}⚠ No VOD ID available yet for live chat download{Style.RESET_ALL}')
            # Record the live stream (the return value is not used below)
            recording_completed = self._record_livestream(is_live, live_raw_path)
            # If shutdown was requested during recording, try to finalize
            if self.shutdown_requested:
                print(f'{Fore.YELLOW}Attempting to process any recorded content...{Style.RESET_ALL}')
            # Process the raw stream file
            self._process_raw_stream(live_raw_path, live_proc_path)
            # Wait for live chat download if it was started
            live_chat_downloaded = False
            if live_chat_process is not None:
                live_chat_downloaded = self._wait_for_chat_download(live_chat_process, chat_json_path)
                # Render live chat if downloaded successfully
                if live_chat_downloaded:
                    chat_video_path = os.path.join(self.chatMP4_path, f"{PREFIX_CHAT}{filename_base}.mp4")
                    self._render_chat(chat_json_path, chat_video_path)
            # Skip VOD/chat download if shutdown was requested or vodTimeout is 0
            vod_response = None
            if self.shutdown_requested:
                print(f'{Fore.YELLOW}Skipping VOD and chat download due to shutdown request{Style.RESET_ALL}')
            elif self.vodTimeout == 0:
                print(f'{Fore.CYAN}VOD check disabled (vodTimeout=0). Skipping VOD download.{Style.RESET_ALL}')
            else:
                # Try to match stream with VOD (with timeout)
                print(f'{Fore.CYAN}Waiting for VOD to become available (timeout: {self.vodTimeout}s)...{Style.RESET_ALL}')
                vod_found = False
                vod_wait_start = time.time()
                while time.time() - vod_wait_start < self.vodTimeout and not self.shutdown_requested:
                    vod_response = self._get_latest_vod()
                    if vod_response and vod_response['data']['user']['videos']['edges']:
                        # Only the first returned VOD is considered
                        current_vod = vod_response['data']['user']['videos']['edges'][0]['node']
                        vod_date = datetime.strptime(
                            current_vod["recordedAt"], '%Y-%m-%dT%H:%M:%SZ'
                        ).replace(tzinfo=timezone('UTC')).astimezone(tz=None).replace(tzinfo=None)
                        # Check if VOD matches the stream (within 1 minute tolerance)
                        time_tolerance = timedelta(minutes=1)
                        if (live_date - time_tolerance) <= vod_date <= (live_date + time_tolerance):
                            vod_found = True
                            break
                    # Wait before checking again
                    if not vod_found:
                        print(f'{Fore.CYAN}VOD not found yet, waiting...{Style.RESET_ALL}', end='\r')
                        # Poll at most every 10 seconds; stop early on shutdown
                        if not self._interruptible_sleep(min(10, self.vodTimeout - (time.time() - vod_wait_start))):
                            break
                if not vod_found:
                    if self.shutdown_requested:
                        print(f'\n{Fore.YELLOW}VOD check interrupted by shutdown{Style.RESET_ALL}')
                    else:
                        print(f'\n{Fore.YELLOW}⚠ VOD not found after {self.vodTimeout}s - streamer may have VODs disabled{Style.RESET_ALL}')
                        print(f'{Fore.CYAN} → Live recording and chat (if enabled) were saved successfully{Style.RESET_ALL}')
                    # Discard any non-matching response so the VOD step below is skipped
                    vod_response = None
            if not self.shutdown_requested and vod_response and vod_response['data']['user']['videos']['edges']:
                current_vod = vod_response['data']['user']['videos']['edges'][0]['node']
                vod_date = datetime.strptime(
                    current_vod["recordedAt"], '%Y-%m-%dT%H:%M:%SZ'
                ).replace(tzinfo=timezone('UTC')).astimezone(tz=None).replace(tzinfo=None)
                # Check if VOD matches the stream (within 1 minute tolerance)
                time_tolerance = timedelta(minutes=1)
                if (live_date - time_tolerance) <= vod_date <= (live_date + time_tolerance):
                    print(f'\n{Fore.GREEN}✓ Found matching VOD{Style.RESET_ALL}')
                    # Save metadata
                    self._save_metadata(current_vod, filename_base)
                    # Download VOD
                    vod_ext = '.mp3' if self.quality == 'audio_only' else '.mp4'
                    vod_path = os.path.join(self.video_path, f"{PREFIX_VOD}{filename_base}{vod_ext}")
                    self._download_vod(current_vod, vod_path)
                    # Download and render chat from VOD (if not already done via live chat)
                    if not live_chat_downloaded:
                        chat_video_path = os.path.join(self.chatMP4_path, f"{PREFIX_CHAT}{filename_base}.mp4")
                        self._download_and_render_chat(current_vod, chat_json_path, chat_video_path)
                    else:
                        print(f'{Fore.CYAN}Chat already downloaded from live stream, skipping VOD chat download{Style.RESET_ALL}')
                else:
                    print(f'{Fore.YELLOW}⚠ No matching VOD found for this stream{Style.RESET_ALL}')
            # Clean up raw files if configured
            if self.cleanRaw and os.path.exists(live_raw_path):
                print(f'{Fore.YELLOW}Deleting raw .ts file...{Style.RESET_ALL}')
                os.remove(live_raw_path)
            # Upload to cloud if configured
            upload_success = self._upload_to_cloud(filename_base)
            # Delete local files if configured and upload succeeded
            if self.deleteFiles and upload_success:
                self._delete_local_files(filename_base, live_raw_path, live_proc_path)
            # Done processing this stream
            if self.shutdown_requested:
                print(f'\n{Fore.YELLOW}{"=" * 60}{Style.RESET_ALL}')
                print(f'{Fore.YELLOW}✓ Stream processing stopped by user{Style.RESET_ALL}')
                print(f'{Fore.YELLOW}{"=" * 60}{Style.RESET_ALL}\n')
                break
            else:
                print(f'\n{Fore.GREEN}{"=" * 60}{Style.RESET_ALL}')
                print(f'{Fore.GREEN}✓ Stream processing complete{Style.RESET_ALL}')
                print(f'{Fore.GREEN}{"=" * 60}{Style.RESET_ALL}\n')
                self.send_notification(f'✓ Complete - {filename_base}',
                                       'Stream processing finished. Resuming monitoring.')
                self._interruptible_sleep(self.refresh)
        except KeyboardInterrupt:
            # Additional catch for any other KeyboardInterrupt not handled by signal
            if not self.shutdown_requested:
                self.shutdown_requested = True
                print(f'\n{Fore.YELLOW}{"=" * 60}{Style.RESET_ALL}')
                print(f'{Fore.YELLOW}⚠ Interrupted. Cleaning up...{Style.RESET_ALL}')
                print(f'{Fore.YELLOW}{"=" * 60}{Style.RESET_ALL}\n')
            break
        except Exception as e:
            # Any other failure: report it, notify, then retry after a pause
            print(f'\n{Fore.RED}{"=" * 60}{Style.RESET_ALL}')
            print(f'{Fore.RED}✗ ERROR: {str(e)}{Style.RESET_ALL}')
            print(f'{Fore.YELLOW}Waiting {self.refresh} seconds before retrying...{Style.RESET_ALL}')
            print(f'{Fore.RED}{"=" * 60}{Style.RESET_ALL}\n')
            self.send_notification('⚠ Error - Recovery',
                                   f'Error: {str(e)}\nRetrying after {self.refresh} seconds.')
            # Check for shutdown during sleep
            if self.shutdown_requested:
                break
            self._interruptible_sleep(self.refresh)
    # Final cleanup message
    print(f'{Fore.GREEN}✓ Monitoring stopped cleanly{Style.RESET_ALL}')
def _upload_to_cloud(self, filename_base: str) -> bool:
"""
Upload archived files to cloud storage using rclone.
Args:
filename_base: Base filename (without prefixes/extensions)
Returns:
bool: True if upload succeeded or is disabled, False if failed
"""
if not self.uploadCloud:
return True # Consider upload "successful" if disabled
print(f'\n{Fore.CYAN}Uploading to cloud storage...{Style.RESET_ALL}')
self.send_notification(f'☁ Uploading - {filename_base}', 'Uploading files to cloud storage')
# Create list of files to upload
bin_path = self._get_bin_path()
upload_list_path = os.path.join(bin_path, 'temp', 'upload.txt')
# Ensure temp directory exists
os.makedirs(os.path.dirname(upload_list_path), exist_ok=True)
files_to_upload = [
f"{PREFIX_LIVE}{filename_base}.ts",
f"{PREFIX_LIVE}{filename_base}.mp4",
f"{PREFIX_LIVE}{filename_base}.mp3",
f"{PREFIX_VOD}{filename_base}.ts",
f"{PREFIX_VOD}{filename_base}.mp4",
f"{PREFIX_VOD}{filename_base}.mp3",
f"{PREFIX_METADATA}{filename_base}.json",
f"{PREFIX_CHAT}{filename_base}.json",
f"{PREFIX_CHAT}{filename_base}.mp4"
]
with open(upload_list_path, 'w') as f:
f.write('\n'.join(files_to_upload))
# Run rclone
try:
result = subprocess.call([
'rclone', 'copy',
str(pathlib.Path(self.root_path).resolve()),
self.rclone_path,
'--include-from', upload_list_path
])
# Clean up upload list
if os.path.exists(upload_list_path):
os.remove(upload_list_path)
if result == 0:
print(f'{Fore.GREEN}✓ Upload complete{Style.RESET_ALL}')
self.send_notification(f'✓ Upload Success - {filename_base}',
'All files uploaded successfully')
return True
else:
print(f'{Fore.RED}✗ Upload failed (exit code: {result}){Style.RESET_ALL}')
print(f'{Fore.YELLOW}Files preserved locally due to upload failure{Style.RESET_ALL}')
self.send_notification(f'✗ Upload Failed - {filename_base}',
f'Upload failed with code {result}. Files preserved locally.')
return False
except Exception as e:
print(f'{Fore.RED}✗ Upload error: {str(e)}{Style.RESET_ALL}')
return False
def _delete_local_files(self, filename_base: str, live_raw_path: str, live_proc_path: str) -> None:
    """
    Remove the local copies of a recording's files.

    Candidates are the live raw/processed files plus, depending on the
    enabled features, the VOD, chat and metadata files derived from
    ``filename_base``. Raw ``.ts`` files are only considered when
    ``cleanRaw`` is off. Files that do not exist are skipped, and a failed
    deletion is reported without stopping the remaining deletions.

    Args:
        filename_base: Base filename (without prefixes/extensions)
        live_raw_path: Path to live raw file
        live_proc_path: Path to live processed file
    """
    rule = f'{Fore.RED}{"=" * 60}{Style.RESET_ALL}'
    print(f'\n{rule}')
    print(f'{Fore.RED}⚠ DELETING LOCAL FILES{Style.RESET_ALL}')
    print(f'{rule}\n')
    self.send_notification(f'🗑 Deleting - {filename_base}',
                           'Deleting local files after successful upload')

    raw_eligible = not self.cleanRaw

    # (eligible, path) pairs in deletion order.
    candidates = [
        (raw_eligible, live_raw_path),
        (True, live_proc_path),
    ]
    if self.downloadVOD:
        vod_name = f"{PREFIX_VOD}{filename_base}"
        candidates += [
            (raw_eligible, os.path.join(self.raw_path, f"{vod_name}.ts")),
            (True, os.path.join(self.video_path, f"{vod_name}.mp4")),
            (True, os.path.join(self.video_path, f"{vod_name}.mp3")),
        ]
    if self.downloadCHAT:
        chat_name = f"{PREFIX_CHAT}{filename_base}"
        candidates += [
            (True, os.path.join(self.chatJSON_path, f"{chat_name}.json")),
            (True, os.path.join(self.chatMP4_path, f"{chat_name}.mp4")),
        ]
    if self.downloadMETADATA:
        candidates.append(
            (True, os.path.join(self.metadata_path, f"{PREFIX_METADATA}{filename_base}.json"))
        )

    doomed = [path for eligible, path in candidates if eligible and os.path.exists(path)]

    for path in doomed:
        try:
            print(f'{Fore.RED} Deleting: {os.path.basename(path)}{Style.RESET_ALL}')
            os.remove(path)
        except Exception as err:
            print(f'{Fore.YELLOW} ⚠ Failed to delete {path}: {err}{Style.RESET_ALL}')

    print(f'{Fore.RED}\n✓ Cleanup complete{Style.RESET_ALL}')
# ============================================================================
# MULTI-STREAMER MANAGER
# ============================================================================
class TwitchArchiveManager:
    """
    Manages multiple TwitchArchive instances for monitoring multiple streamers.

    All streamers are polled from one loop. Handling a live stream blocks
    that loop until the stream has been recorded and post-processed.
    """

    def __init__(self, specific_streamer: Optional[str] = None, verbose: bool = False):
        """
        Initialize the manager.

        Args:
            specific_streamer: If provided, only monitor this streamer (ignore enabled status)
            verbose: Enable verbose debug output
        """
        self.config_manager = ConfigManager()
        self.specific_streamer = specific_streamer
        self.verbose = verbose
        self.archivers: Dict[str, "TwitchArchive"] = {}
        self.shutdown_requested = False
        self.active_recordings: Dict[str, str] = {}  # Track active recordings: {username: stream_id}

        # Setup signal handlers (SIGTERM only where the platform exposes it,
        # matching the single-streamer loop)
        signal.signal(signal.SIGINT, self._signal_handler)
        if hasattr(signal, 'SIGTERM'):
            signal.signal(signal.SIGTERM, self._signal_handler)

    def _signal_handler(self, signum, frame):
        """
        Handle shutdown signals gracefully.

        Raises the shutdown flag on the manager and on every archiver, and
        asks any subprocess an archiver is currently running to terminate so
        that a blocking recording does not keep the manager alive.
        """
        print(f'\n{Fore.YELLOW}⚠ Shutdown signal received...{Style.RESET_ALL}')
        self.shutdown_requested = True

        # Signal all archivers to shut down
        for archiver in self.archivers.values():
            archiver.shutdown_requested = True
            # Mirrors the archiver's own signal handler, which is not
            # installed in multi-streamer mode. NOTE(review): assumes the
            # archiver tracks its child in `current_process` - confirm.
            running = getattr(archiver, 'current_process', None)
            if running:
                try:
                    running.terminate()
                except Exception:
                    # Best effort only; never raise out of a signal handler.
                    pass

    def _get_streamers_to_monitor(self) -> list:
        """
        Get list of streamers to monitor.

        Returns:
            list: List of streamer usernames to monitor
        """
        if self.specific_streamer:
            # Monitor only the specified streamer (ignore enabled flag)
            return [self.specific_streamer]
        # Monitor all enabled streamers
        return self.config_manager.get_all_enabled_streamers()

    def _initialize_archiver(self, username: str) -> "TwitchArchive":
        """
        Initialize a TwitchArchive instance for a streamer.

        Args:
            username: Twitch username

        Returns:
            TwitchArchive: Initialized archiver instance
        """
        config = self.config_manager.load_streamer_config(username)
        return TwitchArchive(config)

    def run(self) -> None:
        """
        Main entry point for multi-streamer monitoring.

        Monitors all enabled streamers (or a specific one if provided).
        Exits the process with status 1 when no streamer is configured or
        none could be initialized.
        """
        print(f'\n{Fore.CYAN}{"=" * 70}{Style.RESET_ALL}')
        print(f'{Fore.CYAN}TWITCH ARCHIVE - Multi-Streamer Mode{Style.RESET_ALL}')
        print(f'{Fore.CYAN}{"=" * 70}{Style.RESET_ALL}\n')

        # Get streamers to monitor
        streamers = self._get_streamers_to_monitor()
        if not streamers:
            print(f'{Fore.RED}✗ No streamers configured or enabled{Style.RESET_ALL}')
            print(f'{Fore.CYAN}→ Create config files in config/streamers/{Style.RESET_ALL}')
            print(f'{Fore.CYAN}→ Or run with -u <username> to create a new config{Style.RESET_ALL}')
            sys.exit(1)

        print(f'{Fore.GREEN}Monitoring {len(streamers)} streamer(s):{Style.RESET_ALL}')
        for streamer in streamers:
            print(f'{Fore.CYAN}{streamer}{Style.RESET_ALL}')
        print()

        # Initialize archivers for all streamers; one bad config must not
        # prevent the others from being monitored.
        for username in streamers:
            try:
                archiver = self._initialize_archiver(username)
                # Load environment and validate
                archiver._load_environment_variables()
                archiver._validate_username()
                archiver._initialize_paths()
                self.archivers[username] = archiver
                print(f'{Fore.GREEN}✓ Initialized {username}{Style.RESET_ALL}')
            except Exception as e:
                print(f'{Fore.RED}✗ Failed to initialize {username}: {e}{Style.RESET_ALL}')

        if not self.archivers:
            print(f'{Fore.RED}✗ No archivers could be initialized{Style.RESET_ALL}')
            sys.exit(1)

        # Verify dependencies once (shared across all streamers)
        print(f'\n{Fore.CYAN}Verifying dependencies...{Style.RESET_ALL}')
        first_archiver = next(iter(self.archivers.values()))
        first_archiver._verify_dependencies()

        # Print configuration summary for each streamer
        for archiver in self.archivers.values():
            archiver._print_configuration_summary()

        print(f'\n{Fore.GREEN}🚀 Starting monitoring loop...{Style.RESET_ALL}\n')
        # Start monitoring loop
        self._monitoring_loop()

    def _monitoring_loop(self) -> None:
        """
        Main monitoring loop for all streamers.

        Checks each streamer's status at its own refresh interval and
        processes streams as needed. Errors for one streamer are printed
        with a traceback and do not stop the loop.
        """
        last_check: Dict[str, float] = {}
        last_status_print = time.time()

        while not self.shutdown_requested:
            current_time = time.time()

            # Print periodic status every 60 seconds
            if current_time - last_status_print >= 60:
                status_line = " | ".join(f"{username}: checking" for username in self.archivers)
                print(f'{Fore.CYAN}[Status] {status_line}{Style.RESET_ALL}')
                last_status_print = current_time

            for username, archiver in self.archivers.items():
                # Processing a stream can block for hours; do not start on
                # the next streamer once a shutdown has been requested.
                if self.shutdown_requested:
                    break
                # Check if enough time has passed since last check for this streamer
                if username in last_check and (current_time - last_check[username]) < archiver.refresh:
                    continue
                last_check[username] = current_time
                try:
                    self._check_streamer(username, archiver)
                except Exception as e:
                    print(f'{Fore.RED}[{username}] Error checking stream: {e}{Style.RESET_ALL}')
                    import traceback
                    traceback.print_exc()

            # Sleep briefly before next iteration
            time.sleep(1)

    def _check_streamer(self, username: str, archiver: "TwitchArchive") -> None:
        """Query one streamer's status and, if a new live stream is ready, process it."""
        response = archiver._check_stream_status()
        # Debug: Print the full response (if verbose)
        if self.verbose:
            print(f'\n{Fore.MAGENTA}[DEBUG {username}] API Response: {response}{Style.RESET_ALL}')
        stream_data = response['data']['user']['stream'] if response else None
        if self.verbose:
            print(f'{Fore.MAGENTA}[DEBUG {username}] Stream data: {stream_data}{Style.RESET_ALL}')

        if not stream_data:
            # Not live
            if self.verbose:
                print(f'{Fore.CYAN}[{username}] Offline - checking again in {archiver.refresh}s{Style.RESET_ALL}', end='\r')
            return

        archive_video = stream_data.get('archiveVideo')
        if not (archive_video and archive_video.get('id')):
            # Stream is live but VOD ID not available yet
            print(f'{Fore.YELLOW}[{username}] Stream is live but VOD ID not ready yet (title: {stream_data.get("title", "No title")}){Style.RESET_ALL}')
            return

        # Create composite stream ID like single-streamer mode
        # This prevents duplicate recordings in the same session
        stream_id = f"{stream_data['createdAt']} - {username} - {stream_data.get('title', 'Untitled')}"
        if self.verbose:
            print(f'{Fore.MAGENTA}[DEBUG {username}] VOD ID: {stream_data["archiveVideo"]["id"]}{Style.RESET_ALL}')
            print(f'{Fore.MAGENTA}[DEBUG {username}] Composite Stream ID: {stream_id}{Style.RESET_ALL}')

        # Check if we're currently recording this stream
        currently_recording = self.active_recordings.get(username) == stream_id
        if self.verbose:
            print(f'{Fore.MAGENTA}[DEBUG {username}] Currently recording: {currently_recording}{Style.RESET_ALL}')
            print(f'{Fore.MAGENTA}[DEBUG {username}] Active recordings: {self.active_recordings}{Style.RESET_ALL}')

        if currently_recording:
            if self.verbose:
                print(f'{Fore.CYAN}[{username}] Currently recording this stream, skipping duplicate...{Style.RESET_ALL}')
            return

        # Record if not currently recording (ignore .log file - always record if live)
        print(f'\n{Fore.GREEN}[{username}] Stream detected!{Style.RESET_ALL}')
        print(f'{Fore.CYAN}Title: {stream_data.get("title", "No title")}{Style.RESET_ALL}')
        print(f'{Fore.CYAN}Started at: {stream_data["createdAt"]}{Style.RESET_ALL}')

        # Mark as currently recording
        self.active_recordings[username] = stream_id
        try:
            # Process the stream (this blocks until stream ends)
            self._process_stream(archiver, stream_data, stream_id)
            # Mark as processed in log (for record keeping)
            archiver._mark_stream_as_processed(stream_id)
        finally:
            # Always clear the marker: if processing raised and the entry
            # stayed behind, this stream would be skipped on every later check.
            self.active_recordings.pop(username, None)

    @staticmethod
    def _parse_twitch_time(value: str) -> datetime:
        """Convert a Twitch ``YYYY-MM-DDTHH:MM:SSZ`` UTC timestamp to a naive local datetime."""
        return datetime.strptime(
            value, '%Y-%m-%dT%H:%M:%SZ'
        ).replace(tzinfo=timezone('UTC')).astimezone(tz=None).replace(tzinfo=None)

    def _wait_for_matching_vod(self, archiver: "TwitchArchive", live_date: datetime) -> Optional[Dict[str, Any]]:
        """Poll for a VOD recorded within one minute of ``live_date``; return its node or None."""
        print(f'{Fore.CYAN}Waiting for VOD to become available (timeout: {archiver.vodTimeout}s)...{Style.RESET_ALL}')
        time_tolerance = timedelta(minutes=1)
        deadline = time.monotonic() + archiver.vodTimeout

        while not archiver.shutdown_requested and time.monotonic() < deadline:
            vod_response = archiver._get_latest_vod()
            edges = vod_response['data']['user']['videos']['edges'] if vod_response else None
            if edges:
                # Only the first returned VOD is considered
                candidate = edges[0]['node']
                vod_date = self._parse_twitch_time(candidate["recordedAt"])
                # Check if VOD matches the stream (within 1 minute tolerance)
                if abs(vod_date - live_date) <= time_tolerance:
                    return candidate

            # Wait before checking again: at most 10 seconds, never a
            # negative duration, and cut short by a shutdown request.
            print(f'{Fore.CYAN}VOD not found yet, waiting...{Style.RESET_ALL}', end='\r')
            pause = max(0.0, min(10, deadline - time.monotonic()))
            if not archiver._interruptible_sleep(pause):
                break

        if archiver.shutdown_requested:
            print(f'\n{Fore.YELLOW}VOD check interrupted by shutdown{Style.RESET_ALL}')
        else:
            print(f'\n{Fore.YELLOW}⚠ VOD not found after {archiver.vodTimeout}s - streamer may have VODs disabled{Style.RESET_ALL}')
            print(f'{Fore.CYAN} → Live recording and chat (if enabled) were saved successfully{Style.RESET_ALL}')
        return None

    def _process_stream(self, archiver: "TwitchArchive", stream_info: Dict[str, Any], stream_id: str) -> None:
        """
        Process a detected stream for a specific archiver.

        Records the live stream, converts the raw recording, collects chat,
        then (if enabled) fetches the matching VOD, uploads the results and
        deletes local copies.

        Args:
            archiver: The TwitchArchive instance
            stream_info: Stream information from API
            stream_id: Unique stream ID
        """
        title = stream_info.get('title', 'Untitled')

        # Store stream data
        archiver.current_stream_data = {
            'stream_id': stream_id,
            'title': title,
            'started_at': stream_info['createdAt']
        }

        # Generate timestamp and filename. The base name deliberately carries
        # NO type prefix: the archiver's upload/delete helpers prepend
        # LIVE_/VOD_/CHAT_/METADA_ themselves, and a base that already began
        # with LIVE_ made them look for "LIVE_LIVE_..." so the live
        # recording was never uploaded.
        timestamp = datetime.now(timezone('UTC')).strftime("%Y%m%d_%Hh%Mm%Ss")
        filename_base = f"{archiver.username}_{timestamp}"

        # Parse stream start time
        live_date = self._parse_twitch_time(stream_info["createdAt"])

        # Define paths (os.path.join accepts both str and Path directories)
        proc_extension = '.mp3' if archiver.quality == 'audio_only' else '.mp4'
        live_raw_path = os.path.join(archiver.raw_path, f"{PREFIX_LIVE}{filename_base}.ts")
        live_proc_path = os.path.join(archiver.video_path, f"{PREFIX_LIVE}{filename_base}{proc_extension}")
        chat_json_path = os.path.join(archiver.chatJSON_path, f"{PREFIX_CHAT}{filename_base}.json")
        chat_video_path = os.path.join(archiver.chatMP4_path, f"{PREFIX_CHAT}{filename_base}.mp4")

        # Send notification
        archiver.send_notification(
            f"Stream Started - {archiver.username}",
            f"Recording: {title}"
        )

        # Start live chat download if enabled and VOD ID is available
        live_chat_process = None
        if archiver.downloadLiveCHAT and stream_info.get('archiveVideo') and stream_info['archiveVideo'].get('id'):
            live_vod_id = stream_info['archiveVideo']['id']
            print(f'{Fore.CYAN}Live VOD ID detected: {live_vod_id}{Style.RESET_ALL}')
            live_chat_process = archiver._download_live_chat(live_vod_id, chat_json_path)
        elif archiver.downloadLiveCHAT:
            print(f'{Fore.YELLOW}⚠ No VOD ID available yet for live chat download{Style.RESET_ALL}')

        # Record livestream (blocks until the recording ends)
        archiver._record_livestream(stream_info, live_raw_path)

        # Check if raw file exists (may exist even after interrupted recording)
        if not os.path.exists(live_raw_path):
            print(f'{Fore.RED}✗ No recording file found, skipping processing{Style.RESET_ALL}')
            return

        # Get file size to check if anything was recorded
        file_size = os.path.getsize(live_raw_path)
        if file_size < 1024:  # Less than 1KB means essentially nothing was recorded
            print(f'{Fore.RED}✗ Recording file too small ({file_size} bytes), skipping processing{Style.RESET_ALL}')
            return

        print(f'{Fore.CYAN}Processing recorded content ({file_size / (1024*1024):.2f} MB)...{Style.RESET_ALL}')

        # Process raw stream
        if not archiver.onlyRaw:
            archiver._process_raw_stream(live_raw_path, live_proc_path)

        # Wait for live chat download if it was started
        live_chat_downloaded = False
        if live_chat_process is not None:
            live_chat_downloaded = archiver._wait_for_chat_download(live_chat_process, chat_json_path)
            # Render live chat if downloaded successfully
            if live_chat_downloaded:
                archiver._render_chat(chat_json_path, chat_video_path)

        # Wait for VOD and download it
        current_vod = None
        if archiver.vodTimeout == 0:
            print(f'{Fore.CYAN}VOD check disabled (vodTimeout=0). Skipping VOD download.{Style.RESET_ALL}')
        elif archiver.shutdown_requested:
            print(f'{Fore.YELLOW}Skipping VOD download due to shutdown request{Style.RESET_ALL}')
        else:
            current_vod = self._wait_for_matching_vod(archiver, live_date)

        # Process VOD if found
        if current_vod is not None:
            print(f'\n{Fore.GREEN}✓ Found matching VOD{Style.RESET_ALL}')
            # Save metadata
            if archiver.downloadMETADATA:
                archiver._save_metadata(current_vod, filename_base)
            # Download VOD
            if archiver.downloadVOD:
                vod_ext = '.mp3' if archiver.quality == 'audio_only' else '.mp4'
                vod_path = os.path.join(archiver.video_path, f"{PREFIX_VOD}{filename_base}{vod_ext}")
                archiver._download_vod(current_vod, vod_path)
            # Download and render chat from VOD (if not already done via live chat)
            if archiver.downloadCHAT and not live_chat_downloaded:
                archiver._download_and_render_chat(current_vod, chat_json_path, chat_video_path)
            elif live_chat_downloaded:
                print(f'{Fore.CYAN}Chat already downloaded from live stream, skipping VOD chat download{Style.RESET_ALL}')
        elif archiver.downloadMETADATA:
            # Save what metadata we have from the live stream
            archiver._save_metadata(stream_info, filename_base)

        # Clean up raw file if configured
        if archiver.cleanRaw and os.path.exists(live_raw_path):
            print(f'{Fore.YELLOW}Deleting raw .ts file...{Style.RESET_ALL}')
            os.remove(live_raw_path)

        # Upload to cloud if configured
        upload_success = False
        if archiver.uploadCloud:
            upload_success = archiver._upload_to_cloud(filename_base)

        # Delete files if configured (only after a real, successful upload)
        if archiver.deleteFiles and upload_success:
            archiver._delete_local_files(filename_base, live_raw_path, live_proc_path)

        # Send completion notification
        archiver.send_notification(
            f"Stream Archived - {archiver.username}",
            f"Completed: {title}"
        )
# ============================================================================
# COMMAND-LINE INTERFACE
# ============================================================================
def main(argv: list) -> None:
    """
    Main entry point for command-line execution.

    Parses command-line arguments, decides between legacy mode (a single
    TwitchArchive configured from config.json) and multi-streamer mode
    (TwitchArchiveManager), and starts the selected system.

    Exits with status 0 after printing the help text for -h/--help, and
    with status 2 when the arguments cannot be parsed (unknown option,
    missing value, or a non-integer value for one of the 0|1 flags).

    Args:
        argv: Command-line arguments, without the program name
    """
    help_msg = f'''
{Fore.CYAN}{"=" * 70}
TWITCH ARCHIVE - Automated Stream Recording & Archiving
{"=" * 70}{Style.RESET_ALL}
{Fore.GREEN}USAGE:{Style.RESET_ALL}
python twitch-archive.py [OPTIONS]
{Fore.GREEN}MODES:{Style.RESET_ALL}
• Multi-Streamer Mode (default):
Monitor all enabled streamers from config/streamers/*.json
• Single-Streamer Mode:
Use -u <username> to monitor only one streamer
• Legacy Mode:
Uses config.json if it exists (deprecated)
{Fore.GREEN}OPTIONS:{Style.RESET_ALL}
-h, --help Display this help information
-u, --username <name> Monitor only this Twitch channel
--verbose Enable verbose debug output
--legacy Force legacy mode (use config.json)
{Fore.GREEN}LEGACY OPTIONS (when using --legacy):{Style.RESET_ALL}
-q, --quality <qual> Stream quality: best/source, high/720p,
medium/480p, low/360p, audio_only
-a, --ttv-lol <0|1> Enable ad-blocking (1) or disable (0)
-v, --vod <0|1> Download VODs after stream ends
-c, --chat <0|1> Download and render chat
-m, --metadata <0|1> Download stream metadata
-r, --upload <0|1> Upload to cloud storage via rclone
-d, --delete <0|1> Delete local files after upload (CAREFUL!)
-n, --notifications <0|1> Send email notifications
{Fore.YELLOW}TIPS:{Style.RESET_ALL}
• Create config/global.json for default settings
• Create config/streamers/<username>.json for each streamer
• Set enabled: true/false in each streamer config
• Set up API credentials in .env file
{Fore.CYAN}EXAMPLES:{Style.RESET_ALL}
python twitch-archive.py # Monitor all enabled streamers
python twitch-archive.py -u vinesauce # Monitor only vinesauce
python twitch-archive.py -u hackerling --verbose # Monitor with debug output
python twitch-archive.py --legacy # Use old config.json mode
{Fore.CYAN}{"=" * 70}{Style.RESET_ALL}
'''

    # Legacy 0|1 switches: every accepted spelling maps to the TwitchArchive
    # attribute it overrides.
    legacy_bool_flags = {
        '-a': 'streamlink_ttvlol', '--ttv-lol': 'streamlink_ttvlol',
        '-v': 'downloadVOD', '--vod': 'downloadVOD',
        '-c': 'downloadCHAT', '--chat': 'downloadCHAT',
        '-m': 'downloadMETADATA', '--metadata': 'downloadMETADATA',
        '-r': 'uploadCloud', '--upload': 'uploadCloud',
        '-d': 'deleteFiles', '--delete': 'deleteFiles',
        '-n': 'notifications', '--notifications': 'notifications',
    }

    try:
        # "h" carries no trailing colon: -h is a plain switch. With "h:" a
        # bare -h was rejected as "requires argument" and "-h -u x" consumed
        # "-u" as the value of -h.
        opts, _positional = getopt.getopt(
            argv,
            "hu:q:a:v:c:m:r:d:n:",
            ["help", "username=", "quality=", "ttv-lol=", "vod=", "chat=",
             "metadata=", "upload=", "delete=", "notifications=", "legacy", "verbose"]
        )
    except getopt.GetoptError as e:
        print(f'{Fore.RED}Error: {e}{Style.RESET_ALL}\n')
        print(help_msg)
        sys.exit(2)

    # Check if config.json exists next to this script (legacy fallback)
    legacy_config_exists = os.path.exists(os.path.join(os.path.dirname(__file__), 'config.json'))

    # Parse command line args
    specific_streamer = None
    use_legacy_mode = False
    verbose_mode = False
    legacy_overrides = {}
    for opt, arg in opts:
        if opt in ('-h', '--help'):
            print(help_msg)
            sys.exit(0)
        elif opt in ("-u", "--username"):
            specific_streamer = arg
        elif opt == "--verbose":
            verbose_mode = True
        elif opt == "--legacy":
            use_legacy_mode = True
        # Legacy options (only used in legacy mode)
        elif opt in ("-q", "--quality"):
            legacy_overrides['quality'] = arg
        elif opt in legacy_bool_flags:
            try:
                # Any integer is accepted (non-zero means enabled), as before
                legacy_overrides[legacy_bool_flags[opt]] = bool(int(arg))
            except ValueError:
                # Report bad values the same way as other usage errors
                # instead of crashing with a traceback.
                print(f'{Fore.RED}Error: option {opt} expects 0 or 1, got {arg!r}{Style.RESET_ALL}\n')
                print(help_msg)
                sys.exit(2)

    # Determine which mode to use
    # NOTE(review): config.json is looked up next to this script, while
    # config/global.json is resolved against the current working directory —
    # confirm this asymmetry is intended.
    fallback_to_legacy = (
        legacy_config_exists
        and not specific_streamer
        and not os.path.exists('config/global.json')
    )
    if use_legacy_mode or fallback_to_legacy:
        # Legacy mode: single streamer using config.json
        print(f'{Fore.YELLOW}⚠ Using legacy mode (config.json){Style.RESET_ALL}')
        print(f'{Fore.CYAN}→ Consider migrating to new config structure (config/global.json + config/streamers/*.json){Style.RESET_ALL}\n')
        twitch_archive = TwitchArchive()  # Loads from config.json
        # Apply command-line overrides
        for key, value in legacy_overrides.items():
            setattr(twitch_archive, key, value)
        # Start the archive system
        twitch_archive.run()
    else:
        # New multi-streamer mode
        manager = TwitchArchiveManager(specific_streamer=specific_streamer, verbose=verbose_mode)
        manager.run()
if __name__ == "__main__":
    # Script entry: forward everything after the program name to main().
    cli_arguments = sys.argv[1:]
    try:
        main(cli_arguments)
    except KeyboardInterrupt:
        # Ctrl+C is a normal way to stop; show a short confirmation
        # instead of a stack trace and exit successfully.
        shutdown_notice = f'\n{Fore.GREEN}✓ Graceful shutdown complete{Style.RESET_ALL}'
        print(shutdown_notice)
        sys.exit(0)