# TwitchDownloader/twitch-archive.py (1597 lines, no EOL, 67 KiB, Python)

"""
Twitch Archive - Automated Twitch Stream Recording & Archiving System
This script monitors a Twitch channel and automatically:
- Records live streams as they happen
- Downloads VODs (Video on Demand) after the stream ends
- Downloads and renders chat logs
- Saves stream metadata
- Uploads everything to cloud storage (optional)
Requirements:
- Python 3.7+
- External tools: streamlink, ffmpeg, TwitchDownloaderCLI, rclone (optional)
- Configuration file: config.json (copy from config.sample.json)
- Environment file: .env (for API credentials)
"""
# Standard library imports
import os
import sys
import time
import json
import socket
import smtplib
import pathlib
import subprocess
import getopt
import signal
from typing import Dict, Optional, Any
from datetime import datetime, timedelta
# Third-party imports
import requests
from colorama import Fore, Style
from pytz import timezone
from dotenv import load_dotenv, find_dotenv
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
# ============================================================================
# CONSTANTS - Configuration defaults and magic values
# ============================================================================
# API Endpoints
# OAuth token endpoint; _get_oauth_token() POSTs the client credentials here.
TWITCH_OAUTH_URL = "https://id.twitch.tv/oauth2/token"
# Base URL of the Helix REST API (used for the /users lookup).
TWITCH_API_URL = "https://api.twitch.tv/helix"
# GraphQL endpoint queried for stream status and the latest VOD.
TWITCH_GQL_URL = "https://gql.twitch.tv/gql"
# Value sent as the "Client-ID" header on every GraphQL request.
TWITCH_GQL_CLIENT_ID = "kimne78kx3ncx6brgo4mv6wki5h1ko"
# File prefixes for different content types
PREFIX_LIVE = "LIVE_"
PREFIX_VOD = "VOD_"
PREFIX_CHAT = "CHAT_"
PREFIX_METADATA = "METADA_" # Note: keeping original typo for compatibility
# Default configuration values
# load_config() copies this dict, overlays config.json on top of it, and then
# exposes every key as an instance attribute. The 0/1 values act as on/off
# toggles (the code compares them with == 1).
DEFAULT_CONFIG = {
    'username': 'your_twitch_username',  # Twitch login of the channel to monitor
    'quality': 'best',  # Quality passed to streamlink / TwitchDownloaderCLI; 'audio_only' selects audio conversion
    'root_path': 'archive',  # Root directory of the local archive tree
    'rclone_path': 'remote:path/to/streams',  # NOTE(review): presumably the rclone destination; its use is not visible in this part of the file
    'refresh': 60.0,  # Polling interval in seconds
    'streamlink_ttvlol': 0,  # Legacy ad-block proxy toggle; currently only triggers a deprecation warning
    'notifications': 0,  # Email notifications via send_notification()
    'downloadMETADATA': 1,  # Save VOD metadata as JSON
    'downloadVOD': 1,  # Download the VOD with TwitchDownloaderCLI
    'downloadCHAT': 1,  # Download and render the chat
    'downloadLiveCHAT': 1,  # Start a background chat download while recording
    'vodTimeout': 300,  # NOTE(review): likely a wait time in seconds for the VOD; its use is not visible here
    'uploadCloud': 1,  # Cloud upload toggle (shown in the configuration summary)
    'deleteFiles': 0,  # Delete local files after processing (summary prints a warning when 1)
    'onlyRaw': 0,  # Keep the raw .ts file and skip ffmpeg processing
    'cleanRaw': 1,  # NOTE(review): presumably removes the raw .ts after processing; confirm against the caller
    'hls_segments': 3,  # Passed to streamlink --stream-segment-threads when > 1
    'hls_segmentsVOD': 10,  # Passed as -t to TwitchDownloaderCLI videodownload
    # FFmpeg 8.0+ Enhancement Options
    'ffmpeg_hwaccel': 'auto', # Hardware acceleration: 'auto', 'nvenc', 'qsv', 'amf', 'vaapi', 'none'
    'ffmpeg_threads': 0, # Thread count (0 = auto-detect)
    'ffmpeg_audio_codec': 'aac', # Audio codec for audio-only streams
    'ffmpeg_audio_samplerate': 48000, # Audio sample rate (48000 recommended for broadcasts)
    'ffmpeg_audio_bitrate': '192k', # Audio bitrate
    'ffmpeg_error_recovery': 1, # Enable error recovery for corrupted streams (0/1)
    'ffmpeg_faststart': 1, # Enable faststart for MP4 (better streaming compatibility) (0/1)
    'ffmpeg_progress': 0 # Show encoding progress (0/1)
}
# ============================================================================
# MAIN CLASS
# ============================================================================
class TwitchArchive:
"""
Main class for the Twitch Archive system.
Handles monitoring a Twitch channel, recording live streams, and downloading
VODs, chat logs, and metadata. Can optionally upload to cloud storage.
"""
def __init__(self):
    """Build an archiver: load settings, detect the platform, reset runtime state."""
    # Settings come first; load_config() turns every config key into an attribute.
    self.load_config()
    self.os = self._detect_operating_system()

    # Runtime bookkeeping, assigned after the config so it always starts clean.
    self.current_stream_data = {}
    self.current_process = None
    self.shutdown_requested = False
    self.paths_initialized = False
def load_config(self) -> None:
    """
    Populate instance attributes from DEFAULT_CONFIG overlaid with config.json.

    config.json is looked up next to this script. Keys that start with '_'
    are treated as comments and ignored. When the file is missing, unreadable
    or not valid JSON, a warning is printed and the defaults are kept.
    """
    settings = dict(DEFAULT_CONFIG)
    config_path = os.path.join(os.path.dirname(__file__), 'config.json')

    if not os.path.exists(config_path):
        print(f'{Fore.YELLOW}⚠ Warning: config.json not found{Style.RESET_ALL}')
        print(f'{Fore.CYAN} → Copy config.sample.json to config.json and edit it with your settings{Style.RESET_ALL}')
    else:
        try:
            with open(config_path, 'r', encoding='utf-8') as handle:
                loaded = json.load(handle)
            # Drop "_comment"-style keys, then let user values win over defaults.
            overrides = {
                name: setting
                for name, setting in loaded.items()
                if not name.startswith('_')
            }
            settings.update(overrides)
            print(f'{Fore.GREEN}✓ Configuration loaded from config.json{Style.RESET_ALL}')
        except json.JSONDecodeError as exc:
            print(f'{Fore.YELLOW}⚠ Warning: Invalid JSON in config.json: {exc}{Style.RESET_ALL}')
            print(f'{Fore.YELLOW} Using default configuration{Style.RESET_ALL}')
        except Exception as exc:
            print(f'{Fore.YELLOW}⚠ Warning: Could not load config.json: {exc}{Style.RESET_ALL}')
            print(f'{Fore.YELLOW} Using default configuration{Style.RESET_ALL}')

    # Expose each setting as self.<name>.
    for name, setting in settings.items():
        setattr(self, name, setting)
def _detect_operating_system(self) -> str:
"""
Detect the current operating system.
Returns:
str: 'windows' or 'linux'
Raises:
SystemExit: If OS is not supported
"""
if sys.platform.startswith('win32'):
return 'windows'
elif sys.platform.startswith('linux'):
return 'linux'
else:
print(f'{Fore.RED}✗ ERROR: Unsupported operating system: {sys.platform}{Style.RESET_ALL}')
print(f'{Fore.YELLOW} This script only supports Windows and Linux{Style.RESET_ALL}')
sys.exit(1)
def _initialize_paths(self) -> None:
"""
Initialize all directory paths needed for archiving.
Creates the directory structure:
- root_path/username/video/raw/ (for raw .ts files)
- root_path/username/video/ (for processed videos)
- root_path/username/chat/json/ (for chat JSON files)
- root_path/username/chat/ (for rendered chat videos)
- root_path/username/metadata/ (for stream metadata)
"""
# Convert all paths to absolute paths
self.raw_path = pathlib.Path(self.root_path, self.username, "video", "raw").absolute()
self.video_path = pathlib.Path(self.root_path, self.username, "video").absolute()
self.chatJSON_path = pathlib.Path(self.root_path, self.username, "chat", "json").absolute()
self.chatMP4_path = pathlib.Path(self.root_path, self.username, "chat").absolute()
self.metadata_path = pathlib.Path(self.root_path, self.username, "metadata").absolute()
# Create directories if they don't exist
for path in [self.raw_path, self.video_path, self.chatJSON_path,
self.chatMP4_path, self.metadata_path]:
path.mkdir(parents=True, exist_ok=True)
# Create log file if it doesn't exist
log_file = pathlib.Path(self.root_path, ".log")
if not log_file.exists():
log_file.touch()
self.paths_initialized = True
def _load_environment_variables(self) -> None:
    """
    Load the .env file into the process environment.

    Variables read elsewhere in this class:
    - CLIENT-ID / CLIENT-SECRET: Twitch API credentials
    - OAUTH-PRIVATE-TOKEN: optional token passed to streamlink
    - SENDER / RECEIVER / PASSWD: email settings for notifications
    Raises:
        SystemExit: If no .env file could be loaded
    """
    dotenv_loaded = load_dotenv(find_dotenv())
    if dotenv_loaded:
        return

    print(f'{Fore.RED}✗ ERROR: .env file not found{Style.RESET_ALL}')
    print(f'{Fore.CYAN} → Create a .env file with your Twitch API credentials{Style.RESET_ALL}')
    print(f'{Fore.CYAN} → Required: CLIENT-ID, CLIENT-SECRET{Style.RESET_ALL}')
    sys.exit(1)
