TwitchDownloader/modules/recorder.py
MaddoScientisto f97e0200d6 Refactor downloader and file manager for improved rclone integration and add healthcheck and smoke test options
- Renamed download flags in ContentDownloader for clarity.
- Enhanced FileManager with methods to build upload paths and verify existing files for rclone uploads.
- Updated StreamProcessor to return success status for stream processing.
- Added rclone smoke test and healthcheck functions to validate configuration and tool availability.
- Improved environment variable handling with a utility function.
- Updated TwitchArchive to incorporate new rclone verification and processing logic.
- Added unit tests for new functionality and refactored existing tests for clarity and coverage.

Co-authored-by: Copilot <copilot@github.com>
2026-04-25 11:54:03 +02:00

107 lines
4.3 KiB
Python

"""
Live stream recording functionality for Twitch Archive.
"""
import os
import subprocess
from typing import Dict, Any, Optional
from colorama import Fore, Style
from .utils import get_env_value
class StreamRecorder:
    """Handles live stream recording using streamlink."""

    def __init__(self, username: str, quality: str, refresh: float,
                 hls_segments: int, streamlink_ttvlol: bool, shutdown_callback=None):
        """
        Initialize the stream recorder.

        Args:
            username: Twitch username to record
            quality: Stream quality (e.g., 'best', '1080p60', 'audio_only')
            refresh: Retry interval in seconds (truncated to an int when
                passed to streamlink)
            hls_segments: Number of parallel HLS segments to download
            streamlink_ttvlol: Enable ttv-lol proxy (deprecated; only
                triggers a warning, no streamlink option is added)
            shutdown_callback: Callable to check if shutdown was requested
        """
        self.username = username
        self.quality = quality
        self.refresh = refresh
        self.hls_segments = hls_segments
        self.streamlink_ttvlol = streamlink_ttvlol
        self.shutdown_callback = shutdown_callback
        # Handle of the running streamlink process; None while idle.
        self.current_process = None

    def _build_base_command(self, output_path: str) -> list:
        """Return the streamlink argv without any authentication header."""
        cmd = [
            'streamlink',
            f'twitch.tv/{self.username}',
            self.quality,
            '--hls-live-restart',
            '--retry-streams', str(int(self.refresh)),
            '--force',
            '-o', output_path,
        ]
        # Add segment threads for faster downloads (requires streamlink 5.0+)
        if self.hls_segments > 1:
            cmd.extend(['--stream-segment-threads', str(self.hls_segments)])
        return cmd

    def record(self, stream_info: Dict[str, Any], output_path: str) -> bool:
        """
        Record a live Twitch stream using streamlink.

        Blocks until the streamlink process exits.

        Args:
            stream_info: Stream metadata from Twitch API; only the optional
                "title" entry is read here, for the banner
            output_path: Path where the raw .ts file will be saved

        Returns:
            bool: True once streamlink has exited (including a user-requested
            stop or a non-zero exit code, so that whatever was recorded can
            still be processed); False if launching or waiting on the
            process raised an exception.
        """
        # A missing title must not abort the recording: it is only cosmetic.
        title = stream_info.get("title", "")
        print(f'\n{Fore.CYAN}{"=" * 60}{Style.RESET_ALL}')
        print(f'{Fore.GREEN}🔴 STREAM STARTED: {title}{Style.RESET_ALL}')
        print(f'{Fore.CYAN}{"=" * 60}{Style.RESET_ALL}\n')

        # Build streamlink command, plus a parallel copy that is safe to print.
        cmd = self._build_base_command(output_path)
        cmd_display = list(cmd)

        # Add ad-blocking if enabled (deprecated warning)
        if self.streamlink_ttvlol:
            print(f'{Fore.YELLOW}⚠ Warning: ttv-lol proxy option is deprecated in newer streamlink versions{Style.RESET_ALL}')
            print(f'{Fore.YELLOW} Consider disabling streamlink_ttvlol in config or using alternative methods{Style.RESET_ALL}')

        # Add authentication if available
        oauth_token = get_env_value("OAUTH-PRIVATE-TOKEN", "OAUTH_PRIVATE_TOKEN", default="")
        if oauth_token and oauth_token != "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx":
            cmd.extend(['--twitch-api-header', f'Authorization=OAuth {oauth_token}'])
            # The token is never placed in the display list, so it cannot leak,
            # and unrelated arguments that merely contain "OAuth" (e.g. an
            # output path) are shown unmodified.
            cmd_display.extend(['--twitch-api-header', 'Authorization=OAuth [HIDDEN]'])

        # Show command being executed (OAuth token hidden for security)
        print(f'{Fore.CYAN}Command: {" ".join(cmd_display)}{Style.RESET_ALL}')

        # Record the stream (this blocks until stream ends)
        print(f'{Fore.YELLOW}Recording stream...{Style.RESET_ALL}')
        try:
            process = subprocess.Popen(cmd)
            self.current_process = process
            return_code = process.wait()
            self.current_process = None

            if self.shutdown_callback and self.shutdown_callback():
                print(f'{Fore.YELLOW}✓ Recording stopped by user{Style.RESET_ALL}')
                # Return True so processing continues - we still want to process what was recorded
                return True

            if return_code != 0:
                # Do not claim success when streamlink reported a failure, but
                # keep returning True so a partial recording is still handled.
                print(f'{Fore.YELLOW}⚠ streamlink exited with code {return_code}; '
                      f'continuing with whatever was recorded{Style.RESET_ALL}')
            else:
                print(f'{Fore.GREEN}✓ Stream recording complete{Style.RESET_ALL}')
            return True
        except Exception as e:
            # Boundary handler: covers a missing streamlink executable and any
            # other failure while launching or waiting on the process.
            self.current_process = None
            print(f'{Fore.RED}✗ Recording error: {str(e)}{Style.RESET_ALL}')
            return False

    def stop(self) -> None:
        """Stop the current recording process, if one is running."""
        # Snapshot the handle: record() resets the attribute to None as soon
        # as the process exits, possibly while this method is running.
        process = self.current_process
        if not process:
            return
        try:
            process.terminate()
            print(f'{Fore.YELLOW}Stopping recording process...{Style.RESET_ALL}')
        except Exception:
            # Deliberate best-effort: stopping must never raise, e.g. when the
            # process has already gone away or output cannot be written.
            pass