""" Live stream recording functionality for Twitch Archive. """ import os import subprocess from typing import Dict, Any, Optional from colorama import Fore, Style from .utils import get_env_value class StreamRecorder: """Handles live stream recording using streamlink.""" def __init__(self, username: str, quality: str, refresh: float, hls_segments: int, streamlink_ttvlol: bool, shutdown_callback=None): """ Initialize the stream recorder. Args: username: Twitch username to record quality: Stream quality (e.g., 'best', '1080p60', 'audio_only') refresh: Retry interval in seconds hls_segments: Number of parallel HLS segments to download streamlink_ttvlol: Enable ttv-lol proxy (deprecated) shutdown_callback: Callable to check if shutdown was requested """ self.username = username self.quality = quality self.refresh = refresh self.hls_segments = hls_segments self.streamlink_ttvlol = streamlink_ttvlol self.shutdown_callback = shutdown_callback self.current_process = None def record(self, stream_info: Dict[str, Any], output_path: str) -> bool: """ Record a live Twitch stream using streamlink. Args: stream_info: Stream metadata from Twitch API output_path: Path where the raw .ts file will be saved Returns: bool: True if recording completed normally, False if interrupted """ print(f'\n{Fore.CYAN}{"=" * 60}{Style.RESET_ALL}') print(f'{Fore.GREEN}🔴 STREAM STARTED: {stream_info["title"]}{Style.RESET_ALL}') print(f'{Fore.CYAN}{"=" * 60}{Style.RESET_ALL}\n') # Build streamlink command cmd = [ 'streamlink', f'twitch.tv/{self.username}', self.quality, '--hls-live-restart', '--retry-streams', str(int(self.refresh)), '--force', '-o', output_path ] # Add segment threads for faster downloads (requires streamlink 5.0+) if self.hls_segments > 1: cmd.extend(['--stream-segment-threads', str(self.hls_segments)]) # Add ad-blocking if enabled (deprecated warning) if self.streamlink_ttvlol: print(f'{Fore.YELLOW}⚠ Warning: ttv-lol proxy option is deprecated in newer streamlink versions{Style.RESET_ALL}') print(f'{Fore.YELLOW} Consider disabling streamlink_ttvlol in config or using alternative methods{Style.RESET_ALL}') # Add authentication if available oauth_token = get_env_value("OAUTH-PRIVATE-TOKEN", "OAUTH_PRIVATE_TOKEN", default="") if oauth_token and oauth_token != "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx": cmd.extend(['--twitch-api-header', f'Authorization=OAuth {oauth_token}']) # Show command being executed (hide OAuth token for security) cmd_display = [c if 'OAuth' not in str(c) else 'Authorization=OAuth [HIDDEN]' for c in cmd] print(f'{Fore.CYAN}Command: {" ".join(cmd_display)}{Style.RESET_ALL}') # Record the stream (this blocks until stream ends) print(f'{Fore.YELLOW}Recording stream...{Style.RESET_ALL}') try: self.current_process = subprocess.Popen(cmd) return_code = self.current_process.wait() self.current_process = None if self.shutdown_callback and self.shutdown_callback(): print(f'{Fore.YELLOW}✓ Recording stopped by user{Style.RESET_ALL}') # Return True so processing continues - we still want to process what was recorded return True print(f'{Fore.GREEN}✓ Stream recording complete{Style.RESET_ALL}') return True except Exception as e: self.current_process = None print(f'{Fore.RED}✗ Recording error: {str(e)}{Style.RESET_ALL}') return False def stop(self) -> None: """Stop the current recording process.""" if self.current_process: try: self.current_process.terminate() print(f'{Fore.YELLOW}Stopping recording process...{Style.RESET_ALL}') except Exception: pass