def _print_configuration_summary(self) -> None:
    """Print the active settings, feature toggles and file-retention warnings."""
    rule = f'{Fore.CYAN}{"=" * 60}{Style.RESET_ALL}'
    print(f'\n{rule}')
    print(f'{Fore.CYAN}TWITCH ARCHIVE - Configuration Summary{Style.RESET_ALL}')
    print(f'{rule}\n')

    # Core settings
    print(f'Streamer: {Fore.GREEN}{self.username}{Style.RESET_ALL}')
    print(f'Quality: {Fore.GREEN}{self.quality}{Style.RESET_ALL}')
    print(f'Storage: {Fore.GREEN}{pathlib.Path(self.root_path).resolve()}{Style.RESET_ALL}')
    print(f'Refresh rate: {Fore.GREEN}{self.refresh}s{Style.RESET_ALL}\n')

    # On/off features
    toggles = (
        ('Email notifications', self.notifications),
        ('Metadata download', self.downloadMETADATA),
        ('VOD download', self.downloadVOD),
        ('Chat download & render', self.downloadCHAT),
        ('Cloud upload', self.uploadCloud),
    )
    for label, flag in toggles:
        self._print_toggle(label, flag)

    # File retention: deleting without a cloud copy gets an extra warning.
    if self.deleteFiles != 1:
        print(f'\n{Fore.GREEN}✓ Files will be preserved locally{Style.RESET_ALL}')
    else:
        print(f'\n{Fore.RED}⚠ WARNING: Files will be DELETED after processing{Style.RESET_ALL}')
        if self.uploadCloud == 0:
            print(f'{Fore.RED}⚠ CRITICAL: Files will be deleted WITHOUT cloud backup!{Style.RESET_ALL}')
            print(f'{Fore.YELLOW} Press CTRL+C to stop and change configuration{Style.RESET_ALL}')

    print(f'\n{rule}\n')
def _print_toggle(self, label: str, value: int) -> None:
    """Print '<label>: Enabled' (green) when value is 1, else '<label>: Disabled' (red)."""
    if value == 1:
        status = f'{Fore.GREEN}Enabled{Style.RESET_ALL}'
    else:
        status = f'{Fore.RED}Disabled{Style.RESET_ALL}'
    print(f'{label}: {status}')
def run(self) -> None:
    """
    Start the application.

    Runs the startup steps in order (environment, username validation,
    directory creation, dependency checks, configuration summary), sends the
    start notification and then hands control to the monitoring loop.
    """
    startup_steps = (
        self._load_environment_variables,
        self._validate_username,
        self._initialize_paths,
        self._verify_dependencies,
        self._print_configuration_summary,
    )
    for step in startup_steps:
        step()

    print(f"Monitoring {Fore.GREEN}{self.username}{Style.RESET_ALL} every {Fore.GREEN}{self.refresh}s{Style.RESET_ALL}")
    self.send_notification(
        "TWITCH ARCHIVE STARTED",
        f"Monitoring {self.username} every {self.refresh} seconds.",
    )

    # Main monitoring loop
    self.loopcheck()
def _get_oauth_token(self) -> str:
    """
    Request an app access token from Twitch (client-credentials grant).

    Reads CLIENT-ID and CLIENT-SECRET from the environment and sends them in
    the form-encoded POST body rather than the URL, so the secret does not
    appear in request URLs, exception messages or proxy logs.

    Returns:
        str: OAuth access token
    Raises:
        SystemExit: If credentials are missing, the request fails, or the
            response does not contain an access token
    """
    client_id = os.getenv('CLIENT-ID')
    client_secret = os.getenv('CLIENT-SECRET')
    if not client_id or not client_secret:
        # Without this check the literal string "None" would be sent to Twitch.
        print(f'{Fore.RED}✗ ERROR: Missing Twitch API credentials{Style.RESET_ALL}')
        print(f'{Fore.CYAN} → Check your CLIENT-ID and CLIENT-SECRET in the .env file{Style.RESET_ALL}')
        sys.exit(1)

    try:
        response = requests.post(
            TWITCH_OAUTH_URL,
            data={
                'client_id': client_id,
                'client_secret': client_secret,
                'grant_type': 'client_credentials',
            },
            timeout=15,
        )
        response.raise_for_status()
        return response.json()['access_token']
    except requests.exceptions.RequestException as e:
        print(f'{Fore.RED}✗ ERROR: Failed to authenticate with Twitch API{Style.RESET_ALL}')
        print(f'{Fore.YELLOW} {str(e)}{Style.RESET_ALL}')
        print(f'{Fore.CYAN} → Check your CLIENT-ID and CLIENT-SECRET in the .env file{Style.RESET_ALL}')
        sys.exit(1)
    except (KeyError, TypeError, ValueError):
        # Missing 'access_token', unexpected JSON shape, or a non-JSON body.
        print(f'{Fore.RED}✗ ERROR: Invalid response from Twitch API{Style.RESET_ALL}')
        print(f'{Fore.CYAN} → Verify your CLIENT-ID and CLIENT-SECRET are correct{Style.RESET_ALL}')
        sys.exit(1)
def _validate_username(self) -> None:
    """
    Check with the Helix /users endpoint that the configured username exists.

    The login is passed through ``params`` so it is URL-encoded; a value
    containing '&' or '#' can no longer alter the query string.

    Raises:
        SystemExit: If the user is not found, the request fails, or the
            response is not valid JSON
    """
    try:
        headers = {
            "Authorization": f"Bearer {self._get_oauth_token()}",
            "Client-ID": os.getenv('CLIENT-ID')
        }
        response = requests.get(
            f'{TWITCH_API_URL}/users',
            params={'login': self.username},
            headers=headers,
            timeout=15,
        )
        response.raise_for_status()
        data = response.json()
        # Helix returns an empty "data" list for an unknown login.
        if not data.get('data'):
            print(f'{Fore.RED}✗ ERROR: Twitch user "{self.username}" not found{Style.RESET_ALL}')
            print(f'{Fore.CYAN} → Check the username in your config.json file{Style.RESET_ALL}')
            sys.exit(1)
        print(f'{Fore.GREEN}✓ Username "{self.username}" validated{Style.RESET_ALL}')
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on requests versions where the
        # JSON decode error is not a RequestException.
        print(f'{Fore.RED}✗ ERROR: Could not validate username{Style.RESET_ALL}')
        print(f'{Fore.YELLOW} {str(e)}{Style.RESET_ALL}')
        sys.exit(1)
