""" Twitch Archive - Automated Twitch Stream Recording & Archiving System This script monitors a Twitch channel and automatically: - Records live streams as they happen - Downloads VODs (Video on Demand) after the stream ends - Downloads and renders chat logs - Saves stream metadata - Uploads everything to cloud storage (optional) Requirements: - Python 3.7+ - External tools: streamlink, ffmpeg, TwitchDownloaderCLI, rclone (optional) - Configuration file: config.json (copy from config.sample.json) - Environment file: .env (for API credentials) Refactored Version 2.0: This version has been split into multiple modules for better maintainability: - modules/constants.py: Constants and default configuration - modules/config.py: Configuration management - modules/notifications.py: Email notifications - modules/utils.py: Utility functions - modules/stream_monitor.py: Stream monitoring and API - modules/recorder.py: Live stream recording - modules/processor.py: Video/audio processing - modules/downloader.py: VOD and chat downloading - modules/file_manager.py: File and cloud management """ # Standard library imports import os import sys import time import json import signal import getopt import pathlib import subprocess from typing import Dict, Optional, Any from datetime import datetime, timedelta # Third-party imports from colorama import Fore, Style from pytz import timezone from dotenv import load_dotenv, find_dotenv # Local module imports from modules.constants import DEFAULT_CONFIG, PREFIX_LIVE, PREFIX_VOD, PREFIX_CHAT, PREFIX_MERGED, PREFIX_METADATA from modules.config import ConfigManager from modules.notifications import NotificationManager from modules.utils import ( detect_operating_system, get_ffmpeg_executable, get_twitch_downloader_executable, get_unique_filename, get_video_duration, verify_streamlink, verify_ffmpeg, verify_twitch_downloader, verify_rclone, get_env_value ) from modules.stream_monitor import StreamMonitor from modules.recorder import StreamRecorder from modules.processor import StreamProcessor from modules.downloader import ContentDownloader from modules.file_manager import FileManager class TwitchArchive: """ Main class for the Twitch Archive system. Handles monitoring a Twitch channel, recording live streams, and downloading VODs, chat logs, and metadata. Can optionally upload to cloud storage. Refactored Version 2.0: This class now delegates most functionality to specialized modules for better code organization. """ def __init__(self, config: Optional[Dict[str, Any]] = None): """ Initialize the TwitchArchive with configuration settings. Args: config: Configuration dictionary. If None, loads from legacy config.json """ if config is None: # Legacy mode: load from config.json self.load_config() else: # New mode: use provided config for key, value in config.items(): setattr(self, key, value) # Initialize system components self.os_type = detect_operating_system() self.shutdown_requested = False self.current_stream_data = {} # Initialize component modules (created during run()) self.stream_monitor = None self.notification_manager = None self.file_manager = None self.recorder = None self.processor = None self.downloader = None def load_config(self) -> None: """ Load configuration from config.json file (legacy support). Falls back to default configuration if file is not found or cannot be read. Filters out comment fields (starting with '_') from the config. """ config_file = os.path.join(os.path.dirname(__file__), 'config.json') # Start with default configuration config = DEFAULT_CONFIG.copy() # Try to load and merge user configuration if os.path.exists(config_file): try: with open(config_file, 'r', encoding='utf-8') as f: user_config = json.load(f) # Filter out comment fields (those starting with '_') user_config = {k: v for k, v in user_config.items() if not k.startswith('_')} # Merge user config with defaults (user config takes precedence) config.update(user_config) print(f'{Fore.GREEN}✓ Configuration loaded from config.json{Style.RESET_ALL}') except json.JSONDecodeError as e: print(f'{Fore.YELLOW}⚠ Warning: Invalid JSON in config.json: {e}{Style.RESET_ALL}') print(f'{Fore.YELLOW} Using default configuration{Style.RESET_ALL}') except Exception as e: print(f'{Fore.YELLOW}⚠ Warning: Could not load config.json: {e}{Style.RESET_ALL}') print(f'{Fore.YELLOW} Using default configuration{Style.RESET_ALL}') else: print(f'{Fore.YELLOW}⚠ Warning: config.json not found{Style.RESET_ALL}') print(f'{Fore.CYAN} → Copy config.sample.json to config.json and edit it with your settings{Style.RESET_ALL}') # Set all configuration values as instance attributes for key, value in config.items(): setattr(self, key, value) def _load_environment_variables(self) -> None: """ Load environment variables from process environment or an optional .env file. Required variables: - CLIENT-ID: Twitch API client ID - CLIENT-SECRET: Twitch API client secret - OAUTH-PRIVATE-TOKEN: Optional, for accessing subscriber-only streams - SENDER: Email address for notifications (if enabled) - RECEIVER: Email address to receive notifications (if enabled) - PASSWD: Email password for sending notifications (if enabled) Raises: SystemExit: If required Twitch API credentials are unavailable """ has_required_env = bool( get_env_value('CLIENT-ID', 'CLIENT_ID') and get_env_value('CLIENT-SECRET', 'CLIENT_SECRET') ) if has_required_env: print(f'{Fore.GREEN}✓ Twitch API credentials loaded from process environment{Style.RESET_ALL}') return dotenv_loaded = load_dotenv(find_dotenv()) has_required_env = bool( get_env_value('CLIENT-ID', 'CLIENT_ID') and get_env_value('CLIENT-SECRET', 'CLIENT_SECRET') ) if has_required_env: if dotenv_loaded: print(f'{Fore.GREEN}✓ Twitch API credentials loaded from .env file{Style.RESET_ALL}') return if not dotenv_loaded and not has_required_env: print(f'{Fore.RED}✗ ERROR: .env file not found{Style.RESET_ALL}') print(f'{Fore.CYAN} → Create a .env file with your Twitch API credentials or pass them via environment variables{Style.RESET_ALL}') print(f'{Fore.CYAN} → Required: CLIENT-ID/CLIENT_ID and CLIENT-SECRET/CLIENT_SECRET{Style.RESET_ALL}') sys.exit(1) if not has_required_env: print(f'{Fore.RED}✗ ERROR: Twitch API credentials are missing{Style.RESET_ALL}') print(f'{Fore.CYAN} → Required: CLIENT-ID/CLIENT_ID and CLIENT-SECRET/CLIENT_SECRET{Style.RESET_ALL}') sys.exit(1) def _initialize_components(self) -> None: """Initialize all component modules.""" # Stream monitoring self.stream_monitor = StreamMonitor(self.username) # Notifications self.notification_manager = NotificationManager( enabled=self.notifications, username=self.username ) # File management self.file_manager = FileManager( root_path=self.root_path, username=self.username, config=vars(self) ) self.file_manager.initialize_directories() # Recording self.recorder = StreamRecorder( username=self.username, quality=self.quality, refresh=self.refresh, hls_segments=self.hls_segments, streamlink_ttvlol=self.streamlink_ttvlol, shutdown_callback=lambda: self.shutdown_requested ) # Processing ffmpeg_path = get_ffmpeg_executable(self.os_type) self.processor = StreamProcessor( os_type=self.os_type, ffmpeg_path=ffmpeg_path, config=vars(self) ) # Downloading twitch_downloader_path = get_twitch_downloader_executable(self.os_type) self.downloader = ContentDownloader( twitch_downloader_path=twitch_downloader_path, ffmpeg_path=ffmpeg_path, config=vars(self) ) def _print_configuration_summary(self) -> None: """Print a summary of the current configuration to the console.""" print(f'\n{Fore.CYAN}{"=" * 60}{Style.RESET_ALL}') print(f'{Fore.CYAN}TWITCH ARCHIVE - Configuration Summary{Style.RESET_ALL}') print(f'{Fore.CYAN}{"=" * 60}{Style.RESET_ALL}\n') # Basic settings print(f'Streamer: {Fore.GREEN}{self.username}{Style.RESET_ALL}') print(f'Quality: {Fore.GREEN}{self.quality}{Style.RESET_ALL}') print(f'Storage: {Fore.GREEN}{os.path.abspath(self.root_path)}{Style.RESET_ALL}') print(f'Refresh rate: {Fore.GREEN}{self.refresh}s{Style.RESET_ALL}\n') # Feature toggles self._print_toggle('Email notifications', self.notifications) self._print_toggle('Metadata download', self.downloadMETADATA) self._print_toggle('VOD download', self.downloadVOD) self._print_toggle('Chat download & render', self.downloadCHAT) if self.downloadCHAT: self._print_toggle(' ↳ Merge video + chat', self.mergeVideoChat) if self.mergeVideoChat: print(f' Layout: {Fore.GREEN}{self.mergeChatLayout}{Style.RESET_ALL}') self._print_toggle('Cloud upload', self.uploadCloud) # Warning messages if self.deleteFiles: print(f'\n{Fore.RED}⚠ WARNING: Files will be DELETED after processing{Style.RESET_ALL}') if not self.uploadCloud: print(f'{Fore.RED}⚠ CRITICAL: Files will be deleted WITHOUT cloud backup!{Style.RESET_ALL}') print(f'{Fore.YELLOW} Press CTRL+C to stop and change configuration{Style.RESET_ALL}') else: print(f'\n{Fore.GREEN}✓ Files will be preserved locally{Style.RESET_ALL}') print(f'\n{Fore.CYAN}{"=" * 60}{Style.RESET_ALL}\n') def _print_toggle(self, label: str, value: bool) -> None: """Helper method to print a configuration toggle in a consistent format.""" status = f'{Fore.GREEN}Enabled{Style.RESET_ALL}' if value else f'{Fore.RED}Disabled{Style.RESET_ALL}' print(f'{label}: {status}') def run(self) -> None: """ Main entry point for the application. Initializes environment, validates configuration, creates necessary directories, and starts the monitoring loop. """ # Load environment variables self._load_environment_variables() # Initialize all component modules self._initialize_components() # Validate username self.stream_monitor.validate_username() # Verify dependencies if not verify_streamlink(): sys.exit(1) verify_ffmpeg(self.os_type) if self.downloadVOD or self.downloadCHAT: verify_twitch_downloader(self.os_type) if self.uploadCloud and not verify_rclone(): sys.exit(1) # Print configuration summary self._print_configuration_summary() # Start monitoring print(f"Monitoring {Fore.GREEN}{self.username}{Style.RESET_ALL} every {Fore.GREEN}{self.refresh}s{Style.RESET_ALL}") self.notification_manager.send("TWITCH ARCHIVE STARTED", f"Monitoring {self.username} every {self.refresh} seconds.") # Begin the main monitoring loop self.loopcheck() def _interruptible_sleep(self, seconds: float) -> bool: """ Sleep for the specified duration, but check for shutdown periodically. Args: seconds: Number of seconds to sleep Returns: bool: True if sleep completed, False if interrupted by shutdown """ start_time = time.time() while time.time() - start_time < seconds: if self.shutdown_requested: return False time.sleep(min(1.0, seconds - (time.time() - start_time))) return True def _signal_handler(self, signum, frame): """Handle interrupt signals gracefully.""" if not self.shutdown_requested: print(f'\n{Fore.YELLOW}{"=" * 60}{Style.RESET_ALL}') print(f'{Fore.YELLOW}⚠ Shutdown requested. Stopping downloads and finalizing...{Style.RESET_ALL}') print(f'{Fore.YELLOW}{"=" * 60}{Style.RESET_ALL}\n') self.shutdown_requested = True # Stop current subprocess if running if self.recorder: self.recorder.stop() def loopcheck(self) -> None: """ Main monitoring loop. Continuously checks if the streamer is live, and when they are: 1. Records the live stream 2. Downloads the VOD 3. Downloads and renders chat 4. Uploads everything to cloud storage (if enabled) 5. Optionally deletes local files after upload """ # Set up signal handlers for graceful shutdown signal.signal(signal.SIGINT, self._signal_handler) if hasattr(signal, 'SIGTERM'): signal.signal(signal.SIGTERM, self._signal_handler) while not self.shutdown_requested: try: # Check stream status using StreamMonitor response = self.stream_monitor.check_stream_status() is_live = response['data']['user']['stream'] # Stream is offline if is_live is None: print(f'{Fore.CYAN}⏳ {self.username} is offline. Checking again in {self.refresh}s...{Style.RESET_ALL}', end='\r') if self.shutdown_requested: break self._interruptible_sleep(self.refresh) continue # Stream is live but not ready yet if not is_live.get('title'): print(f'{Fore.YELLOW}⚠ Stream detected but no title yet. Waiting...{Style.RESET_ALL}') if self.shutdown_requested: break self._interruptible_sleep(self.refresh) continue # Stream is live and ready! print(f'\n{Fore.GREEN}✓ {self.username} is LIVE!{Style.RESET_ALL}') print(f'{Fore.CYAN}Title: {is_live["title"]}{Style.RESET_ALL}') # Create unique stream identifier stream_id = f"{is_live['createdAt']} - {self.username} - {is_live['title']}" # Parse stream start time live_date = datetime.strptime( is_live["createdAt"], '%Y-%m-%dT%H:%M:%SZ' ).replace(tzinfo=timezone('UTC')).astimezone(tz=None).replace(tzinfo=None) # Use CURRENT time for filename current_time = datetime.now() filename_base = current_time.strftime('%Y%m%d_%Hh%Mm%Ss') # Check if stream was already processed if self.file_manager.is_stream_processed(stream_id): print(f'{Fore.YELLOW}⚠ Stream was previously recorded, but it\'s still live!{Style.RESET_ALL}') print(f'{Fore.GREEN}✓ Starting new recording with timestamp: {filename_base}{Style.RESET_ALL}') else: self.file_manager.mark_stream_processed(stream_id) print(f'{Fore.GREEN}✓ New stream detected - starting recording{Style.RESET_ALL}') # Determine file paths live_raw_path = str(self.file_manager.raw_path / f"{PREFIX_LIVE}{filename_base}.ts") live_proc_ext = '.mp3' if self.quality == 'audio_only' else '.mp4' live_proc_path = str(self.file_manager.video_path / f"{PREFIX_LIVE}{filename_base}{live_proc_ext}") # Ensure unique filenames live_raw_path = get_unique_filename(live_raw_path) live_proc_path = get_unique_filename(live_proc_path) filename_base = os.path.splitext(os.path.basename(live_raw_path))[0].replace(PREFIX_LIVE, "") print(f'{Fore.CYAN}Output path: {live_raw_path}{Style.RESET_ALL}') # Send notification self.notification_manager.send(f'🔴 Stream Started - {filename_base}', f'Title: {is_live["title"]}') # Start live chat download if enabled live_chat_process = None live_chat_method = None # Track which method was used chat_json_path = str(self.file_manager.chat_json_path / f"{PREFIX_CHAT}{filename_base}.json") if self.downloadLiveCHAT: vod_id = is_live.get('archiveVideo', {}).get('id') if is_live.get('archiveVideo') else None stream_url = f"https://twitch.tv/{self.username}" live_chat_process, live_chat_method = self.downloader.start_live_chat_download_with_fallback( vod_id=vod_id, stream_url=stream_url, json_path=chat_json_path, use_chat_downloader_primary=self.use_chat_downloader_primary, no_chat_downloader_fallback=self.no_chat_downloader_fallback, verbose=self.verbose ) # Record the live stream recording_completed = self.recorder.record(is_live, live_raw_path) # If shutdown was requested during recording, try to finalize if self.shutdown_requested: print(f'{Fore.YELLOW}Attempting to process any recorded content...{Style.RESET_ALL}') # Process the raw stream file processing_succeeded = self.processor.process_raw_stream(live_raw_path, live_proc_path) self.downloader.reset_chat_render_status() # Wait for live chat download if it was started live_chat_downloaded = False if live_chat_process is not None: live_chat_downloaded = self.downloader.wait_for_chat_download(live_chat_process, chat_json_path) # Render live chat if downloaded successfully chat_rendered_successfully = False chat_video_path = None if live_chat_downloaded: chat_video_path = str(self.file_manager.chat_mp4_path / f"{PREFIX_CHAT}{filename_base}.mp4") output_args = self.processor.build_chat_output_args() # Wait for chat file to be fully accessible (not locked) print(f'{Fore.CYAN}Verifying chat file is ready for rendering...{Style.RESET_ALL}') if not self.downloader.wait_for_file_access(chat_json_path, max_attempts=15, delay=0.5): print(f'{Fore.RED}✗ Chat file is locked, skipping rendering{Style.RESET_ALL}') chat_rendered_successfully = False else: # Get video duration first (needed for chat conversion and trimming) ffmpeg_path = get_ffmpeg_executable(self.os_type) if not processing_succeeded or not os.path.exists(live_proc_path): print(f'{Fore.YELLOW}⚠ Processed video file is unavailable, skipping chat render{Style.RESET_ALL}') video_duration = None else: video_duration = get_video_duration(live_proc_path, ffmpeg_path) print(f'{Fore.CYAN}Video duration for chat rendering: {video_duration}s{Style.RESET_ALL}') # Convert chat format if needed (chat_downloader uses different JSON structure) render_json_path = chat_json_path if live_chat_method == 'chat_downloader': print(f'{Fore.CYAN}Converting chat format for rendering...{Style.RESET_ALL}') converted_path = chat_json_path.replace('.json', '_converted.json') if self.downloader.convert_chat_downloader_to_twitch_format(chat_json_path, converted_path, video_duration): render_json_path = converted_path print(f'{Fore.GREEN}✓ Chat format converted successfully{Style.RESET_ALL}') else: print(f'{Fore.RED}✗ Failed to convert chat format{Style.RESET_ALL}') chat_rendered_successfully = self.downloader.render_chat( render_json_path, chat_video_path, output_args, video_duration=video_duration ) # Merge video and chat if configured merged_video_path = None if chat_rendered_successfully and self.mergeVideoChat and os.path.exists(live_proc_path) and os.path.exists(chat_video_path): merged_video_path = str(self.file_manager.video_path / f"{PREFIX_MERGED}{filename_base}{live_proc_ext}") merge_success = self.processor.merge_video_and_chat( live_proc_path, chat_video_path, merged_video_path, self.mergeChatLayout ) # Skip VOD/chat download if shutdown was requested or vodTimeout is 0 vod_response = None if self.shutdown_requested: print(f'{Fore.YELLOW}Skipping VOD and chat download due to shutdown request{Style.RESET_ALL}') elif self.vodTimeout == 0: print(f'{Fore.CYAN}VOD check disabled (vodTimeout=0). Skipping VOD download.{Style.RESET_ALL}') else: # Try to match stream with VOD (with timeout) print(f'{Fore.CYAN}Waiting for VOD to become available (timeout: {self.vodTimeout}s)...{Style.RESET_ALL}') vod_found = False vod_wait_start = time.time() while time.time() - vod_wait_start < self.vodTimeout and not self.shutdown_requested: vod_response = self.stream_monitor.get_latest_vod() if vod_response and vod_response['data']['user']['videos']['edges']: current_vod = vod_response['data']['user']['videos']['edges'][0]['node'] vod_date = datetime.strptime( current_vod["recordedAt"], '%Y-%m-%dT%H:%M:%SZ' ).replace(tzinfo=timezone('UTC')).astimezone(tz=None).replace(tzinfo=None) # Check if VOD matches the stream (within 1 minute tolerance) time_tolerance = timedelta(minutes=1) if (live_date - time_tolerance) <= vod_date <= (live_date + time_tolerance): vod_found = True break # Wait before checking again if not vod_found: print(f'{Fore.CYAN}VOD not found yet, waiting...{Style.RESET_ALL}', end='\r') if not self._interruptible_sleep(min(10, self.vodTimeout - (time.time() - vod_wait_start))): break if not vod_found: if self.shutdown_requested: print(f'\n{Fore.YELLOW}VOD check interrupted by shutdown{Style.RESET_ALL}') else: print(f'\n{Fore.YELLOW}⚠ VOD not found after {self.vodTimeout}s - streamer may have VODs disabled{Style.RESET_ALL}') print(f'{Fore.CYAN} → Live recording and chat (if enabled) were saved successfully{Style.RESET_ALL}') vod_response = None # Process VOD if found if not self.shutdown_requested and vod_response and vod_response['data']['user']['videos']['edges']: current_vod = vod_response['data']['user']['videos']['edges'][0]['node'] vod_date = datetime.strptime( current_vod["recordedAt"], '%Y-%m-%dT%H:%M:%SZ' ).replace(tzinfo=timezone('UTC')).astimezone(tz=None).replace(tzinfo=None) # Check if VOD matches the stream (within 1 minute tolerance) time_tolerance = timedelta(minutes=1) if (live_date - time_tolerance) <= vod_date <= (live_date + time_tolerance): print(f'\n{Fore.GREEN}✓ Found matching VOD{Style.RESET_ALL}') # Save metadata self.file_manager.save_metadata(current_vod, filename_base) # Download VOD vod_ext = '.mp3' if self.quality == 'audio_only' else '.mp4' vod_path = str(self.file_manager.video_path / f"{PREFIX_VOD}{filename_base}{vod_ext}") self.downloader.download_vod(current_vod, vod_path) # Download and render chat from VOD (if not already done via live chat) if not live_chat_downloaded: chat_video_path = str(self.file_manager.chat_mp4_path / f"{PREFIX_CHAT}{filename_base}.mp4") output_args = self.processor.build_chat_output_args() # Get VOD duration to trim chat accordingly ffmpeg_path = get_ffmpeg_executable(self.os_type) vod_duration = get_video_duration(vod_path, ffmpeg_path) chat_rendered_successfully = self.downloader.download_and_render_chat( current_vod, chat_json_path, chat_video_path, output_args, video_duration=vod_duration ) # Merge VOD and chat if configured if chat_rendered_successfully and self.mergeVideoChat and os.path.exists(vod_path) and os.path.exists(chat_video_path): merged_vod_path = str(self.file_manager.video_path / f"{PREFIX_MERGED}{PREFIX_VOD}{filename_base}{vod_ext}") self.processor.merge_video_and_chat( vod_path, chat_video_path, merged_vod_path, self.mergeChatLayout ) else: print(f'{Fore.CYAN}Chat already downloaded from live stream, skipping VOD chat download{Style.RESET_ALL}') # But still merge VOD with existing chat if configured if self.mergeVideoChat and os.path.exists(vod_path) and chat_video_path and os.path.exists(chat_video_path): merged_vod_path = str(self.file_manager.video_path / f"{PREFIX_MERGED}{PREFIX_VOD}{filename_base}{vod_ext}") self.processor.merge_video_and_chat( vod_path, chat_video_path, merged_vod_path, self.mergeChatLayout ) else: print(f'{Fore.YELLOW}⚠ No matching VOD found for this stream{Style.RESET_ALL}') # Clean up raw files if configured chat_render_retry_needed = ( self.downloader.last_chat_render_attempted and not self.downloader.last_chat_render_succeeded ) if chat_render_retry_needed: print(f'{Fore.YELLOW}⚠ Preserving local files because chat rendering failed and can be retried later{Style.RESET_ALL}') elif processing_succeeded: self.file_manager.clean_raw_file(live_raw_path) elif os.path.exists(live_raw_path): print(f'{Fore.YELLOW}⚠ Keeping raw file because conversion did not complete successfully{Style.RESET_ALL}') # Upload to cloud if configured upload_success = self.file_manager.upload_to_cloud( filename_base, notification_callback=self.notification_manager.send ) # Delete local files if configured and upload succeeded if self.deleteFiles and self.uploadCloud and upload_success and not chat_render_retry_needed: self.file_manager.delete_local_files( filename_base, live_raw_path, live_proc_path, notification_callback=self.notification_manager.send ) # Done processing this stream if self.shutdown_requested: print(f'\n{Fore.YELLOW}{"=" * 60}{Style.RESET_ALL}') print(f'{Fore.YELLOW}✓ Stream processing stopped by user{Style.RESET_ALL}') print(f'{Fore.YELLOW}{"=" * 60}{Style.RESET_ALL}\n') break else: print(f'\n{Fore.GREEN}{"=" * 60}{Style.RESET_ALL}') print(f'{Fore.GREEN}✓ Stream processing complete{Style.RESET_ALL}') print(f'{Fore.GREEN}{"=" * 60}{Style.RESET_ALL}\n') self.notification_manager.send(f'✓ Complete - {filename_base}', 'Stream processing finished. Resuming monitoring.') self._interruptible_sleep(self.refresh) except KeyboardInterrupt: if not self.shutdown_requested: self.shutdown_requested = True print(f'\n{Fore.YELLOW}{"=" * 60}{Style.RESET_ALL}') print(f'{Fore.YELLOW}⚠ Interrupted. Cleaning up...{Style.RESET_ALL}') print(f'{Fore.YELLOW}{"=" * 60}{Style.RESET_ALL}\n') break except Exception as e: print(f'\n{Fore.RED}{"=" * 60}{Style.RESET_ALL}') print(f'{Fore.RED}✗ ERROR: {str(e)}{Style.RESET_ALL}') print(f'{Fore.YELLOW}Waiting {self.refresh} seconds before retrying...{Style.RESET_ALL}') print(f'{Fore.RED}{"=" * 60}{Style.RESET_ALL}\n') self.notification_manager.send('⚠ Error - Recovery', f'Error: {str(e)}\nRetrying after {self.refresh} seconds.') if self.shutdown_requested: break self._interruptible_sleep(self.refresh) # Final cleanup message print(f'{Fore.GREEN}✓ Monitoring stopped cleanly{Style.RESET_ALL}') def _upload_to_cloud(self, filename_base: str) -> bool: """ Upload archived files to cloud storage using rclone. Args: filename_base: Base filename (without prefixes/extensions) Returns: bool: True if upload succeeded or is disabled, False if failed """ if not self.uploadCloud: return True # Consider upload "successful" if disabled print(f'\n{Fore.CYAN}Uploading to cloud storage...{Style.RESET_ALL}') self.send_notification(f'☁ Uploading - {filename_base}', 'Uploading files to cloud storage') # Create list of files to upload bin_path = self._get_bin_path() upload_list_path = os.path.join(bin_path, 'temp', 'upload.txt') # Ensure temp directory exists os.makedirs(os.path.dirname(upload_list_path), exist_ok=True) files_to_upload = [ f"{PREFIX_LIVE}{filename_base}.ts", f"{PREFIX_LIVE}{filename_base}.mp4", f"{PREFIX_LIVE}{filename_base}.mp3", f"{PREFIX_VOD}{filename_base}.ts", f"{PREFIX_VOD}{filename_base}.mp4", f"{PREFIX_VOD}{filename_base}.mp3", f"{PREFIX_METADATA}{filename_base}.json", f"{PREFIX_CHAT}{filename_base}.json", f"{PREFIX_CHAT}{filename_base}.mp4" ] with open(upload_list_path, 'w') as f: f.write('\n'.join(files_to_upload)) # Run rclone try: result = subprocess.call([ 'rclone', 'copy', str(pathlib.Path(self.root_path).resolve()), self.rclone_path, '--include-from', upload_list_path ]) # Clean up upload list if os.path.exists(upload_list_path): os.remove(upload_list_path) if result == 0: print(f'{Fore.GREEN}✓ Upload complete{Style.RESET_ALL}') self.send_notification(f'✓ Upload Success - {filename_base}', 'All files uploaded successfully') return True else: print(f'{Fore.RED}✗ Upload failed (exit code: {result}){Style.RESET_ALL}') print(f'{Fore.YELLOW}Files preserved locally due to upload failure{Style.RESET_ALL}') self.send_notification(f'✗ Upload Failed - {filename_base}', f'Upload failed with code {result}. Files preserved locally.') return False except Exception as e: print(f'{Fore.RED}✗ Upload error: {str(e)}{Style.RESET_ALL}') return False def _delete_local_files(self, filename_base: str, live_raw_path: str, live_proc_path: str) -> None: """ Delete local archive files after successful upload. Args: filename_base: Base filename (without prefixes/extensions) live_raw_path: Path to live raw file live_proc_path: Path to live processed file """ print(f'\n{Fore.RED}{"=" * 60}{Style.RESET_ALL}') print(f'{Fore.RED}⚠ DELETING LOCAL FILES{Style.RESET_ALL}') print(f'{Fore.RED}{"=" * 60}{Style.RESET_ALL}\n') self.send_notification(f'🗑 Deleting - {filename_base}', 'Deleting local files after successful upload') files_to_delete = [] # Live files if not self.cleanRaw and os.path.exists(live_raw_path): files_to_delete.append(live_raw_path) if os.path.exists(live_proc_path): files_to_delete.append(live_proc_path) # VOD files if self.downloadVOD: vod_raw = os.path.join(self.raw_path, f"{PREFIX_VOD}{filename_base}.ts") vod_mp4 = os.path.join(self.video_path, f"{PREFIX_VOD}{filename_base}.mp4") vod_mp3 = os.path.join(self.video_path, f"{PREFIX_VOD}{filename_base}.mp3") if not self.cleanRaw and os.path.exists(vod_raw): files_to_delete.append(vod_raw) if os.path.exists(vod_mp4): files_to_delete.append(vod_mp4) if os.path.exists(vod_mp3): files_to_delete.append(vod_mp3) # Chat files if self.downloadCHAT: chat_json = os.path.join(self.chatJSON_path, f"{PREFIX_CHAT}{filename_base}.json") chat_mp4 = os.path.join(self.chatMP4_path, f"{PREFIX_CHAT}{filename_base}.mp4") if os.path.exists(chat_json): files_to_delete.append(chat_json) if os.path.exists(chat_mp4): files_to_delete.append(chat_mp4) # Metadata files if self.downloadMETADATA: metadata = os.path.join(self.metadata_path, f"{PREFIX_METADATA}{filename_base}.json") if os.path.exists(metadata): files_to_delete.append(metadata) # Delete all files for filepath in files_to_delete: try: print(f'{Fore.RED} Deleting: {os.path.basename(filepath)}{Style.RESET_ALL}') os.remove(filepath) except Exception as e: print(f'{Fore.YELLOW} ⚠ Failed to delete {filepath}: {e}{Style.RESET_ALL}') print(f'{Fore.RED}\n✓ Cleanup complete{Style.RESET_ALL}') # ============================================================================ # MULTI-STREAMER MANAGER # ============================================================================ class TwitchArchiveManager: """ Manages multiple TwitchArchive instances for monitoring multiple streamers. """ def __init__(self, specific_streamer: Optional[str] = None, verbose: bool = False, chat_only: bool = False, use_chat_downloader_primary: bool = False, use_chat_downloader_fallback: bool = True): """ Initialize the manager. Args: specific_streamer: If provided, only monitor this streamer (ignore enabled status) verbose: Enable verbose debug output chat_only: Only download chat, skip video recording (test mode) use_chat_downloader_primary: Use chat_downloader as primary chat source use_chat_downloader_fallback: Enable chat_downloader fallback """ self.config_manager = ConfigManager() self.specific_streamer = specific_streamer self.verbose = verbose self.chat_only = chat_only self.use_chat_downloader_primary = use_chat_downloader_primary self.use_chat_downloader_fallback = use_chat_downloader_fallback self.archivers: Dict[str, TwitchArchive] = {} self.shutdown_requested = False self.active_recordings: Dict[str, str] = {} # Track active recordings: {username: stream_id} # Setup signal handlers signal.signal(signal.SIGTERM, self._signal_handler) signal.signal(signal.SIGINT, self._signal_handler) def _signal_handler(self, signum, frame): """Handle shutdown signals gracefully.""" print(f'\n{Fore.YELLOW}⚠ Shutdown signal received...{Style.RESET_ALL}') self.shutdown_requested = True # Signal all archivers to shut down for archiver in self.archivers.values(): archiver.shutdown_requested = True def _get_streamers_to_monitor(self) -> list: """ Get list of streamers to monitor. Returns: list: List of streamer usernames to monitor """ if self.specific_streamer: # Monitor only the specified streamer (ignore enabled flag) return [self.specific_streamer] else: # Monitor all enabled streamers return self.config_manager.get_all_enabled_streamers() def _initialize_archiver(self, username: str) -> TwitchArchive: """ Initialize a TwitchArchive instance for a streamer. Args: username: Twitch username Returns: TwitchArchive: Initialized archiver instance """ config = self.config_manager.load_streamer_config(username) # Apply command-line overrides for chat_downloader options config['useChatDownloaderPrimary'] = self.use_chat_downloader_primary config['useChatDownloaderFallback'] = self.use_chat_downloader_fallback archiver = TwitchArchive(config) return archiver def run(self) -> None: """ Main entry point for multi-streamer monitoring. Monitors all enabled streamers (or a specific one if provided). """ print(f'\n{Fore.CYAN}{"=" * 70}{Style.RESET_ALL}') print(f'{Fore.CYAN}TWITCH ARCHIVE - Multi-Streamer Mode{Style.RESET_ALL}') if self.chat_only: print(f'{Fore.YELLOW}🧪 TEST MODE: Chat-Only (Video Recording Disabled){Style.RESET_ALL}') print(f'{Fore.CYAN}{"=" * 70}{Style.RESET_ALL}\n') # Get streamers to monitor streamers = self._get_streamers_to_monitor() if not streamers: print(f'{Fore.RED}✗ No streamers configured or enabled{Style.RESET_ALL}') print(f'{Fore.CYAN}→ Create config files in config/streamers/{Style.RESET_ALL}') print(f'{Fore.CYAN}→ Or run with -u to create a new config{Style.RESET_ALL}') sys.exit(1) if self.chat_only: print(f'{Fore.YELLOW}📝 Chat-Only Mode Enabled:{Style.RESET_ALL}') print(f'{Fore.CYAN} • Verbose logging: ON{Style.RESET_ALL}') print(f'{Fore.CYAN} • Video recording: DISABLED{Style.RESET_ALL}') print(f'{Fore.CYAN} • Chat download: ENABLED{Style.RESET_ALL}') print(f'{Fore.CYAN} • VOD download: DISABLED{Style.RESET_ALL}') print() print(f'{Fore.GREEN}Monitoring {len(streamers)} streamer(s):{Style.RESET_ALL}') for streamer in streamers: print(f' • {Fore.CYAN}{streamer}{Style.RESET_ALL}') print() # Initialize archivers for all streamers for username in streamers: try: archiver = self._initialize_archiver(username) # Load environment and initialize components archiver._load_environment_variables() archiver._initialize_components() # Validate username through stream_monitor archiver.stream_monitor.validate_username() self.archivers[username] = archiver print(f'{Fore.GREEN}✓ Initialized {username}{Style.RESET_ALL}') except Exception as e: print(f'{Fore.RED}✗ Failed to initialize {username}: {e}{Style.RESET_ALL}') import traceback traceback.print_exc() if not self.archivers: print(f'{Fore.RED}✗ No archivers could be initialized{Style.RESET_ALL}') sys.exit(1) # Verify dependencies once (shared across all streamers) print(f'\n{Fore.CYAN}Verifying dependencies...{Style.RESET_ALL}') first_archiver = next(iter(self.archivers.values())) if not verify_streamlink(): sys.exit(1) verify_ffmpeg(first_archiver.os_type) if first_archiver.downloadVOD or first_archiver.downloadCHAT: verify_twitch_downloader(first_archiver.os_type) if any(archiver.uploadCloud for archiver in self.archivers.values()) and not verify_rclone(): sys.exit(1) # Print configuration summary for each streamer for username, archiver in self.archivers.items(): archiver._print_configuration_summary() print(f'\n{Fore.GREEN}🚀 Starting monitoring loop...{Style.RESET_ALL}\n') # Start monitoring loop self._monitoring_loop() def _monitoring_loop(self) -> None: """ Main monitoring loop for all streamers. Checks each streamer's status and processes streams as needed. """ last_check = {} last_status_print = time.time() while not self.shutdown_requested: current_time = time.time() # Print periodic status every 60 seconds if current_time - last_status_print >= 60: status_line = " | ".join([f"{username}: checking" for username in self.archivers.keys()]) print(f'{Fore.CYAN}[Status] {status_line}{Style.RESET_ALL}') last_status_print = current_time for username, archiver in self.archivers.items(): # Check if enough time has passed since last check for this streamer if username not in last_check or (current_time - last_check[username]) >= archiver.refresh: last_check[username] = current_time # Check stream status try: response = archiver.stream_monitor.check_stream_status() # Debug: Print the full response (if verbose) if self.verbose: print(f'\n{Fore.MAGENTA}[DEBUG {username}] API Response: {response}{Style.RESET_ALL}') stream_data = response['data']['user']['stream'] if response else None if self.verbose: print(f'{Fore.MAGENTA}[DEBUG {username}] Stream data: {stream_data}{Style.RESET_ALL}') if stream_data: # Stream is live - check if it has required basic data (title and start time) if stream_data.get('title') and stream_data.get('createdAt'): # Create composite stream ID like single-streamer mode # This prevents duplicate recordings in the same session stream_id = f"{stream_data['createdAt']} - {username} - {stream_data.get('title', 'Untitled')}" if self.verbose: # Check if VOD ID is available (for live chat) if stream_data.get('archiveVideo') and stream_data['archiveVideo'].get('id'): print(f'{Fore.MAGENTA}[DEBUG {username}] VOD ID: {stream_data["archiveVideo"]["id"]}{Style.RESET_ALL}') else: print(f'{Fore.MAGENTA}[DEBUG {username}] No VOD ID available (VODs may be disabled){Style.RESET_ALL}') print(f'{Fore.MAGENTA}[DEBUG {username}] Composite Stream ID: {stream_id}{Style.RESET_ALL}') # Check if we're currently recording this stream currently_recording = username in self.active_recordings and self.active_recordings[username] == stream_id if self.verbose: print(f'{Fore.MAGENTA}[DEBUG {username}] Currently recording: {currently_recording}{Style.RESET_ALL}') print(f'{Fore.MAGENTA}[DEBUG {username}] Active recordings: {self.active_recordings}{Style.RESET_ALL}') # Record if not currently recording (ignore .log file - always record if live) if not currently_recording: print(f'\n{Fore.GREEN}[{username}] Stream detected!{Style.RESET_ALL}') print(f'{Fore.CYAN}Title: {stream_data.get("title", "No title")}{Style.RESET_ALL}') print(f'{Fore.CYAN}Started at: {stream_data["createdAt"]}{Style.RESET_ALL}') # Warn if VOD ID not available if not (stream_data.get('archiveVideo') and stream_data['archiveVideo'].get('id')): print(f'{Fore.YELLOW}⚠ VOD ID not available - live chat download will be skipped{Style.RESET_ALL}') print(f'{Fore.YELLOW} Stream recording will proceed normally{Style.RESET_ALL}') # Mark as currently recording self.active_recordings[username] = stream_id # Process the stream (this blocks until stream ends) self._process_stream(archiver, stream_data, stream_id) # Mark as processed in log (for record keeping) archiver.file_manager.mark_stream_processed(stream_id) # Remove from active recordings if username in self.active_recordings: del self.active_recordings[username] else: if self.verbose: print(f'{Fore.CYAN}[{username}] Currently recording this stream, skipping duplicate...{Style.RESET_ALL}') else: # Stream is live but not fully initialized yet print(f'{Fore.YELLOW}[{username}] Stream starting up, waiting for stream data...{Style.RESET_ALL}') else: # Not live if self.verbose: print(f'{Fore.CYAN}[{username}] Offline - checking again in {archiver.refresh}s{Style.RESET_ALL}', end='\r') except Exception as e: print(f'{Fore.RED}[{username}] Error checking stream: {e}{Style.RESET_ALL}') import traceback traceback.print_exc() # Sleep briefly before next iteration time.sleep(1) def _process_stream(self, archiver: TwitchArchive, stream_info: Dict[str, Any], stream_id: str) -> None: """ Process a detected stream for a specific archiver. Args: archiver: The TwitchArchive instance stream_info: Stream information from API stream_id: Unique stream ID """ # Store stream data archiver.current_stream_data = { 'stream_id': stream_id, 'title': stream_info['title'], 'started_at': stream_info['createdAt'] } # Generate timestamp and filename timestamp = datetime.now(timezone('UTC')).strftime("%Y%m%d_%Hh%Mm%Ss") filename_base = f"{archiver.username}_{timestamp}" # Parse stream start time live_date = datetime.strptime( stream_info["createdAt"], '%Y-%m-%dT%H:%M:%SZ' ).replace(tzinfo=timezone('UTC')).astimezone(tz=None).replace(tzinfo=None) # Define paths raw_extension = '.ts' proc_extension = '.mp3' if archiver.quality == 'audio_only' else '.mp4' live_raw_path = str(archiver.file_manager.raw_path / f"{PREFIX_LIVE}{filename_base}{raw_extension}") live_proc_path = str(archiver.file_manager.video_path / f"{PREFIX_LIVE}{filename_base}{proc_extension}") chat_json_path = str(archiver.file_manager.chat_json_path / f"{PREFIX_CHAT}{filename_base}.json") # Send notification if not self.chat_only: archiver.notification_manager.send( f"Stream Started - {archiver.username}", f"Recording: {stream_info['title']}" ) # Start live chat download if enabled (with fallback support) live_chat_process = None live_chat_method = 'failed' if archiver.downloadLiveCHAT: if self.verbose or self.chat_only: print(f'\n{Fore.MAGENTA}[VERBOSE] Starting chat download process...{Style.RESET_ALL}') print(f'{Fore.MAGENTA}[VERBOSE] downloadLiveCHAT: {archiver.downloadLiveCHAT}{Style.RESET_ALL}') print(f'{Fore.MAGENTA}[VERBOSE] useChatDownloaderPrimary: {archiver.downloader.use_chat_downloader_primary}{Style.RESET_ALL}') print(f'{Fore.MAGENTA}[VERBOSE] useChatDownloaderFallback: {archiver.downloader.use_chat_downloader_fallback}{Style.RESET_ALL}') # Get VOD ID if available live_vod_id = None if stream_info.get('archiveVideo') and stream_info['archiveVideo'].get('id'): live_vod_id = stream_info['archiveVideo']['id'] print(f'{Fore.CYAN}Live VOD ID detected: {live_vod_id}{Style.RESET_ALL}') if self.verbose or self.chat_only: print(f'{Fore.MAGENTA}[VERBOSE] VOD URL: https://www.twitch.tv/videos/{live_vod_id}{Style.RESET_ALL}') else: print(f'{Fore.YELLOW}⚠ No VOD ID available - will use fallback if configured{Style.RESET_ALL}') if self.verbose or self.chat_only: print(f'{Fore.MAGENTA}[VERBOSE] This happens when streamer has VODs disabled{Style.RESET_ALL}') print(f'{Fore.MAGENTA}[VERBOSE] chat_downloader fallback will be used if enabled{Style.RESET_ALL}') # Try to start live chat download with fallback try: if self.verbose or self.chat_only: print(f'{Fore.MAGENTA}[VERBOSE] Calling start_live_chat_download_with_fallback(){Style.RESET_ALL}') print(f'{Fore.MAGENTA}[VERBOSE] Username: {archiver.username}{Style.RESET_ALL}') print(f'{Fore.MAGENTA}[VERBOSE] VOD ID: {live_vod_id}{Style.RESET_ALL}') print(f'{Fore.MAGENTA}[VERBOSE] Output path: {chat_json_path}{Style.RESET_ALL}') live_chat_process, live_chat_method = archiver.downloader.start_live_chat_download_with_fallback( archiver.username, live_vod_id, chat_json_path ) if self.verbose or self.chat_only: print(f'{Fore.MAGENTA}[VERBOSE] Chat download method selected: {live_chat_method}{Style.RESET_ALL}') print(f'{Fore.MAGENTA}[VERBOSE] Process handle: {live_chat_process}{Style.RESET_ALL}') # If chat_downloader is selected, start it in background thread now (before video recording) if live_chat_method == 'chat_downloader' and not self.chat_only: if self.verbose: print(f'{Fore.MAGENTA}[VERBOSE] Starting chat_downloader in background thread...{Style.RESET_ALL}') try: print(f'{Fore.CYAN}Starting chat_downloader in background (concurrent with video)...{Style.RESET_ALL}') archiver.downloader.start_chat_downloader_thread( archiver.username, chat_json_path, shutdown_check=lambda: self.shutdown_requested or archiver.shutdown_requested, stream_monitor=archiver.stream_monitor, verbose=self.verbose ) except Exception as e: print(f'{Fore.RED}✗ Failed to start chat thread: {e}{Style.RESET_ALL}') import traceback traceback.print_exc() live_chat_method = 'failed' except Exception as e: print(f'{Fore.RED}✗ Failed to start live chat download: {e}{Style.RESET_ALL}') import traceback traceback.print_exc() live_chat_method = 'failed' # Record livestream (skip in chat-only mode) if self.chat_only: print(f'\n{Fore.YELLOW}🧪 Chat-Only Mode: Skipping video recording{Style.RESET_ALL}') print(f'{Fore.CYAN}Waiting for chat download to complete...{Style.RESET_ALL}') # Start chat download based on method if live_chat_method == 'chat_downloader': if self.verbose: print(f'{Fore.MAGENTA}[VERBOSE] Starting chat_downloader in background thread...{Style.RESET_ALL}') try: print(f'{Fore.CYAN}Using chat_downloader for live chat...{Style.RESET_ALL}') archiver.downloader.start_chat_downloader_thread( archiver.username, chat_json_path, shutdown_check=lambda: self.shutdown_requested or archiver.shutdown_requested, stream_monitor=archiver.stream_monitor, verbose=self.verbose or self.chat_only ) # Wait for completion live_chat_downloaded = archiver.downloader.wait_for_chat_thread() except Exception as e: print(f'{Fore.RED}✗ chat_downloader failed: {e}{Style.RESET_ALL}') import traceback traceback.print_exc() live_chat_downloaded = False elif live_chat_method == 'twitch_downloader' and live_chat_process is not None: if self.verbose: print(f'{Fore.MAGENTA}[VERBOSE] Waiting for TwitchDownloaderCLI process...{Style.RESET_ALL}') live_chat_downloaded = archiver.downloader.wait_for_chat_download(live_chat_process, chat_json_path) else: live_chat_downloaded = False # Report results if live_chat_downloaded: print(f'\n{Fore.GREEN}✓ Chat-Only Test Complete!{Style.RESET_ALL}') print(f'{Fore.CYAN}Chat saved to: {chat_json_path}{Style.RESET_ALL}') if os.path.exists(chat_json_path): file_size = os.path.getsize(chat_json_path) print(f'{Fore.CYAN}File size: {file_size / 1024:.2f} KB{Style.RESET_ALL}') else: print(f'\n{Fore.RED}✗ Chat download failed{Style.RESET_ALL}') return # Exit early, don't process video # Normal mode: Record livestream recording_successful = archiver.recorder.record(stream_info, live_raw_path) # Check if raw file exists (may exist even after interrupted recording) if not os.path.exists(live_raw_path): print(f'{Fore.RED}✗ No recording file found, skipping processing{Style.RESET_ALL}') # Still wait for chat if it's downloading if live_chat_method == 'chat_downloader' and archiver.downloader.chat_thread is not None: print(f'{Fore.CYAN}Waiting for chat download to finish...{Style.RESET_ALL}') archiver.downloader.wait_for_chat_thread(timeout=30) elif live_chat_method == 'twitch_downloader' and live_chat_process is not None: print(f'{Fore.CYAN}Waiting for chat download to finish...{Style.RESET_ALL}') archiver.downloader.wait_for_chat_download(live_chat_process, chat_json_path, timeout=30) return # Get file size to check if anything was recorded file_size = os.path.getsize(live_raw_path) if file_size < 1024: # Less than 1KB means essentially nothing was recorded print(f'{Fore.RED}✗ Recording file too small ({file_size} bytes), skipping processing{Style.RESET_ALL}') return print(f'{Fore.CYAN}Processing recorded content ({file_size / (1024*1024):.2f} MB)...{Style.RESET_ALL}') # Process raw stream processing_succeeded = False if not archiver.onlyRaw: processing_succeeded = archiver.processor.process_raw_stream(live_raw_path, live_proc_path) archiver.downloader.reset_chat_render_status() # Wait for live chat download if it was started live_chat_downloaded = False chat_rendered_successfully = False chat_video_path = None # Handle different chat download methods if live_chat_method == 'twitch_downloader' and live_chat_process is not None: # Wait for TwitchDownloaderCLI process print(f'{Fore.CYAN}Waiting for live chat download to complete...{Style.RESET_ALL}') live_chat_downloaded = archiver.downloader.wait_for_chat_download(live_chat_process, chat_json_path) elif live_chat_method == 'chat_downloader' and archiver.downloader.chat_thread is not None: # Wait for chat_downloader thread print(f'{Fore.CYAN}Waiting for live chat download to complete...{Style.RESET_ALL}') try: live_chat_downloaded = archiver.downloader.wait_for_chat_thread() if live_chat_downloaded: print(f'{Fore.GREEN}✓ Chat download thread completed successfully{Style.RESET_ALL}') else: print(f'{Fore.YELLOW}⚠ Chat download thread completed with errors or no messages{Style.RESET_ALL}') except Exception as e: print(f'{Fore.RED}✗ Error waiting for chat download thread: {e}{Style.RESET_ALL}') import traceback traceback.print_exc() live_chat_downloaded = False # Render live chat if downloaded successfully if live_chat_downloaded: chat_video_path = str(archiver.file_manager.chat_mp4_path / f"{PREFIX_CHAT}{filename_base}.mp4") output_args = archiver.processor.build_chat_output_args() # Wait for chat file to be fully accessible (not locked) print(f'{Fore.CYAN}Verifying chat file is ready for rendering...{Style.RESET_ALL}') if not archiver.downloader.wait_for_file_access(chat_json_path, max_attempts=15, delay=0.5): print(f'{Fore.RED}✗ Chat file is locked, skipping rendering{Style.RESET_ALL}') chat_rendered_successfully = False else: # Get video duration first if not processing_succeeded or not os.path.exists(live_proc_path): print(f'{Fore.YELLOW}⚠ Processed video file is unavailable, skipping chat render{Style.RESET_ALL}') video_duration = None else: ffmpeg_path = get_ffmpeg_executable(archiver.os_type) video_duration = get_video_duration(live_proc_path, ffmpeg_path) if video_duration is None: print(f'{Fore.YELLOW}⚠ Could not detect video duration from {live_proc_path}{Style.RESET_ALL}') print(f'{Fore.YELLOW} Will use chat message timestamps instead{Style.RESET_ALL}') else: print(f'{Fore.CYAN}Video duration for chat rendering: {video_duration}s{Style.RESET_ALL}') # Convert chat format if chat_downloader was used render_json_path = chat_json_path if live_chat_method == 'chat_downloader': converted_path = chat_json_path.replace('.json', '_converted.json') print(f'{Fore.CYAN}Chat downloaded with chat_downloader, converting format...{Style.RESET_ALL}') if archiver.downloader.convert_chat_downloader_to_twitch_format(chat_json_path, converted_path, video_duration): render_json_path = converted_path print(f'{Fore.GREEN}✓ Using converted chat file for rendering{Style.RESET_ALL}') else: print(f'{Fore.RED}✗ Format conversion failed, skipping rendering{Style.RESET_ALL}') chat_rendered_successfully = False render_json_path = None if render_json_path: chat_rendered_successfully = archiver.downloader.render_chat( render_json_path, chat_video_path, output_args, video_duration=video_duration ) # Merge video and chat if configured merged_video_path = None if chat_rendered_successfully and archiver.mergeVideoChat and os.path.exists(live_proc_path) and os.path.exists(chat_video_path): merged_video_path = str(archiver.file_manager.video_path / f"{PREFIX_MERGED}{filename_base}{proc_extension}") archiver.processor.merge_video_and_chat( live_proc_path, chat_video_path, merged_video_path, archiver.mergeChatLayout ) # Wait for VOD and download it vod_response = None if archiver.vodTimeout == 0: print(f'{Fore.CYAN}VOD check disabled (vodTimeout=0). Skipping VOD download.{Style.RESET_ALL}') elif archiver.shutdown_requested: print(f'{Fore.YELLOW}Skipping VOD download due to shutdown request{Style.RESET_ALL}') else: # Try to match stream with VOD (with timeout) print(f'{Fore.CYAN}Waiting for VOD to become available (timeout: {archiver.vodTimeout}s)...{Style.RESET_ALL}') vod_found = False vod_wait_start = time.time() while time.time() - vod_wait_start < archiver.vodTimeout: # Check for shutdown request if archiver.shutdown_requested: print(f'\n{Fore.YELLOW}VOD check interrupted by shutdown{Style.RESET_ALL}') break vod_response = archiver.stream_monitor.get_latest_vod() if vod_response and vod_response['data']['user']['videos']['edges']: current_vod = vod_response['data']['user']['videos']['edges'][0]['node'] vod_date = datetime.strptime( current_vod["recordedAt"], '%Y-%m-%dT%H:%M:%SZ' ).replace(tzinfo=timezone('UTC')).astimezone(tz=None).replace(tzinfo=None) # Check if VOD matches the stream (within 1 minute tolerance) time_tolerance = timedelta(minutes=1) if (live_date - time_tolerance) <= vod_date <= (live_date + time_tolerance): vod_found = True break # Wait before checking again if not vod_found: print(f'{Fore.CYAN}VOD not found yet, waiting...{Style.RESET_ALL}', end='\r') time.sleep(min(10, archiver.vodTimeout - (time.time() - vod_wait_start))) if not vod_found: print(f'\n{Fore.YELLOW}⚠ VOD not found after {archiver.vodTimeout}s - streamer may have VODs disabled{Style.RESET_ALL}') print(f'{Fore.CYAN} → Live recording and chat (if enabled) were saved successfully{Style.RESET_ALL}') vod_response = None # Process VOD if found if vod_response and vod_response['data']['user']['videos']['edges']: current_vod = vod_response['data']['user']['videos']['edges'][0]['node'] vod_date = datetime.strptime( current_vod["recordedAt"], '%Y-%m-%dT%H:%M:%SZ' ).replace(tzinfo=timezone('UTC')).astimezone(tz=None).replace(tzinfo=None) # Check if VOD matches the stream (within 1 minute tolerance) time_tolerance = timedelta(minutes=1) if (live_date - time_tolerance) <= vod_date <= (live_date + time_tolerance): print(f'\n{Fore.GREEN}✓ Found matching VOD{Style.RESET_ALL}') # Save metadata if archiver.downloadMETADATA: archiver.file_manager.save_metadata(current_vod, filename_base) # Download VOD if archiver.downloadVOD: vod_ext = '.mp3' if archiver.quality == 'audio_only' else '.mp4' vod_path = str(archiver.file_manager.video_path / f"{PREFIX_VOD}{filename_base}{vod_ext}") archiver.downloader.download_vod(current_vod, vod_path) # Download and render chat from VOD (if not already done via live chat) if archiver.downloadCHAT and not live_chat_downloaded: chat_video_path = str(archiver.file_manager.chat_mp4_path / f"{PREFIX_CHAT}{filename_base}.mp4") output_args = archiver.processor.build_chat_output_args() # Get VOD duration to trim chat accordingly ffmpeg_path = get_ffmpeg_executable(archiver.os_type) vod_duration = get_video_duration(vod_path, ffmpeg_path) chat_rendered_successfully = archiver.downloader.download_and_render_chat( current_vod, chat_json_path, chat_video_path, output_args, video_duration=vod_duration ) # Merge VOD and chat if configured if chat_rendered_successfully and archiver.mergeVideoChat and os.path.exists(vod_path) and os.path.exists(chat_video_path): merged_vod_path = str(archiver.file_manager.video_path / f"{PREFIX_MERGED}{PREFIX_VOD}{filename_base}{vod_ext}") archiver.processor.merge_video_and_chat( vod_path, chat_video_path, merged_vod_path, archiver.mergeChatLayout ) elif live_chat_downloaded: print(f'{Fore.CYAN}Chat already downloaded from live stream, skipping VOD chat download{Style.RESET_ALL}') # But still merge VOD with existing chat if configured if archiver.mergeVideoChat and archiver.downloadVOD and os.path.exists(vod_path) and chat_video_path and os.path.exists(chat_video_path): merged_vod_path = str(archiver.file_manager.video_path / f"{PREFIX_MERGED}{PREFIX_VOD}{filename_base}{vod_ext}") archiver.processor.merge_video_and_chat( vod_path, chat_video_path, merged_vod_path, archiver.mergeChatLayout ) else: print(f'{Fore.YELLOW}⚠ No matching VOD found for this stream{Style.RESET_ALL}') elif archiver.downloadMETADATA: # Save what metadata we have from the live stream archiver.file_manager.save_metadata(stream_info, filename_base) # Clean up raw file if configured chat_render_retry_needed = ( archiver.downloader.last_chat_render_attempted and not archiver.downloader.last_chat_render_succeeded ) if chat_render_retry_needed: print(f'{Fore.YELLOW}⚠ Preserving local files because chat rendering failed and can be retried later{Style.RESET_ALL}') elif processing_succeeded: archiver.file_manager.clean_raw_file(live_raw_path) elif os.path.exists(live_raw_path): print(f'{Fore.YELLOW}⚠ Keeping raw file because conversion did not complete successfully{Style.RESET_ALL}') # Upload to cloud if configured upload_success = archiver.file_manager.upload_to_cloud( filename_base, notification_callback=archiver.notification_manager.send ) # Delete files if configured if archiver.deleteFiles and archiver.uploadCloud and upload_success and not chat_render_retry_needed: archiver.file_manager.delete_local_files( filename_base, live_raw_path, live_proc_path, notification_callback=archiver.notification_manager.send ) # Send completion notification archiver.notification_manager.send( f"Stream Archived - {archiver.username}", f"Completed: {stream_info['title']}" ) def run_rclone_smoke_test(specific_streamer: Optional[str] = None) -> int: """Run a one-off rclone smoke test using the configured upload destination.""" config_manager = ConfigManager() if specific_streamer: username = specific_streamer else: enabled_streamers = config_manager.get_all_enabled_streamers() if not enabled_streamers: print(f'{Fore.RED}✗ No enabled streamers available for smoke test{Style.RESET_ALL}') print(f'{Fore.CYAN}→ Use -u or enable a streamer config{Style.RESET_ALL}') return 1 username = enabled_streamers[0] config = config_manager.load_streamer_config(username) file_manager = FileManager( root_path=config.get('root_path', 'archive'), username=username, config=config ) file_manager.initialize_directories() print(f'\n{Fore.CYAN}{"=" * 70}{Style.RESET_ALL}') print(f'{Fore.CYAN}TWITCH ARCHIVE - Rclone Smoke Test{Style.RESET_ALL}') print(f'{Fore.CYAN}{"=" * 70}{Style.RESET_ALL}') print(f'{Fore.GREEN}Streamer: {username}{Style.RESET_ALL}') print(f'{Fore.GREEN}Remote: {config.get("rclone_path", "")}{Style.RESET_ALL}\n') return 0 if file_manager.run_rclone_smoke_test() else 1 def run_healthcheck(specific_streamer: Optional[str] = None) -> int: """Run a local readiness check suitable for Docker health checks.""" config_manager = ConfigManager() if specific_streamer: username = specific_streamer else: enabled_streamers = config_manager.get_all_enabled_streamers() username = enabled_streamers[0] if enabled_streamers else 'vinesauce' config = config_manager.load_streamer_config(username) archive = TwitchArchive(config) try: archive._load_environment_variables() except SystemExit: return 1 archive._initialize_components() checks_ok = True if not verify_streamlink(): checks_ok = False if not verify_ffmpeg(archive.os_type): checks_ok = False if (archive.downloadVOD or archive.downloadCHAT) and not verify_twitch_downloader(archive.os_type): checks_ok = False if archive.uploadCloud: if not verify_rclone(): checks_ok = False rclone_config_path = os.getenv('RCLONE_CONFIG') if rclone_config_path and not os.path.exists(rclone_config_path): print(f'{Fore.RED}✗ ERROR: RCLONE_CONFIG points to a missing file: {rclone_config_path}{Style.RESET_ALL}') checks_ok = False if not checks_ok: return 1 print(f'{Fore.GREEN}✓ Healthcheck OK for {username}{Style.RESET_ALL}') return 0 # ============================================================================ # COMMAND-LINE INTERFACE # ============================================================================ def main(argv: list) -> None: """ Main entry point for command-line execution. Parses command-line arguments and starts the archive system. Args: argv: Command-line arguments """ specific_streamer = None use_legacy_mode = False rclone_smoke_test_mode = False healthcheck_mode = False help_msg = f''' {Fore.CYAN}{"=" * 70} TWITCH ARCHIVE - Automated Stream Recording & Archiving {"=" * 70}{Style.RESET_ALL} {Fore.GREEN}USAGE:{Style.RESET_ALL} python twitch-archive.py [OPTIONS] {Fore.GREEN}MODES:{Style.RESET_ALL} • Multi-Streamer Mode (default): Monitor all enabled streamers from config/streamers/*.json • Single-Streamer Mode: Use -u to monitor only one streamer • Legacy Mode: Uses config.json if it exists (deprecated) {Fore.GREEN}OPTIONS:{Style.RESET_ALL} -h, --help Display this help information -u, --username Monitor only this Twitch channel --verbose Enable verbose debug output --legacy Force legacy mode (use config.json) --chat-only Test mode: Only download chat (skip video recording) Automatically enables verbose logging --healthcheck Validate config and tool availability, then exit --rclone-smoke-test Create a small test file and upload it with rclone --use-chat-downloader-primary Use chat_downloader as primary chat source (for testing) --no-chat-downloader-fallback Disable chat_downloader fallback {Fore.GREEN}LEGACY OPTIONS (when using --legacy):{Style.RESET_ALL} -q, --quality Stream quality: best/source, high/720p, medium/480p, low/360p, audio_only -a, --ttv-lol <0|1> Enable ad-blocking (1) or disable (0) -v, --vod <0|1> Download VODs after stream ends -c, --chat <0|1> Download and render chat -m, --metadata <0|1> Download stream metadata -r, --upload <0|1> Upload to cloud storage via rclone -d, --delete <0|1> Delete local files after upload (CAREFUL!) -n, --notifications <0|1> Send email notifications {Fore.YELLOW}TIPS:{Style.RESET_ALL} • Create config/global.json for default settings • Create config/streamers/.json for each streamer • Set enabled: true/false in each streamer config • Set up API credentials in .env file {Fore.CYAN}EXAMPLES:{Style.RESET_ALL} python twitch-archive.py # Monitor all enabled streamers python twitch-archive.py -u vinesauce # Monitor only vinesauce python twitch-archive.py -u hackerling --verbose # Monitor with debug output python twitch-archive.py -u streamername --chat-only # Test chat download only (no video) python twitch-archive.py --use-chat-downloader-primary # Test chat_downloader library python twitch-archive.py --legacy # Use old config.json mode {Fore.CYAN}{"=" * 70}{Style.RESET_ALL} ''' try: opts, args = getopt.getopt( argv, "h:u:q:a:v:c:m:r:d:n:", ["help", "username=", "quality=", "ttv-lol=", "vod=", "chat=", "metadata=", "upload=", "delete=", "notifications=", "legacy", "verbose", "chat-only", "healthcheck", "rclone-smoke-test", "use-chat-downloader-primary", "no-chat-downloader-fallback"] ) except getopt.GetoptError as e: print(f'{Fore.RED}Error: {e}{Style.RESET_ALL}\n') print(help_msg) sys.exit(2) # Check if legacy mode is requested or if config.json exists (fallback) legacy_config_exists = os.path.exists(os.path.join(os.path.dirname(__file__), 'config.json')) # Parse command line args legacy_overrides = {} verbose_mode = False chat_only_mode = False use_chat_downloader_primary = False use_chat_downloader_fallback = True # Default to enabled for opt, arg in opts: if opt in ('-h', '--help'): print(help_msg) sys.exit(0) elif opt in ("-u", "--username"): specific_streamer = arg elif opt == "--verbose": verbose_mode = True elif opt == "--chat-only": chat_only_mode = True verbose_mode = True # Auto-enable verbose for chat-only mode elif opt == "--healthcheck": healthcheck_mode = True elif opt == "--rclone-smoke-test": rclone_smoke_test_mode = True elif opt == "--legacy": use_legacy_mode = True elif opt == "--use-chat-downloader-primary": use_chat_downloader_primary = True elif opt == "--no-chat-downloader-fallback": use_chat_downloader_fallback = False if opt in ('-h', '--help'): print(help_msg) sys.exit(0) elif opt in ("-u", "--username"): specific_streamer = arg elif opt == "--verbose": verbose_mode = True elif opt == "--legacy": use_legacy_mode = True # Legacy options (only used in legacy mode) elif opt in ("-q", "--quality"): legacy_overrides['quality'] = arg elif opt in ("-a", "--ttv-lol"): legacy_overrides['streamlink_ttvlol'] = bool(int(arg)) elif opt in ("-v", "--vod"): legacy_overrides['downloadVOD'] = bool(int(arg)) elif opt in ("-c", "--chat"): legacy_overrides['downloadCHAT'] = bool(int(arg)) elif opt in ("-m", "--metadata"): legacy_overrides['downloadMETADATA'] = bool(int(arg)) elif opt in ("-r", "--upload"): legacy_overrides['uploadCloud'] = bool(int(arg)) elif opt in ("-d", "--delete"): legacy_overrides['deleteFiles'] = bool(int(arg)) elif opt in ("-n", "--notifications"): legacy_overrides['notifications'] = bool(int(arg)) if rclone_smoke_test_mode: sys.exit(run_rclone_smoke_test(specific_streamer)) if healthcheck_mode: sys.exit(run_healthcheck(specific_streamer)) # Determine which mode to use if use_legacy_mode or (legacy_config_exists and not specific_streamer and not os.path.exists('config/global.json')): # Legacy mode: single streamer using config.json print(f'{Fore.YELLOW}⚠ Using legacy mode (config.json){Style.RESET_ALL}') print(f'{Fore.CYAN}→ Consider migrating to new config structure (config/global.json + config/streamers/*.json){Style.RESET_ALL}\n') twitch_archive = TwitchArchive() # Loads from config.json # Apply command-line overrides for key, value in legacy_overrides.items(): setattr(twitch_archive, key, value) # Apply chat_downloader options if hasattr(twitch_archive.downloader, 'use_chat_downloader_primary'): twitch_archive.downloader.use_chat_downloader_primary = use_chat_downloader_primary if hasattr(twitch_archive.downloader, 'use_chat_downloader_fallback'): twitch_archive.downloader.use_chat_downloader_fallback = use_chat_downloader_fallback # Start the archive system twitch_archive.run() else: # New multi-streamer mode manager = TwitchArchiveManager( specific_streamer=specific_streamer, verbose=verbose_mode, chat_only=chat_only_mode, use_chat_downloader_primary=use_chat_downloader_primary, use_chat_downloader_fallback=use_chat_downloader_fallback ) manager.run() if __name__ == "__main__": try: main(sys.argv[1:]) except KeyboardInterrupt: # Suppress stack trace for clean exit print(f'\n{Fore.GREEN}✓ Graceful shutdown complete{Style.RESET_ALL}') sys.exit(0)