def _verify_dependencies(self) -> None:
    """
    Check that the external tools this script shells out to are present.

    streamlink is mandatory: it is probed with ``streamlink --version`` and
    the process exits if that fails. FFmpeg and TwitchDownloaderCLI are looked
    up in the bin/ folder and only produce warnings when absent.

    Raises:
        SystemExit: If streamlink cannot be run
    """
    # --- streamlink (required) ---
    try:
        probe = subprocess.run(
            ['streamlink', '--version'],
            capture_output=True,
            text=True,
            timeout=5,
        )
        if probe.returncode != 0:
            raise FileNotFoundError()
        # Output looks like "streamlink <version>"; take the second word.
        words = probe.stdout.split()
        version = words[1] if len(words) > 1 else 'unknown'
        print(f'{Fore.GREEN}✓ Streamlink v{version} found{Style.RESET_ALL}')
    except (FileNotFoundError, subprocess.TimeoutExpired, IndexError):
        print(f'{Fore.RED}✗ ERROR: Streamlink not found{Style.RESET_ALL}')
        print(f'{Fore.CYAN} → Install streamlink: pip install streamlink{Style.RESET_ALL}')
        print(f'{Fore.CYAN} → Or download from: https://streamlink.github.io/{Style.RESET_ALL}')
        sys.exit(1)

    # --- ffmpeg (warning only) ---
    try:
        ffmpeg_path = self._get_ffmpeg_executable()
        if not os.path.exists(ffmpeg_path):
            print(f'{Fore.YELLOW}⚠ Warning: FFmpeg not found at {ffmpeg_path}{Style.RESET_ALL}')
            print(f'{Fore.YELLOW} → Download FFmpeg and place it in the bin/ folder{Style.RESET_ALL}')
        else:
            print(f'{Fore.GREEN}✓ FFmpeg found at {ffmpeg_path}{Style.RESET_ALL}')
    except Exception as exc:
        print(f'{Fore.YELLOW}⚠ Warning: Could not verify FFmpeg: {exc}{Style.RESET_ALL}')

    # --- TwitchDownloaderCLI (only needed for VOD or chat download) ---
    if self.downloadVOD != 1 and self.downloadCHAT != 1:
        return
    try:
        downloader_path = self._get_twitch_downloader_executable()
        if not os.path.exists(downloader_path):
            print(f'{Fore.YELLOW}⚠ Warning: TwitchDownloaderCLI not found at {downloader_path}{Style.RESET_ALL}')
            print(f'{Fore.YELLOW} → Download from: https://github.com/lay295/TwitchDownloader/releases{Style.RESET_ALL}')
        else:
            print(f'{Fore.GREEN}✓ TwitchDownloaderCLI found{Style.RESET_ALL}')
    except Exception as exc:
        print(f'{Fore.YELLOW}⚠ Warning: Could not verify TwitchDownloaderCLI: {exc}{Style.RESET_ALL}')
def _check_stream_status(self) -> Optional[Dict[str, Any]]:
    """
    Query the Twitch GraphQL endpoint for the configured user's stream.

    Returns:
        dict: The decoded JSON response of the GraphQL query (stream title,
            creation time and archive video id are requested)
    Raises:
        SystemExit: If the HTTP request fails
    """
    gql_query = f'query{{user(login: "{self.username}") {{stream{{archiveVideo{{id}}title createdAt}}}}}}'
    payload = {'query': gql_query}
    gql_headers = {"Client-ID": TWITCH_GQL_CLIENT_ID}
    try:
        reply = requests.post(
            TWITCH_GQL_URL,
            json=payload,
            headers=gql_headers,
            timeout=15,
        )
        reply.raise_for_status()
        return reply.json()
    except requests.exceptions.RequestException as exc:
        print(f'{Fore.RED}✗ ERROR: Failed to check stream status{Style.RESET_ALL}')
        print(f'{Fore.YELLOW} {str(exc)}{Style.RESET_ALL}')
        sys.exit(1)
def _get_latest_vod(self) -> Optional[Dict[str, Any]]:
    """
    Query the Twitch GraphQL endpoint for the user's most recent video.

    Returns:
        dict: The decoded JSON response of the GraphQL query, or None when
            the HTTP request fails (a warning is printed in that case)
    """
    gql_query = f'query {{user(login: "{self.username}") {{videos(first: 1) {{edges {{node {{id title description recordedAt lengthSeconds animatedPreviewURL previewThumbnailURL(height: 1280, width: 720) thumbnailURLs(height: 1280, width: 720)}}}}}}}}}}'
    payload = {'query': gql_query}
    gql_headers = {"Client-ID": TWITCH_GQL_CLIENT_ID}
    try:
        reply = requests.post(
            TWITCH_GQL_URL,
            json=payload,
            headers=gql_headers,
            timeout=15,
        )
        reply.raise_for_status()
        return reply.json()
    except requests.exceptions.RequestException as exc:
        print(f'{Fore.YELLOW}⚠ Warning: Could not fetch latest VOD{Style.RESET_ALL}')
        print(f'{Fore.YELLOW} {str(exc)}{Style.RESET_ALL}')
        return None
def _get_unique_filename(self, filepath: str) -> str:
"""
Generate a unique filename by appending a counter if file already exists.
Args:
filepath: The desired file path
Returns:
str: A unique file path (original or with _N suffix)
Example:
If 'video.mp4' exists, returns 'video_1.mp4'
If 'video_1.mp4' also exists, returns 'video_2.mp4'
"""
if not os.path.exists(filepath):
return filepath
# Split into components
directory = os.path.dirname(filepath)
filename = os.path.basename(filepath)
name, ext = os.path.splitext(filename)
# Find next available counter
counter = 1
while True:
new_filepath = os.path.join(directory, f"{name}_{counter}{ext}")
if not os.path.exists(new_filepath):
return new_filepath
counter += 1
def send_notification(self, subject: str, content: str) -> None:
"""
Send email notification via Gmail SMTP.
Only sends if notifications are enabled in configuration.
Requires SENDER, RECEIVER, and PASSWD in .env file.
Args:
subject: Email subject line
content: Email body content
"""
if self.notifications != 1:
return
try:
sender = os.getenv("SENDER")
receiver = os.getenv("RECEIVER")
password = os.getenv("PASSWD")
if not all([sender, receiver, password]):
print(f'{Fore.YELLOW}⚠ Notification skipped: Missing email credentials in .env{Style.RESET_ALL}')
return
# Construct email
msg = MIMEMultipart()
msg['From'] = sender
msg['To'] = receiver
msg['Subject'] = f"{self.username} - {subject}"
body = f"Stream: {self.username}\n\n{content}"
msg.attach(MIMEText(body, 'plain'))
# Send via Gmail SMTP
with smtplib.SMTP('smtp.gmail.com', 587) as server:
server.starttls()
server.login(sender, password)
server.sendmail(sender, receiver, msg.as_string())
except socket.error as e:
print(f'{Fore.YELLOW}⚠ Notification failed: {str(e)}{Style.RESET_ALL}')
except Exception as e:
print(f'{Fore.YELLOW}⚠ Notification error: {str(e)}{Style.RESET_ALL}')
def _is_stream_already_processed(self, stream_id: str) -> bool:
"""
Check if a stream has already been processed.
Args:
stream_id: Unique identifier for the stream
Returns:
bool: True if already processed, False otherwise
"""
log_file = pathlib.Path(self.root_path, ".log")
with open(log_file, 'r', encoding='utf-8') as f:
return stream_id in f.read()
def _mark_stream_as_processed(self, stream_id: str) -> None:
"""Add stream to log file to prevent re-processing."""
log_file = pathlib.Path(self.root_path, ".log")
with open(log_file, 'a', encoding='utf-8') as f:
f.write(f"{stream_id}\n")
def _get_bin_path(self) -> str:
"""Get the path to the bin directory containing external tools."""
return str(pathlib.Path(__file__).parent.resolve() / "bin")
def _get_ffmpeg_executable(self) -> str:
"""Get the platform-specific ffmpeg executable path."""
bin_path = self._get_bin_path()
if self.os == 'windows':
return os.path.join(bin_path, 'ffmpeg.exe')
return os.path.join(bin_path, 'ffmpeg')
def _get_twitch_downloader_executable(self) -> str:
"""Get the platform-specific TwitchDownloaderCLI executable path."""
bin_path = self._get_bin_path()
if self.os == 'windows':
return os.path.join(bin_path, 'TwitchDownloaderCLI.exe')
return os.path.join(bin_path, 'TwitchDownloaderCLI')
def _detect_hardware_acceleration(self) -> Optional[str]:
"""
Detect available hardware acceleration based on config and system.
Returns:
str: Hardware acceleration type ('nvenc', 'qsv', 'amf', 'vaapi', 'none') or None
"""
hwaccel_config = getattr(self, 'ffmpeg_hwaccel', 'auto')
# If user explicitly set to 'none', disable hardware acceleration
if hwaccel_config == 'none':
return 'none'
# If user specified a particular type, use it
if hwaccel_config in ['nvenc', 'qsv', 'amf', 'vaapi']:
return hwaccel_config
# Auto-detect: try to determine available hardware
if hwaccel_config == 'auto':
# On Windows, NVIDIA is most common
if self.os == 'windows':
# Could check for nvidia-smi, but just return 'auto' for ffmpeg to decide
return 'auto'
else:
# On Linux, VAAPI is common for Intel/AMD, or NVENC for NVIDIA
# Let ffmpeg auto-detect
return 'auto'
return None
def _record_livestream(self, stream_info: Dict[str, Any], output_path: str) -> bool:
    """
    Record a live Twitch stream using streamlink.

    Blocks until streamlink exits. The running process is stored in
    self.current_process while it is alive and always reset afterwards. A
    non-zero streamlink exit code is reported as a warning; the method still
    returns True in that case so a partially written file can be processed.

    Args:
        stream_info: Stream metadata; only stream_info["title"] is read here
        output_path: Path where the raw .ts file will be saved
    Returns:
        bool: True if streamlink ran to completion, False if a shutdown was
            requested or the process could not be run
    """
    print(f'\n{Fore.CYAN}{"=" * 60}{Style.RESET_ALL}')
    print(f'{Fore.GREEN}🔴 STREAM STARTED: {stream_info["title"]}{Style.RESET_ALL}')
    print(f'{Fore.CYAN}{"=" * 60}{Style.RESET_ALL}\n')
    # Build streamlink command
    cmd = [
        'streamlink',
        f'twitch.tv/{self.username}',
        self.quality,
        '--hls-live-restart',
        '--retry-streams', str(int(self.refresh)),
        '--force',
        '-o', output_path
    ]
    # Download several segments in parallel (requires streamlink 5.0+)
    if self.hls_segments > 1:
        cmd.extend(['--stream-segment-threads', str(self.hls_segments)])
    # The old --twitch-proxy-playlist option was removed from streamlink, so
    # the ttv-lol toggle no longer adds arguments; it only warns.
    if self.streamlink_ttvlol == 1:
        print(f'{Fore.YELLOW}⚠ Warning: ttv-lol proxy option is deprecated in newer streamlink versions{Style.RESET_ALL}')
        print(f'{Fore.YELLOW} Consider disabling streamlink_ttvlol in config or using alternative methods{Style.RESET_ALL}')
    # Add authentication if a real token (not the sample placeholder) is set
    oauth_token = os.getenv("OAUTH-PRIVATE-TOKEN", "")
    if oauth_token and oauth_token != "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx":
        cmd.extend(['--twitch-api-header', f'Authorization=OAuth {oauth_token}'])
    # Show command being executed (hide OAuth token for security)
    cmd_display = [c if 'OAuth' not in str(c) else 'Authorization=OAuth [HIDDEN]' for c in cmd]
    print(f'{Fore.CYAN}Command: {" ".join(cmd_display)}{Style.RESET_ALL}')
    # Record the stream (this blocks until stream ends)
    print(f'{Fore.YELLOW}Recording stream...{Style.RESET_ALL}')
    try:
        self.current_process = subprocess.Popen(cmd)
        return_code = self.current_process.wait()
    except Exception as e:
        print(f'{Fore.RED}✗ Recording error: {str(e)}{Style.RESET_ALL}')
        return False
    finally:
        # Never leave a stale handle behind, whatever happened above.
        self.current_process = None

    if self.shutdown_requested:
        print(f'{Fore.YELLOW}✓ Recording stopped by user{Style.RESET_ALL}')
        return False
    if return_code != 0:
        print(f'{Fore.YELLOW}⚠ Warning: streamlink exited with code {return_code}; recording may be incomplete{Style.RESET_ALL}')
    else:
        print(f'{Fore.GREEN}✓ Stream recording complete{Style.RESET_ALL}')
    return True
def _process_raw_stream(self, raw_path: str, output_path: str) -> None:
    """
    Convert a raw .ts recording with ffmpeg.

    With quality 'audio_only' the audio track is re-encoded using the
    ffmpeg_audio_* settings; otherwise video and audio are stream-copied.
    Nothing is done when the raw file is missing or onlyRaw mode is on.
    Both branches pass '-y' so ffmpeg never waits on an overwrite prompt, and
    the ffmpeg exit code decides whether success or failure is reported.

    Args:
        raw_path: Path to the raw .ts file
        output_path: Path for the processed output file
    """
    if not os.path.exists(raw_path):
        print(f'{Fore.YELLOW}⚠ Raw file not found, skipping processing{Style.RESET_ALL}')
        return
    if self.onlyRaw == 1:
        print(f'{Fore.CYAN}Keeping raw .ts file (onlyRaw mode){Style.RESET_ALL}')
        return
    print(f'{Fore.YELLOW}Processing raw stream file...{Style.RESET_ALL}')

    ffmpeg = self._get_ffmpeg_executable()
    if self.quality == 'audio_only':
        # Audio-only conversion
        cmd = [
            ffmpeg,
            '-y',  # Overwrite output file (same as the video branch)
            '-i', raw_path,
            '-vn',  # No video
            '-c:a', self.ffmpeg_audio_codec,  # Audio codec
            '-ar', str(self.ffmpeg_audio_samplerate),  # Audio sample rate
            '-ac', '2',  # Audio channels (stereo)
            '-b:a', self.ffmpeg_audio_bitrate,  # Audio bitrate
        ]
        # Only pass -threads when a specific count is configured
        if self.ffmpeg_threads > 0:
            cmd.extend(['-threads', str(self.ffmpeg_threads)])
        # Faststart applies to MP4/M4A containers only
        if self.ffmpeg_faststart == 1 and output_path.endswith(('.mp4', '.m4a')):
            cmd.extend(['-movflags', '+faststart'])
    else:
        # Video conversion (stream copy)
        cmd = [
            ffmpeg,
            '-y',  # Overwrite output file
        ]
        hwaccel_type = self._detect_hardware_acceleration()
        if hwaccel_type and hwaccel_type != 'none':
            print(f'{Fore.CYAN}Using hardware acceleration: {hwaccel_type}{Style.RESET_ALL}')
            # ffmpeg is always asked to auto-select, whatever type was resolved.
            cmd.extend(['-hwaccel', 'auto'])
        # NOTE(review): -analyzeduration/-probesize (and -fflags/-err_detect
        # below) follow '-i', and ffmpeg applies options to the NEXT file, so
        # they likely act on the output rather than the input. Order kept
        # as-is to avoid changing established output; confirm the intent.
        cmd.extend([
            '-i', raw_path,
            '-analyzeduration', '2147483647',
            '-probesize', '2147483647',
        ])
        # Threading support (0 lets ffmpeg choose)
        if self.ffmpeg_threads >= 0:
            cmd.extend(['-threads', str(self.ffmpeg_threads)])
        # Error recovery options for corrupted streams
        if self.ffmpeg_error_recovery == 1:
            cmd.extend([
                '-fflags', '+genpts',  # Generate missing timestamps
                '-avoid_negative_ts', 'make_zero',  # Handle timestamp issues
                '-err_detect', 'ignore_err'  # More tolerant of errors
            ])
        # Stream copy (fast, no re-encoding)
        cmd.extend([
            '-c:v', 'copy',
            '-c:a', 'copy',
            '-start_at_zero',
            '-copyts',
        ])
        # Add faststart for MP4 files
        if self.ffmpeg_faststart == 1 and output_path.endswith('.mp4'):
            cmd.extend(['-movflags', '+faststart'])
    cmd.append(output_path)

    # Run ffmpeg, silencing its output unless progress display is enabled
    if self.ffmpeg_progress == 1:
        result = subprocess.call(cmd)
    else:
        result = subprocess.call(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)

    if result != 0:
        print(f'{Fore.RED}✗ Stream processing failed with exit code: {result}{Style.RESET_ALL}')
        return
    print(f'{Fore.GREEN}✓ Stream processed successfully{Style.RESET_ALL}')
def _download_vod(self, vod_info: Dict[str, Any], output_path: str) -> bool:
"""
Download VOD using TwitchDownloaderCLI.
Args:
vod_info: VOD metadata from Twitch API
output_path: Path where the VOD will be saved
Returns:
bool: True if download succeeded, False otherwise
"""
if self.downloadVOD != 1:
return False
print(f'\n{Fore.CYAN}Downloading VOD: {vod_info["title"]}{Style.RESET_ALL}')
# Extract numeric VOD ID (TwitchDownloaderCLI expects just the number)
vod_id = vod_info["id"]
# Remove 'v' prefix if present (API sometimes returns "v123456789")
if isinstance(vod_id, str) and vod_id.startswith('v'):
vod_id = vod_id[1:]
# Build URL format that TwitchDownloaderCLI accepts
vod_url = f"https://www.twitch.tv/videos/{vod_id}"
print(f'{Fore.YELLOW}VOD URL: {vod_url}{Style.RESET_ALL}')
bin_path = self._get_bin_path()
cmd = [
self._get_twitch_downloader_executable(),
'videodownload',
'-u', vod_url,
'-q', self.quality,
'-t', str(self.hls_segmentsVOD),
'--ffmpeg-path', self._get_ffmpeg_executable(),
'--temp-path', os.path.join(bin_path, 'temp'),
'--collision', 'Rename',
'-o', output_path
]
try:
result = subprocess.call(cmd)
if result == 0:
print(f'{Fore.GREEN}✓ VOD downloaded{Style.RESET_ALL}')
return True
else:
print(f'{Fore.RED}✗ VOD download failed with exit code: {result}{Style.RESET_ALL}')
return False
except Exception as e:
print(f'{Fore.RED}✗ VOD download failed: {str(e)}{Style.RESET_ALL}')
self.send_notification('VOD Download Error', f'Failed to download VOD: {str(e)}')
return False
def _download_and_render_chat(self, vod_info: Dict[str, Any], json_path: str, video_path: str) -> bool:
"""
Download chat logs and render them as video.
Args:
vod_info: VOD metadata from Twitch API
json_path: Path to save chat JSON
video_path: Path to save rendered chat video
Returns:
bool: True if succeeded, False otherwise
"""
if self.downloadCHAT != 1:
return False
print(f'\n{Fore.CYAN}Downloading chat: {vod_info["title"]}{Style.RESET_ALL}')
# Extract numeric VOD ID
vod_id = vod_info["id"]
if isinstance(vod_id, str) and vod_id.startswith('v'):
vod_id = vod_id[1:]
bin_path = self._get_bin_path()
downloader = self._get_twitch_downloader_executable()
# Chat rendering settings
chat_settings = [
'--background-color', '#FF111111',
'-w', '500',
'-h', '1080',
'--outline',
'-f', 'Arial',
'--font-size', '22',
'--update-rate', '1.0',
'--offline',
'--ffmpeg-path', self._get_ffmpeg_executable(),
'--temp-path', os.path.join(bin_path, 'temp'),
'--collision', 'Rename'
]
try:
# Download chat JSON
print(f'{Fore.YELLOW}Downloading chat JSON for VOD {vod_id}...{Style.RESET_ALL}')
result = subprocess.call([
downloader, 'chatdownload',
'--id', vod_id,
'--embed-images',
'--collision', 'Rename',
'-o', json_path
])
if result != 0:
print(f'{Fore.RED}✗ Chat JSON download failed with exit code: {result}{Style.RESET_ALL}')
return False
# Verify JSON file was created
if not os.path.exists(json_path):
print(f'{Fore.RED}✗ Chat JSON file was not created{Style.RESET_ALL}')
return False
print(f'{Fore.GREEN}✓ Chat JSON downloaded{Style.RESET_ALL}')
# Render chat video
print(f'{Fore.YELLOW}Rendering chat video...{Style.RESET_ALL}')
result = subprocess.call([
downloader, 'chatrender',
'-i', json_path,
'-o', video_path
] + chat_settings)
if result != 0:
print(f'{Fore.RED}✗ Chat render failed with exit code: {result}{Style.RESET_ALL}')
return False
print(f'{Fore.GREEN}✓ Chat rendered{Style.RESET_ALL}')
return True
except Exception as e:
print(f'{Fore.RED}✗ Chat processing failed: {str(e)}{Style.RESET_ALL}')
self.send_notification('Chat Download Error',
f'Failed to download/render chat: {str(e)}')
return False
def _download_live_chat(self, vod_id: str, json_path: str) -> Optional[subprocess.Popen]:
"""
Start downloading live chat in the background while stream is recording.
Args:
vod_id: The VOD/stream ID to download chat from
json_path: Path to save chat JSON
Returns:
subprocess.Popen: The process handle, or None if failed to start
"""
if self.downloadLiveCHAT != 1:
return None
print(f'\n{Fore.CYAN}Starting live chat download...{Style.RESET_ALL}')
# Remove 'v' prefix if present
if isinstance(vod_id, str) and vod_id.startswith('v'):
vod_id = vod_id[1:]
downloader = self._get_twitch_downloader_executable()
try:
# Start chat download as background process
cmd = [
downloader, 'chatdownload',
'--id', vod_id,
'--embed-images',
'--collision', 'Rename',
'-o', json_path
]
print(f'{Fore.YELLOW}Live chat download started in background for VOD {vod_id}{Style.RESET_ALL}')
process = subprocess.Popen(
cmd,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
return process
except Exception as e:
print(f'{Fore.RED}✗ Failed to start live chat download: {str(e)}{Style.RESET_ALL}')
return None
def _wait_for_chat_download(self, process: Optional[subprocess.Popen], json_path: str) -> bool:
"""
Download chat logs and render them as video.
Args:
vod_info: VOD metadata from Twitch API
json_path: Path to save chat JSON
video_path: Path to save rendered chat video
Returns:
bool: True if succeeded, False otherwise
"""
if self.downloadCHAT != 1:
return False
print(f'\n{Fore.CYAN}Downloading chat: {vod_info["title"]}{Style.RESET_ALL}')
# Extract numeric VOD ID
vod_id = vod_info["id"]
if isinstance(vod_id, str) and vod_id.startswith('v'):
vod_id = vod_id[1:]
bin_path = self._get_bin_path()
downloader = self._get_twitch_downloader_executable()
# Chat rendering settings
chat_settings = [
'--background-color', '#FF111111',
'-w', '500',
'-h', '1080',
'--outline',
'-f', 'Arial',
'--font-size', '22',
'--update-rate', '1.0',
'--offline',
'--ffmpeg-path', self._get_ffmpeg_executable(),
'--temp-path', os.path.join(bin_path, 'temp'),
'--collision', 'Rename'
]
try:
# Download chat JSON
print(f'{Fore.YELLOW}Downloading chat JSON for VOD {vod_id}...{Style.RESET_ALL}')
result = subprocess.call([
downloader, 'chatdownload',
'--id', vod_id,
'--embed-images',
'--collision', 'Rename',
'-o', json_path
])
if result != 0:
print(f'{Fore.RED}✗ Chat JSON download failed with exit code: {result}{Style.RESET_ALL}')
return False
# Verify JSON file was created
if not os.path.exists(json_path):
print(f'{Fore.RED}✗ Chat JSON file was not created{Style.RESET_ALL}')
return False
print(f'{Fore.GREEN}✓ Chat JSON downloaded{Style.RESET_ALL}')
# Render chat video
print(f'{Fore.YELLOW}Rendering chat video...{Style.RESET_ALL}')
result = subprocess.call([
downloader, 'chatrender',
'-i', json_path,
'-o', video_path
] + chat_settings)
if result != 0:
print(f'{Fore.RED}✗ Chat render failed with exit code: {result}{Style.RESET_ALL}')
return False
print(f'{Fore.GREEN}✓ Chat rendered{Style.RESET_ALL}')
return True
except Exception as e:
print(f'{Fore.RED}✗ Chat processing failed: {str(e)}{Style.RESET_ALL}')
self.send_notification('Chat Download Error',
f'Failed to download/render chat: {str(e)}')
return False
def _wait_for_chat_download(self, process: Optional[subprocess.Popen], json_path: str) -> bool:
"""
Wait for live chat download process to complete.
Args:
process: The chat download process handle
json_path: Path where chat JSON should be saved
Returns:
bool: True if chat download succeeded, False otherwise
"""
if process is None:
return False
try:
print(f'{Fore.YELLOW}Waiting for live chat download to complete...{Style.RESET_ALL}')
return_code = process.wait(timeout=300) # 5 minute timeout
if return_code == 0 and os.path.exists(json_path):
print(f'{Fore.GREEN}✓ Live chat JSON downloaded{Style.RESET_ALL}')
return True
else:
print(f'{Fore.RED}✗ Live chat download failed (exit code: {return_code}){Style.RESET_ALL}')
return False
except subprocess.TimeoutExpired:
print(f'{Fore.YELLOW}⚠ Live chat download timed out, terminating...{Style.RESET_ALL}')
process.terminate()
return False
except Exception as e:
print(f'{Fore.RED}✗ Error waiting for chat download: {str(e)}{Style.RESET_ALL}')
return False
def _render_chat(self, json_path: str, video_path: str) -> bool:
    """
    Produce a chat video from a previously downloaded chat JSON file.

    Runs the TwitchDownloader executable's 'chatrender' subcommand with a
    fixed set of rendering options.

    Args:
        json_path: Location of the chat JSON input
        video_path: Location where the rendered video should be written

    Returns:
        bool: True when the renderer exits with code 0; False when the
              input file is missing, the renderer fails, or an error occurs
    """
    # Nothing to render without the input file
    if not os.path.exists(json_path):
        print(f'{Fore.RED}✗ Chat JSON file not found: {json_path}{Style.RESET_ALL}')
        return False

    tools_dir = self._get_bin_path()
    cli = self._get_twitch_downloader_executable()
    ffmpeg = self._get_ffmpeg_executable()
    scratch_dir = os.path.join(tools_dir, 'temp')

    # Input/output first, then the rendering options
    command = [
        cli, 'chatrender',
        '-i', json_path,
        '-o', video_path,
        '--background-color', '#FF111111',
        '-w', '500',
        '-h', '1080',
        '--outline',
        '-f', 'Arial',
        '--font-size', '22',
        '--update-rate', '1.0',
        '--offline',
        '--ffmpeg-path', ffmpeg,
        '--temp-path', scratch_dir,
        '--collision', 'Rename',
    ]

    try:
        print(f'{Fore.YELLOW}Rendering chat video...{Style.RESET_ALL}')
        exit_code = subprocess.call(command)
        if exit_code == 0:
            print(f'{Fore.GREEN}✓ Chat rendered{Style.RESET_ALL}')
            return True
        print(f'{Fore.RED}✗ Chat render failed with exit code: {exit_code}{Style.RESET_ALL}')
        return False
    except Exception as e:
        print(f'{Fore.RED}✗ Chat rendering failed: {str(e)}{Style.RESET_ALL}')
        return False
def _save_metadata(self, vod_info: Dict[str, Any], filename_base: str) -> None:
"""
Save VOD metadata to JSON file.
Args:
vod_info: VOD metadata from Twitch API
filename_base: Base filename (without extension)
"""
if self.downloadMETADATA != 1:
return
metadata_path = os.path.join(self.metadata_path, f"{PREFIX_METADATA}{filename_base}.json")
with open(metadata_path, 'w', encoding='utf-8') as f:
json.dump(vod_info, f, ensure_ascii=False, indent=4)
print(f'{Fore.GREEN}✓ Metadata saved{Style.RESET_ALL}')
def _signal_handler(self, signum, frame):
"""Handle interrupt signals gracefully."""
if not self.shutdown_requested:
print(f'\n{Fore.YELLOW}{"=" * 60}{Style.RESET_ALL}')
print(f'{Fore.YELLOW}⚠ Shutdown requested. Stopping downloads and finalizing...{Style.RESET_ALL}')
print(f'{Fore.YELLOW}{"=" * 60}{Style.RESET_ALL}\n')
self.shutdown_requested = True
# Stop current subprocess if running
if self.current_process:
try:
self.current_process.terminate()
print(f'{Fore.YELLOW}Stopping current download process...{Style.RESET_ALL}')
except Exception:
pass
def _interruptible_sleep(self, seconds: float) -> bool:
"""
Sleep for the specified duration, but check for shutdown periodically.
Args:
seconds: Number of seconds to sleep
Returns:
bool: True if sleep completed, False if interrupted by shutdown
"""
start_time = time.time()
while time.time() - start_time < seconds:
if self.shutdown_requested:
return False
time.sleep(min(1.0, seconds - (time.time() - start_time)))
return True
def loopcheck(self) -> None:
    """
    Main monitoring loop.

    Continuously checks if the streamer is live, and when they are:
    1. Records the live stream
    2. Downloads the VOD
    3. Downloads and renders chat
    4. Uploads everything to cloud storage (if enabled)
    5. Optionally deletes local files after upload

    The loop runs until self.shutdown_requested becomes True (set by the
    signal handler or on KeyboardInterrupt). Any other exception raised
    while handling a stream is reported and the loop retries after
    self.refresh seconds.
    """
    def parse_twitch_time(stamp: str) -> datetime:
        """Parse a Twitch 'YYYY-MM-DDTHH:MM:SSZ' timestamp as naive UTC."""
        return datetime.strptime(stamp, '%Y-%m-%dT%H:%M:%SZ')

    # A VOD belongs to the stream if it started within this window of it
    time_tolerance = timedelta(minutes=1)

    # Set up signal handlers for graceful shutdown
    signal.signal(signal.SIGINT, self._signal_handler)
    # Guard in case SIGTERM is not defined on this platform
    if hasattr(signal, 'SIGTERM'):
        signal.signal(signal.SIGTERM, self._signal_handler)

    while not self.shutdown_requested:
        try:
            # Check stream status
            response = self._check_stream_status()
            is_live = response['data']['user']['stream']

            # Stream is offline
            if is_live is None:
                print(f'{Fore.CYAN}{self.username} is offline. Checking again in {self.refresh}s...{Style.RESET_ALL}', end='\r')
                # Returns immediately if a shutdown was requested
                self._interruptible_sleep(self.refresh)
                continue

            # Stream is live but not ready yet
            if not is_live.get('title'):
                print(f'{Fore.YELLOW}⚠ Stream detected but no title yet. Waiting...{Style.RESET_ALL}')
                self._interruptible_sleep(self.refresh)
                continue

            # Stream is live and ready!
            print(f'\n{Fore.GREEN}{self.username} is LIVE!{Style.RESET_ALL}')
            print(f'{Fore.CYAN}Title: {is_live["title"]}{Style.RESET_ALL}')

            # Create unique stream identifier based on stream start time
            stream_id = f"{is_live['createdAt']} - {self.username} - {is_live['title']}"

            # Stream start time, used only to match the VOD later. Both sides
            # are compared in UTC so local timezone/DST changes cannot skew it.
            live_date = parse_twitch_time(is_live["createdAt"])

            # Use CURRENT time for filename to ensure each recording is unique
            # This allows recording a live stream multiple times (e.g., if script restarts)
            filename_base = datetime.now().strftime('%Y%m%d_%Hh%Mm%Ss')

            # Check if we've already recorded this stream session
            if self._is_stream_already_processed(stream_id):
                print(f'{Fore.YELLOW}⚠ Stream was previously recorded, but it\'s still live!{Style.RESET_ALL}')
                print(f'{Fore.GREEN}✓ Starting new recording with timestamp: {filename_base}{Style.RESET_ALL}')
            else:
                # First time seeing this stream - mark it
                self._mark_stream_as_processed(stream_id)
                print(f'{Fore.GREEN}✓ New stream detected - starting recording{Style.RESET_ALL}')

            # Determine file paths
            live_raw_path = os.path.join(self.raw_path, f"{PREFIX_LIVE}{filename_base}.ts")
            live_proc_ext = '.mp3' if self.quality == 'audio_only' else '.mp4'
            live_proc_path = os.path.join(self.video_path, f"{PREFIX_LIVE}{filename_base}{live_proc_ext}")

            # Ensure unique filenames; the base name follows the raw file's final name
            live_raw_path = self._get_unique_filename(live_raw_path)
            live_proc_path = self._get_unique_filename(live_proc_path)
            filename_base = os.path.splitext(os.path.basename(live_raw_path))[0].replace(PREFIX_LIVE, "")
            print(f'{Fore.CYAN}Output path: {live_raw_path}{Style.RESET_ALL}')

            # Send notification
            self.send_notification(f'🔴 Stream Started - {filename_base}',
                                   f'Title: {is_live["title"]}')

            # Store current stream data for potential graceful shutdown
            self.current_stream_data = {
                'filename_base': filename_base,
                'live_raw_path': live_raw_path,
                'live_proc_path': live_proc_path
            }

            # Start live chat download if enabled and VOD ID is available
            live_chat_process = None
            chat_json_path = os.path.join(self.chatJSON_path, f"{PREFIX_CHAT}{filename_base}.json")
            if self.downloadLiveCHAT == 1 and is_live.get('archiveVideo') and is_live['archiveVideo'].get('id'):
                live_vod_id = is_live['archiveVideo']['id']
                print(f'{Fore.CYAN}Live VOD ID detected: {live_vod_id}{Style.RESET_ALL}')
                live_chat_process = self._download_live_chat(live_vod_id, chat_json_path)
            elif self.downloadLiveCHAT == 1:
                print(f'{Fore.YELLOW}⚠ No VOD ID available yet for live chat download{Style.RESET_ALL}')

            # Record the live stream (blocks until recording stops)
            self._record_livestream(is_live, live_raw_path)

            # If shutdown was requested during recording, try to finalize
            if self.shutdown_requested:
                print(f'{Fore.YELLOW}Attempting to process any recorded content...{Style.RESET_ALL}')

            # Process the raw stream file
            self._process_raw_stream(live_raw_path, live_proc_path)

            # Wait for live chat download if it was started
            live_chat_downloaded = False
            if live_chat_process is not None:
                live_chat_downloaded = self._wait_for_chat_download(live_chat_process, chat_json_path)

            # Render live chat if downloaded successfully
            if live_chat_downloaded:
                chat_video_path = os.path.join(self.chatMP4_path, f"{PREFIX_CHAT}{filename_base}.mp4")
                self._render_chat(chat_json_path, chat_video_path)

            # Skip VOD/chat download if shutdown was requested or vodTimeout is 0
            matched_vod = None
            if self.shutdown_requested:
                print(f'{Fore.YELLOW}Skipping VOD and chat download due to shutdown request{Style.RESET_ALL}')
            elif self.vodTimeout == 0:
                print(f'{Fore.CYAN}VOD check disabled (vodTimeout=0). Skipping VOD download.{Style.RESET_ALL}')
            else:
                # Poll for a VOD matching this stream until the timeout expires
                print(f'{Fore.CYAN}Waiting for VOD to become available (timeout: {self.vodTimeout}s)...{Style.RESET_ALL}')
                vod_wait_start = time.monotonic()
                while time.monotonic() - vod_wait_start < self.vodTimeout and not self.shutdown_requested:
                    vod_response = self._get_latest_vod()
                    if vod_response and vod_response['data']['user']['videos']['edges']:
                        latest_vod = vod_response['data']['user']['videos']['edges'][0]['node']
                        vod_date = parse_twitch_time(latest_vod["recordedAt"])
                        # Check if VOD matches the stream (within 1 minute tolerance)
                        if abs(vod_date - live_date) <= time_tolerance:
                            matched_vod = latest_vod
                            break
                    # Wait before checking again
                    print(f'{Fore.CYAN}VOD not found yet, waiting...{Style.RESET_ALL}', end='\r')
                    if not self._interruptible_sleep(min(10, self.vodTimeout - (time.monotonic() - vod_wait_start))):
                        break

                if matched_vod is None:
                    if self.shutdown_requested:
                        print(f'\n{Fore.YELLOW}VOD check interrupted by shutdown{Style.RESET_ALL}')
                    else:
                        print(f'\n{Fore.YELLOW}⚠ VOD not found after {self.vodTimeout}s - streamer may have VODs disabled{Style.RESET_ALL}')
                        print(f'{Fore.CYAN} → Live recording and chat (if enabled) were saved successfully{Style.RESET_ALL}')

            if matched_vod is not None and not self.shutdown_requested:
                print(f'\n{Fore.GREEN}✓ Found matching VOD{Style.RESET_ALL}')

                # Save metadata
                self._save_metadata(matched_vod, filename_base)

                # Download VOD
                vod_ext = '.mp3' if self.quality == 'audio_only' else '.mp4'
                vod_path = os.path.join(self.video_path, f"{PREFIX_VOD}{filename_base}{vod_ext}")
                self._download_vod(matched_vod, vod_path)

                # Download and render chat from VOD (if not already done via live chat)
                if not live_chat_downloaded:
                    chat_video_path = os.path.join(self.chatMP4_path, f"{PREFIX_CHAT}{filename_base}.mp4")
                    self._download_and_render_chat(matched_vod, chat_json_path, chat_video_path)
                else:
                    print(f'{Fore.CYAN}Chat already downloaded from live stream, skipping VOD chat download{Style.RESET_ALL}')

            # Clean up raw files if configured. The raw recording is only
            # removed when the processed file exists, so a failed processing
            # step cannot destroy the only copy of the stream.
            # NOTE(review): assumes _process_raw_stream writes to live_proc_path - confirm.
            if self.cleanRaw == 1 and os.path.exists(live_raw_path):
                if os.path.exists(live_proc_path):
                    print(f'{Fore.YELLOW}Deleting raw .ts file...{Style.RESET_ALL}')
                    os.remove(live_raw_path)
                else:
                    print(f'{Fore.YELLOW}⚠ Processed file not found, keeping raw .ts file{Style.RESET_ALL}')

            # Upload to cloud if configured
            upload_success = self._upload_to_cloud(filename_base)

            # Delete local files only after a real, successful upload.
            # _upload_to_cloud also returns True when uploading is disabled,
            # so uploadCloud is checked explicitly to avoid deleting the only copy.
            if self.deleteFiles == 1:
                if self.uploadCloud == 1 and upload_success:
                    self._delete_local_files(filename_base, live_raw_path, live_proc_path)
                elif self.uploadCloud != 1:
                    print(f'{Fore.YELLOW}⚠ Cloud upload is disabled, keeping local files{Style.RESET_ALL}')

            # Done processing this stream
            if self.shutdown_requested:
                print(f'\n{Fore.YELLOW}{"=" * 60}{Style.RESET_ALL}')
                print(f'{Fore.YELLOW}✓ Stream processing stopped by user{Style.RESET_ALL}')
                print(f'{Fore.YELLOW}{"=" * 60}{Style.RESET_ALL}\n')
                break

            print(f'\n{Fore.GREEN}{"=" * 60}{Style.RESET_ALL}')
            print(f'{Fore.GREEN}✓ Stream processing complete{Style.RESET_ALL}')
            print(f'{Fore.GREEN}{"=" * 60}{Style.RESET_ALL}\n')
            self.send_notification(f'✓ Complete - {filename_base}',
                                   'Stream processing finished. Resuming monitoring.')
            self._interruptible_sleep(self.refresh)

        except KeyboardInterrupt:
            # Additional catch for any other KeyboardInterrupt not handled by signal
            if not self.shutdown_requested:
                self.shutdown_requested = True
                print(f'\n{Fore.YELLOW}{"=" * 60}{Style.RESET_ALL}')
                print(f'{Fore.YELLOW}⚠ Interrupted. Cleaning up...{Style.RESET_ALL}')
                print(f'{Fore.YELLOW}{"=" * 60}{Style.RESET_ALL}\n')
            break

        except Exception as e:
            print(f'\n{Fore.RED}{"=" * 60}{Style.RESET_ALL}')
            print(f'{Fore.RED}✗ ERROR: {str(e)}{Style.RESET_ALL}')
            print(f'{Fore.YELLOW}Waiting {self.refresh} seconds before retrying...{Style.RESET_ALL}')
            print(f'{Fore.RED}{"=" * 60}{Style.RESET_ALL}\n')
            self.send_notification('⚠ Error - Recovery',
                                   f'Error: {str(e)}\nRetrying after {self.refresh} seconds.')
            # Returns immediately if a shutdown was requested; the loop
            # condition then ends the monitoring
            self._interruptible_sleep(self.refresh)

    # Final cleanup message
    print(f'{Fore.GREEN}✓ Monitoring stopped cleanly{Style.RESET_ALL}')
def _upload_to_cloud(self, filename_base: str) -> bool:
"""
Upload archived files to cloud storage using rclone.
Args:
filename_base: Base filename (without prefixes/extensions)
Returns:
bool: True if upload succeeded or is disabled, False if failed
"""
if self.uploadCloud != 1:
return True # Consider upload "successful" if disabled
print(f'\n{Fore.CYAN}Uploading to cloud storage...{Style.RESET_ALL}')
self.send_notification(f'☁ Uploading - {filename_base}', 'Uploading files to cloud storage')
# Create list of files to upload
bin_path = self._get_bin_path()
upload_list_path = os.path.join(bin_path, 'temp', 'upload.txt')
# Ensure temp directory exists
os.makedirs(os.path.dirname(upload_list_path), exist_ok=True)
files_to_upload = [
f"{PREFIX_LIVE}{filename_base}.ts",
f"{PREFIX_LIVE}{filename_base}.mp4",
f"{PREFIX_LIVE}{filename_base}.mp3",
f"{PREFIX_VOD}{filename_base}.ts",
f"{PREFIX_VOD}{filename_base}.mp4",
f"{PREFIX_VOD}{filename_base}.mp3",
f"{PREFIX_METADATA}{filename_base}.json",
f"{PREFIX_CHAT}{filename_base}.json",
f"{PREFIX_CHAT}{filename_base}.mp4"
]
with open(upload_list_path, 'w') as f:
f.write('\n'.join(files_to_upload))
# Run rclone
try:
result = subprocess.call([
'rclone', 'copy',
str(pathlib.Path(self.root_path).resolve()),
self.rclone_path,
'--include-from', upload_list_path
])
# Clean up upload list
if os.path.exists(upload_list_path):
os.remove(upload_list_path)
if result == 0:
print(f'{Fore.GREEN}✓ Upload complete{Style.RESET_ALL}')
self.send_notification(f'✓ Upload Success - {filename_base}',
'All files uploaded successfully')
return True
else:
print(f'{Fore.RED}✗ Upload failed (exit code: {result}){Style.RESET_ALL}')
print(f'{Fore.YELLOW}Files preserved locally due to upload failure{Style.RESET_ALL}')
self.send_notification(f'✗ Upload Failed - {filename_base}',
f'Upload failed with code {result}. Files preserved locally.')
return False
except Exception as e:
print(f'{Fore.RED}✗ Upload error: {str(e)}{Style.RESET_ALL}')
return False
def _delete_local_files(self, filename_base: str, live_raw_path: str, live_proc_path: str) -> None:
    """
    Delete local archive files after successful upload.

    Candidates are selected by the feature flags on the instance and only
    files that actually exist are removed. A failure to delete one file is
    reported and does not stop the remaining deletions.

    Args:
        filename_base: Base filename (without prefixes/extensions)
        live_raw_path: Path to live raw file
        live_proc_path: Path to live processed file
    """
    print(f'\n{Fore.RED}{"=" * 60}{Style.RESET_ALL}')
    print(f'{Fore.RED}⚠ DELETING LOCAL FILES{Style.RESET_ALL}')
    print(f'{Fore.RED}{"=" * 60}{Style.RESET_ALL}\n')
    self.send_notification(f'🗑 Deleting - {filename_base}',
                           'Deleting local files after successful upload')

    keep_raw = self.cleanRaw != 0  # raw files are only deleted here when cleanRaw is 0
    candidates = []

    # Live files
    if not keep_raw:
        candidates.append(live_raw_path)
    candidates.append(live_proc_path)

    # VOD files
    if self.downloadVOD == 1:
        if not keep_raw:
            candidates.append(os.path.join(self.raw_path, f"{PREFIX_VOD}{filename_base}.ts"))
        candidates.append(os.path.join(self.video_path, f"{PREFIX_VOD}{filename_base}.mp4"))
        candidates.append(os.path.join(self.video_path, f"{PREFIX_VOD}{filename_base}.mp3"))

    # Chat files: written under the same names by both the VOD chat download
    # and the live chat download, so either flag makes them candidates
    if self.downloadCHAT == 1 or self.downloadLiveCHAT == 1:
        candidates.append(os.path.join(self.chatJSON_path, f"{PREFIX_CHAT}{filename_base}.json"))
        candidates.append(os.path.join(self.chatMP4_path, f"{PREFIX_CHAT}{filename_base}.mp4"))

    # Metadata files
    if self.downloadMETADATA == 1:
        candidates.append(os.path.join(self.metadata_path, f"{PREFIX_METADATA}{filename_base}.json"))

    # Delete all files that exist
    for filepath in candidates:
        if not os.path.exists(filepath):
            continue
        try:
            print(f'{Fore.RED} Deleting: {os.path.basename(filepath)}{Style.RESET_ALL}')
            os.remove(filepath)
        except OSError as e:
            print(f'{Fore.YELLOW} ⚠ Failed to delete {filepath}: {e}{Style.RESET_ALL}')

    print(f'{Fore.RED}\n✓ Cleanup complete{Style.RESET_ALL}')
# ============================================================================
# COMMAND-LINE INTERFACE
# ============================================================================
def main(argv: list) -> None:
    """
    Main entry point for command-line execution.

    Parses command-line arguments, applies them as overrides on a
    TwitchArchive instance and starts the archive system.

    Exits with status 0 after printing help, and with status 2 on an
    unknown option or a non-integer value for a numeric option.

    Args:
        argv: Command-line arguments (without the program name)
    """
    help_msg = f'''
{Fore.CYAN}{"=" * 70}
TWITCH ARCHIVE - Automated Stream Recording & Archiving
{"=" * 70}{Style.RESET_ALL}

{Fore.GREEN}USAGE:{Style.RESET_ALL}
    python twitch-archive.py [OPTIONS]

{Fore.GREEN}OPTIONS:{Style.RESET_ALL}
    -h, --help                  Display this help information
    -u, --username <name>       Twitch channel username to monitor
    -q, --quality <qual>        Stream quality: best/source, high/720p,
                                medium/480p, low/360p, audio_only
    -a, --ttv-lol <0|1>         Enable ad-blocking (1) or disable (0)
    -v, --vod <0|1>             Download VODs after stream ends
    -c, --chat <0|1>            Download and render chat
    -m, --metadata <0|1>        Download stream metadata
    -r, --upload <0|1>          Upload to cloud storage via rclone
    -d, --delete <0|1>          Delete local files after upload (CAREFUL!)
    -n, --notifications <0|1>   Send email notifications

{Fore.YELLOW}TIPS:{Style.RESET_ALL}
    • Configure settings in config.json (copy from config.sample.json)
    • Set up API credentials in .env file
    • Most users only need to edit config.json, no command-line args needed

{Fore.CYAN}{"=" * 70}{Style.RESET_ALL}
'''

    # (short flag, long flag, TwitchArchive attribute, value converter)
    option_table = (
        ('-u', '--username', 'username', str),
        ('-q', '--quality', 'quality', str),
        ('-a', '--ttv-lol', 'streamlink_ttvlol', int),
        ('-v', '--vod', 'downloadVOD', int),
        ('-c', '--chat', 'downloadCHAT', int),
        ('-m', '--metadata', 'downloadMETADATA', int),
        ('-r', '--upload', 'uploadCloud', int),
        ('-d', '--delete', 'deleteFiles', int),
        ('-n', '--notifications', 'notifications', int),
    )
    targets = {}
    for short_flag, long_flag, attr, convert in option_table:
        targets[short_flag] = targets[long_flag] = (attr, convert)

    try:
        # 'h' takes no argument (no trailing colon), so a bare -h works
        opts, _ = getopt.getopt(
            argv,
            "hu:q:a:v:c:m:r:d:n:",
            ["help", "username=", "quality=", "ttv-lol=", "vod=", "chat=",
             "metadata=", "upload=", "delete=", "notifications="]
        )
    except getopt.GetoptError as e:
        print(f'{Fore.RED}Error: {e}{Style.RESET_ALL}\n')
        print(help_msg)
        sys.exit(2)

    overrides = []
    for opt, arg in opts:
        if opt in ('-h', '--help'):
            print(help_msg)
            sys.exit(0)
        attr, convert = targets[opt]
        try:
            overrides.append((attr, convert(arg)))
        except ValueError:
            print(f'{Fore.RED}Error: option {opt} expects an integer, got {arg!r}{Style.RESET_ALL}\n')
            print(help_msg)
            sys.exit(2)

    # Constructed only after the arguments are known to be valid, so --help
    # and argument errors do not depend on whatever TwitchArchive() does
    twitch_archive = TwitchArchive()
    for attr, value in overrides:
        setattr(twitch_archive, attr, value)

    # Start the archive system
    twitch_archive.run()
if __name__ == "__main__":
    # Script entry point: hand everything after the program name to main()
    cli_args = sys.argv[1:]
    try:
        main(cli_args)
    except KeyboardInterrupt:
        # Ctrl+C that reached this level: exit with status 0 and no stack trace
        print(f'\n{Fore.GREEN}✓ Graceful shutdown complete{Style.RESET_ALL}')
        sys.exit(0)