TwitchDownloader/modules/downloader.py
MaddoScientisto ec44981a9d
All checks were successful
Publish Twitch Archive Container / publish (push) Successful in 7m36s
Add NVIDIA support for FFmpeg in Docker and enhance chat rendering functionality
- Introduced a new docker-compose.nvidia.yml for NVIDIA GPU support.
- Updated dockerstart.bat to allow optional NVIDIA runtime.
- Enhanced ContentDownloader to manage chat rendering status and font settings.
- Improved hardware acceleration detection in utils.py.
- Added tests for hardware acceleration and chat rendering behavior.

Co-authored-by: Copilot <copilot@github.com>
2026-04-25 12:28:59 +02:00

1009 lines
45 KiB
Python

"""
VOD and chat downloading functionality using TwitchDownloaderCLI.
Includes fallback support for chat_downloader when VOD-based methods fail.
"""
import os
import sys
import subprocess
import json
import threading
import time
import socket
import re
from typing import Dict, Any, Optional
from colorama import Fore, Style
from .utils import get_bin_path
# Try to import chat_downloader (optional dependency)
try:
from chat_downloader import ChatDownloader
CHAT_DOWNLOADER_AVAILABLE = True
except ImportError:
CHAT_DOWNLOADER_AVAILABLE = False
ChatDownloader = None
class ContentDownloader:
"""Handles VOD and chat downloading using TwitchDownloaderCLI."""
def __init__(self, twitch_downloader_path: str, ffmpeg_path: str, config: dict):
"""
Initialize the content downloader.
Args:
twitch_downloader_path: Path to TwitchDownloaderCLI executable
ffmpeg_path: Path to FFmpeg executable
config: Configuration dictionary
"""
self.twitch_downloader_path = twitch_downloader_path
self.ffmpeg_path = ffmpeg_path
self.quality = config.get('quality', 'best')
self.hls_segments_vod = config.get('hls_segmentsVOD', 10)
self.download_vod_enabled = config.get('downloadVOD', True)
self.download_chat_enabled = config.get('downloadCHAT', True)
self.download_live_chat_enabled = config.get('downloadLiveCHAT', True)
self.use_chat_downloader_primary = config.get('useChatDownloaderPrimary', False)
self.use_chat_downloader_fallback = config.get('useChatDownloaderFallback', True)
default_chat_font = 'Arial' if sys.platform.startswith('win') else 'DejaVu Sans'
self.chat_render_font = config.get('chat_render_font', default_chat_font)
self.last_chat_render_attempted = False
self.last_chat_render_succeeded = False
# Initialize chat_downloader if available
self.chat_downloader = None
if CHAT_DOWNLOADER_AVAILABLE:
try:
self.chat_downloader = ChatDownloader()
except Exception as e:
print(f'{Fore.YELLOW}⚠ Failed to initialize chat_downloader: {e}{Style.RESET_ALL}')
elif self.use_chat_downloader_primary or self.use_chat_downloader_fallback:
print(f'{Fore.YELLOW}⚠ chat_downloader not available but requested in config{Style.RESET_ALL}')
print(f'{Fore.YELLOW} Install with: pip install chat-downloader{Style.RESET_ALL}')
# Thread management for chat_downloader
self.chat_thread = None
self.chat_thread_success = False
self.chat_thread_error = None
def reset_chat_render_status(self) -> None:
"""Reset chat render tracking before a processing pass."""
self.last_chat_render_attempted = False
self.last_chat_render_succeeded = False
def download_vod(self, vod_info: Dict[str, Any], output_path: str) -> bool:
"""
Download VOD using TwitchDownloaderCLI.
Args:
vod_info: VOD metadata from Twitch API
output_path: Path where the VOD will be saved
Returns:
bool: True if download succeeded, False otherwise
"""
if not self.download_vod_enabled:
return False
print(f'\n{Fore.CYAN}Downloading VOD: {vod_info["title"]}{Style.RESET_ALL}')
# Extract numeric VOD ID
vod_id = vod_info["id"]
if isinstance(vod_id, str) and vod_id.startswith('v'):
vod_id = vod_id[1:]
vod_url = f"https://www.twitch.tv/videos/{vod_id}"
print(f'{Fore.YELLOW}VOD URL: {vod_url}{Style.RESET_ALL}')
bin_path = get_bin_path()
cmd = [
self.twitch_downloader_path,
'videodownload',
'-u', vod_url,
'-q', self.quality,
'-t', str(self.hls_segments_vod),
'--ffmpeg-path', self.ffmpeg_path,
'--temp-path', os.path.join(bin_path, 'temp'),
'--collision', 'Rename',
'-o', output_path
]
try:
result = subprocess.call(cmd)
if result == 0:
print(f'{Fore.GREEN}✓ VOD downloaded{Style.RESET_ALL}')
return True
else:
print(f'{Fore.RED}✗ VOD download failed with exit code: {result}{Style.RESET_ALL}')
return False
except Exception as e:
print(f'{Fore.RED}✗ VOD download failed: {str(e)}{Style.RESET_ALL}')
return False
def download_chat_json(self, vod_id: str, json_path: str) -> bool:
"""
Download chat JSON for a VOD.
Args:
vod_id: VOD ID
json_path: Path to save chat JSON
Returns:
bool: True if succeeded, False otherwise
"""
# Remove 'v' prefix if present
if isinstance(vod_id, str) and vod_id.startswith('v'):
vod_id = vod_id[1:]
print(f'{Fore.YELLOW}Downloading chat JSON for VOD {vod_id}...{Style.RESET_ALL}')
try:
result = subprocess.call([
self.twitch_downloader_path, 'chatdownload',
'--id', vod_id,
'--embed-images',
'--collision', 'Rename',
'-o', json_path
])
if result != 0:
print(f'{Fore.RED}✗ Chat JSON download failed with exit code: {result}{Style.RESET_ALL}')
return False
if not os.path.exists(json_path):
print(f'{Fore.RED}✗ Chat JSON file was not created{Style.RESET_ALL}')
return False
print(f'{Fore.GREEN}✓ Chat JSON downloaded{Style.RESET_ALL}')
return True
except Exception as e:
print(f'{Fore.RED}✗ Chat download failed: {str(e)}{Style.RESET_ALL}')
return False
def render_chat(self, json_path: str, video_path: str, output_args: str,
video_duration: Optional[float] = None) -> bool:
"""
Render chat JSON as a video.
Args:
json_path: Path to chat JSON file
video_path: Path to save rendered chat video
output_args: FFmpeg output arguments for encoding
video_duration: Optional video duration in seconds to trim chat to match
Returns:
bool: True if succeeded, False otherwise
"""
if not os.path.exists(json_path):
print(f'{Fore.RED}✗ Chat JSON file not found: {json_path}{Style.RESET_ALL}')
return False
# Validate JSON file has content (check if file size is reasonable)
try:
file_size = os.path.getsize(json_path)
if file_size < 100: # Less than 100 bytes means likely empty or invalid
print(f'{Fore.RED}✗ Chat JSON file is too small or incomplete ({file_size} bytes){Style.RESET_ALL}')
print(f'{Fore.YELLOW} This can happen when stream recording is interrupted{Style.RESET_ALL}')
return False
except Exception as e:
print(f'{Fore.RED}✗ Error checking chat JSON file: {str(e)}{Style.RESET_ALL}')
return False
bin_path = get_bin_path()
# Chat rendering settings
chat_settings = [
'--background-color', '#FF111111',
'-w', '500',
'-h', '1080',
'--framerate', '30',
'--outline',
'-f', self.chat_render_font,
'--font-size', '22',
'--update-rate', '1.0',
'--offline',
'--ffmpeg-path', self.ffmpeg_path,
'--temp-path', os.path.join(bin_path, 'temp'),
'--collision', 'Rename'
]
# Always start from beginning
chat_settings.extend(['-b', '0'])
# Trim chat to match video duration if provided
if video_duration is not None and video_duration > 0:
# Format duration as seconds with 1 decimal place
duration_str = f'{video_duration:.1f}s'
chat_settings.extend(['-e', duration_str])
print(f'{Fore.CYAN} Trimming chat to match video duration: {duration_str}{Style.RESET_ALL}')
# Add output args using = syntax to avoid parsing issues
if output_args:
chat_settings.append(f'--output-args={output_args}')
try:
print(f'{Fore.YELLOW}Rendering chat video...{Style.RESET_ALL}')
print(f'{Fore.CYAN}Using chat font: {self.chat_render_font}{Style.RESET_ALL}')
self.last_chat_render_attempted = True
self.last_chat_render_succeeded = False
# Build complete command
full_cmd = [self.twitch_downloader_path, 'chatrender', '-i', json_path, '-o', video_path] + chat_settings
# Capture output to see what TwitchDownloaderCLI says
process = subprocess.Popen(
full_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
encoding='utf-8',
errors='replace'
)
# Stream output in real-time
for line in process.stdout:
print(line.rstrip())
result = process.wait()
if result != 0:
print(f'{Fore.RED}✗ Chat render failed with exit code: {result}{Style.RESET_ALL}')
return False
# Verify the output file was created and has content
if not os.path.exists(video_path):
print(f'{Fore.RED}✗ Chat video file was not created{Style.RESET_ALL}')
return False
file_size = os.path.getsize(video_path)
if file_size < 1024: # Less than 1KB is suspicious
print(f'{Fore.RED}✗ Chat video file is too small ({file_size} bytes){Style.RESET_ALL}')
return False
self.last_chat_render_succeeded = True
print(f'{Fore.GREEN}✓ Chat rendered ({file_size:,} bytes){Style.RESET_ALL}')
return True
except Exception as e:
print(f'{Fore.RED}✗ Chat rendering failed: {str(e)}{Style.RESET_ALL}')
return False
def download_and_render_chat(self, vod_info: Dict[str, Any], json_path: str,
video_path: str, output_args: str,
video_duration: Optional[float] = None) -> bool:
"""
Download chat logs and render them as video.
Args:
vod_info: VOD metadata from Twitch API
json_path: Path to save chat JSON
video_path: Path to save rendered chat video
output_args: FFmpeg output arguments for encoding
video_duration: Optional video duration in seconds to trim chat to match
Returns:
bool: True if succeeded, False otherwise
"""
if not self.download_chat_enabled:
return False
print(f'\n{Fore.CYAN}Downloading chat: {vod_info["title"]}{Style.RESET_ALL}')
# Extract numeric VOD ID
vod_id = vod_info["id"]
# Download chat JSON
if not self.download_chat_json(vod_id, json_path):
return False
# Render chat video with optional duration trimming
return self.render_chat(json_path, video_path, output_args, video_duration=video_duration)
def start_live_chat_download(self, vod_id: str, json_path: str) -> Optional[subprocess.Popen]:
"""
Start downloading live chat in the background while stream is recording.
Args:
vod_id: The VOD/stream ID to download chat from
json_path: Path to save chat JSON
Returns:
subprocess.Popen: The process handle, or None if failed to start
"""
if not self.download_live_chat_enabled:
return None
print(f'\n{Fore.CYAN}Starting live chat download...{Style.RESET_ALL}')
# Remove 'v' prefix if present
if isinstance(vod_id, str) and vod_id.startswith('v'):
vod_id = vod_id[1:]
try:
cmd = [
self.twitch_downloader_path, 'chatdownload',
'--id', vod_id,
'--embed-images',
'--collision', 'Rename',
'-o', json_path
]
print(f'{Fore.YELLOW}Live chat download started in background for VOD {vod_id}{Style.RESET_ALL}')
process = subprocess.Popen(
cmd,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
return process
except Exception as e:
print(f'{Fore.RED}✗ Failed to start live chat download: {str(e)}{Style.RESET_ALL}')
return None
def _download_live_chat_via_irc(self, username: str, json_path: str,
max_messages: Optional[int] = None,
timeout: Optional[float] = None,
shutdown_check: Optional[callable] = None,
stream_monitor = None,
verbose: bool = False) -> bool:
"""
Simple IRC-based fallback to capture Twitch chat when GraphQL methods fail.
This writes newline-delimited JSON objects with at least: timestamp (ms),
author (dict with `name`), and `message`.
"""
try:
sock = socket.socket()
sock.connect(('irc.chat.twitch.tv', 6667))
sock.settimeout(1.0)
# Request tags & capabilities
sock.sendall(b'CAP REQ :twitch.tv/tags twitch.tv/commands twitch.tv/membership\r\n')
sock.sendall(b'PASS SCHMOOPIIE\r\n')
sock.sendall(b'NICK justinfan67420\r\n')
sock.sendall(f'JOIN #{username}\r\n'.encode('utf-8'))
messages_written = 0
start_time = time.time()
# Open file for streaming newline-delimited JSON
os.makedirs(os.path.dirname(json_path), exist_ok=True)
with open(json_path, 'w', encoding='utf-8') as out_f:
buffer = ''
while True:
# Shutdown/timeouts
if shutdown_check and shutdown_check():
break
if timeout and (time.time() - start_time) > timeout:
break
if stream_monitor:
try:
if not stream_monitor.is_user_live():
break
except Exception:
pass
try:
data = sock.recv(4096).decode('utf-8', 'ignore')
except socket.timeout:
continue
except Exception as e:
print(f'{Fore.YELLOW}⚠ IRC recv error: {e}{Style.RESET_ALL}')
break
if not data:
continue
buffer += data
lines = buffer.split('\r\n')
buffer = lines.pop() # remainder
for line in lines:
if not line:
continue
# Respond to PINGs
if line.startswith('PING'):
try:
sock.sendall(b'PONG :tmi.twitch.tv\r\n')
except Exception:
pass
continue
# Extract PRIVMSG lines
m = re.match(r'(?:@[^ ]+ )?:([^!]+)!.* PRIVMSG #[^ ]+ :(.+)', line)
if not m:
continue
author = m.group(1)
msg_text = m.group(2)
timestamp_ms = int(time.time() * 1000)
item = {
'timestamp': timestamp_ms,
'author': {'name': author},
'message': msg_text
}
out_f.write(json.dumps(item, ensure_ascii=False) + '\n')
out_f.flush()
messages_written += 1
if verbose and (messages_written % 10 == 0):
print(f'\n{Fore.GREEN}💬 {author}: {Fore.WHITE}{msg_text}{Style.RESET_ALL}')
if max_messages and messages_written >= max_messages:
break
if max_messages and messages_written >= max_messages:
break
sock.close()
if messages_written > 0:
print(f'\n{Fore.GREEN}✓ IRC fallback captured {messages_written} messages{Style.RESET_ALL}')
return True
else:
print(f'\n{Fore.RED}✗ IRC fallback captured no messages{Style.RESET_ALL}')
return False
except Exception as e:
print(f'{Fore.RED}✗ IRC fallback failed: {e}{Style.RESET_ALL}')
import traceback
traceback.print_exc()
return False
def wait_for_chat_download(self, process: Optional[subprocess.Popen],
json_path: str, timeout: int = 300) -> bool:
"""
Wait for live chat download process to complete.
Args:
process: The chat download process handle
json_path: Path where chat JSON should be saved
timeout: Maximum time to wait in seconds
Returns:
bool: True if chat download succeeded, False otherwise
"""
if process is None:
return False
try:
print(f'{Fore.YELLOW}Waiting for live chat download to complete...{Style.RESET_ALL}')
return_code = process.wait(timeout=timeout)
if return_code == 0 and os.path.exists(json_path):
print(f'{Fore.GREEN}✓ Live chat JSON downloaded{Style.RESET_ALL}')
return True
else:
print(f'{Fore.RED}✗ Live chat download failed (exit code: {return_code}){Style.RESET_ALL}')
return False
except subprocess.TimeoutExpired:
print(f'{Fore.YELLOW}⚠ Live chat download timed out, terminating...{Style.RESET_ALL}')
process.terminate()
return False
except Exception as e:
print(f'{Fore.RED}✗ Error waiting for chat download: {str(e)}{Style.RESET_ALL}')
return False
def download_live_chat_with_chat_downloader(self, username: str, json_path: str,
max_messages: Optional[int] = None,
timeout: Optional[float] = None,
shutdown_check: Optional[callable] = None,
stream_monitor = None,
verbose: bool = False) -> bool:
"""
Download live chat using chat_downloader library as fallback.
This works even when VOD is disabled on the channel.
Args:
username: Twitch username/channel name
json_path: Path to save chat JSON
max_messages: Maximum messages to download (None = unlimited)
timeout: Stop after this many seconds (None = until stream ends)
shutdown_check: Optional callback function that returns True when shutdown requested
stream_monitor: Optional stream monitor to check if stream is still live
verbose: Show chat message previews
Returns:
bool: True if chat download succeeded, False otherwise
"""
if not CHAT_DOWNLOADER_AVAILABLE or self.chat_downloader is None:
print(f'{Fore.RED}✗ chat_downloader not available{Style.RESET_ALL}')
print(f'{Fore.YELLOW} Install with: pip install chat-downloader{Style.RESET_ALL}')
return False
if not self.download_live_chat_enabled:
print(f'{Fore.YELLOW}⚠ downloadLiveCHAT is disabled in config{Style.RESET_ALL}')
return False
# If a stream monitor was provided, check that the user is currently live
if stream_monitor is not None:
try:
if not stream_monitor.is_user_live():
print(f'{Fore.YELLOW}⚠ Stream is not live; skipping chat download{Style.RESET_ALL}')
return False
except Exception as e:
# If we couldn't determine live status, continue and let chat_downloader handle it
print(f'{Fore.YELLOW}⚠ Could not determine live status: {e} - proceeding with chat download{Style.RESET_ALL}')
print(f'\n{Fore.CYAN}Starting live chat download (chat_downloader)...{Style.RESET_ALL}')
print(f'{Fore.MAGENTA}[VERBOSE] chat_downloader library version: {ChatDownloader.__module__}{Style.RESET_ALL}')
try:
# Construct Twitch stream URL
stream_url = f'https://www.twitch.tv/{username}'
print(f'{Fore.YELLOW}Downloading chat from: {stream_url}{Style.RESET_ALL}')
print(f'{Fore.MAGENTA}[VERBOSE] Output path: {json_path}{Style.RESET_ALL}')
print(f'{Fore.MAGENTA}[VERBOSE] Timeout: {timeout}s (None = unlimited){Style.RESET_ALL}')
print(f'{Fore.MAGENTA}[VERBOSE] Max messages: {max_messages} (None = unlimited){Style.RESET_ALL}')
# Get chat messages with a small retry loop to handle transient GQL/network issues
print(f'{Fore.CYAN}Connecting to Twitch chat...{Style.RESET_ALL}')
chat = None
max_attempts = 3
for attempt in range(1, max_attempts + 1):
try:
chat = self.chat_downloader.get_chat(
stream_url,
message_types=['text_message'], # Basic text messages
output=json_path,
timeout=timeout,
max_messages=max_messages
)
break
except Exception as e:
# Provide a clearer, user-facing message for common failures
print(f"{Fore.YELLOW}⚠ chat_downloader attempt {attempt}/{max_attempts} failed: {str(e)}{Style.RESET_ALL}")
# On final attempt, dump traceback to help diagnose library internals
if attempt >= max_attempts:
print(f"{Fore.RED}✗ chat_downloader failed after {max_attempts} attempts. This may be caused by Twitch GraphQL changes or rate-limiting.{Style.RESET_ALL}")
print(f"{Fore.YELLOW} Try upgrading the chat-downloader package: pip install -U chat-downloader{Style.RESET_ALL}")
import traceback
traceback.print_exc()
# Try IRC fallback before giving up
print(f"{Fore.MAGENTA}[VERBOSE] Attempting IRC fallback for chat capture...{Style.RESET_ALL}")
try:
return self._download_live_chat_via_irc(username, json_path,
max_messages=max_messages,
timeout=timeout,
shutdown_check=shutdown_check,
stream_monitor=stream_monitor,
verbose=verbose)
except Exception as fallback_err:
print(f"{Fore.RED}✗ IRC fallback failed: {fallback_err}{Style.RESET_ALL}")
traceback.print_exc()
return False
else:
time.sleep(1)
continue
# The get_chat with output parameter writes to file automatically
# We just need to iterate to trigger the download
message_count = 0
last_check_time = time.time()
check_interval = 10.0 # Check if stream is still live every 10 seconds
print(f'{Fore.CYAN}Receiving chat messages (will stop when stream ends)...{Style.RESET_ALL}')
try:
for message in chat:
# Check for shutdown request
if shutdown_check and shutdown_check():
print(f'\n{Fore.YELLOW}⚠ Chat download stopped by shutdown request{Style.RESET_ALL}')
break
# Periodically check if stream is still live
current_time = time.time()
if stream_monitor and (current_time - last_check_time) >= check_interval:
last_check_time = current_time
try:
is_live = stream_monitor.is_user_live()
if not is_live:
print(f'\n{Fore.YELLOW}⚠ Stream ended, stopping chat download{Style.RESET_ALL}')
break
except Exception as check_error:
print(f'\n{Fore.YELLOW}⚠ Could not check stream status: {check_error}{Style.RESET_ALL}')
# Continue downloading to avoid false positives from API errors
message_count += 1
# Show progress every 100 messages
if message_count % 100 == 0:
print(f'{Fore.CYAN} Downloaded {message_count} messages...{Style.RESET_ALL}', end='\r')
# Show chat previews in verbose mode (every 10 messages)
if verbose and message_count % 10 == 0:
author = message.get('author', {}).get('name', 'Unknown')
msg_text = message.get('message', 'N/A')
# Truncate long messages
if len(msg_text) > 60:
msg_text = msg_text[:60] + '...'
print(f'\n{Fore.GREEN}💬 {author}: {Fore.WHITE}{msg_text}{Style.RESET_ALL}')
# Print sample message every 500 for extra debugging
if message_count % 500 == 0:
print(f'\n{Fore.MAGENTA}[VERBOSE] Sample message #{message_count}: {message.get("message", "N/A")[:50]}...{Style.RESET_ALL}')
except KeyboardInterrupt:
print(f'\n{Fore.YELLOW}⚠ Chat download interrupted by user{Style.RESET_ALL}')
except Exception as e:
# Stream might have ended
print(f'\n{Fore.YELLOW}Chat download stopped: {str(e)}{Style.RESET_ALL}')
# Check if file was created
if os.path.exists(json_path):
file_size = os.path.getsize(json_path)
if file_size > 100:
print(f'\n{Fore.GREEN}✓ Live chat downloaded ({message_count} messages, {file_size} bytes){Style.RESET_ALL}')
return True
else:
print(f'\n{Fore.RED}✗ Chat file too small ({file_size} bytes){Style.RESET_ALL}')
return False
else:
print(f'\n{Fore.RED}✗ Chat file was not created{Style.RESET_ALL}')
return False
except Exception as e:
print(f'{Fore.RED}✗ chat_downloader failed: {str(e)}{Style.RESET_ALL}')
import traceback
traceback.print_exc()
return False
def start_chat_downloader_thread(self, username: str, json_path: str,
shutdown_check: Optional[callable] = None,
stream_monitor = None,
verbose: bool = False) -> threading.Thread:
"""
Start chat_downloader in a background thread.
Args:
username: Twitch username
json_path: Path to save chat JSON
shutdown_check: Callback to check for shutdown
stream_monitor: Optional stream monitor to check if stream is still live
verbose: Show chat previews
Returns:
threading.Thread: The thread running the download
"""
def download_thread():
try:
self.chat_thread_success = self.download_live_chat_with_chat_downloader(
username, json_path,
shutdown_check=shutdown_check,
stream_monitor=stream_monitor,
verbose=verbose
)
except Exception as e:
self.chat_thread_error = e
self.chat_thread_success = False
print(f'\n{Fore.RED}✗ Chat thread error: {e}{Style.RESET_ALL}')
import traceback
traceback.print_exc()
thread = threading.Thread(target=download_thread, daemon=True)
thread.start()
self.chat_thread = thread
return thread
def wait_for_chat_thread(self, timeout: Optional[float] = None) -> bool:
"""
Wait for chat download thread to complete.
Args:
timeout: Maximum time to wait in seconds
Returns:
bool: True if chat download succeeded
"""
if self.chat_thread is None:
return False
try:
self.chat_thread.join(timeout=timeout)
# Give a moment for file to be fully written and closed
if self.chat_thread_success:
print(f'{Fore.CYAN}Ensuring chat file is fully written and closed...{Style.RESET_ALL}')
time.sleep(1.0) # Increased from 0.5s
# Force garbage collection to help close file handles
import gc
gc.collect()
time.sleep(0.5)
return self.chat_thread_success
except Exception as e:
print(f'{Fore.RED}✗ Error waiting for chat thread: {e}{Style.RESET_ALL}')
return False
finally:
self.chat_thread = None
def wait_for_file_access(self, file_path: str, max_attempts: int = 10, delay: float = 0.5) -> bool:
"""
Wait for a file to be accessible (not locked by another process).
Args:
file_path: Path to the file to check
max_attempts: Maximum number of attempts
delay: Delay between attempts in seconds
Returns:
bool: True if file is accessible, False otherwise
"""
for attempt in range(max_attempts):
try:
# Try to open the file for reading with shared access
with open(file_path, 'r', encoding='utf-8') as f:
# Try to read a bit to ensure it's really accessible
f.read(1)
print(f'{Fore.GREEN}✓ File is accessible and ready to use{Style.RESET_ALL}')
return True
except PermissionError:
if attempt < max_attempts - 1:
print(f'{Fore.YELLOW}⚠ File still locked, waiting... (attempt {attempt + 1}/{max_attempts}){Style.RESET_ALL}')
time.sleep(delay)
else:
print(f'{Fore.RED}✗ File still locked after {max_attempts} attempts{Style.RESET_ALL}')
return False
except Exception as e:
print(f'{Fore.RED}✗ Error checking file access: {e}{Style.RESET_ALL}')
return False
return False
def convert_chat_downloader_to_twitch_format(self, input_path: str, output_path: str,
video_duration: Optional[float] = None) -> bool:
"""
Convert chat_downloader JSON format to TwitchDownloaderCLI format.
Args:
input_path: Path to chat_downloader JSON file
output_path: Path to save converted TwitchDownloaderCLI format
video_duration: Optional video duration in seconds (overrides calculated duration)
Returns:
bool: True if conversion succeeded, False otherwise
"""
try:
from datetime import datetime, timezone as dt_timezone
print(f'{Fore.CYAN}Converting chat format for rendering...{Style.RESET_ALL}')
# Read chat_downloader format (JSON array)
with open(input_path, 'r', encoding='utf-8') as f:
messages = json.load(f)
# Ensure we have a list
if not isinstance(messages, list):
print(f'{Fore.RED}✗ Expected JSON array, got {type(messages)}{Style.RESET_ALL}')
return False
print(f'{Fore.CYAN}Converting {len(messages)} messages...{Style.RESET_ALL}')
# Track first message timestamp for offset calculation
first_timestamp = None
# Convert to TwitchDownloaderCLI format
comments = []
for msg in messages:
# Skip non-dictionary items
if not isinstance(msg, dict):
continue
# Extract data from chat_downloader format
# Timestamp is in microseconds, convert to seconds
timestamp_us = msg.get('timestamp', 0)
timestamp_sec = timestamp_us / 1000000.0 if timestamp_us else 0.0
# Track first timestamp for offset calculation
if first_timestamp is None:
first_timestamp = timestamp_sec
# Calculate offset from start of stream
content_offset_seconds = timestamp_sec - first_timestamp
# Extract author info
author = msg.get('author', {})
author_name = author.get('display_name', author.get('name', 'Unknown'))
author_login = author.get('name', author_name.lower())
author_id = author.get('id', '')
# Extract message text
message_text = msg.get('message', '')
# Format timestamp as ISO 8601
dt = datetime.fromtimestamp(timestamp_sec, tz=dt_timezone.utc)
created_at = dt.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'
# Get user color (chat_downloader uses "colour" - British spelling)
user_color = msg.get('colour', msg.get('color', '#FFFFFF'))
if not user_color:
user_color = '#FFFFFF'
comment = {
"_id": msg.get('message_id', f"msg_{len(comments)}"),
"created_at": created_at,
"updated_at": created_at,
"channel_id": msg.get('channel_id', ''),
"content_type": "video",
"content_id": "",
"content_offset_seconds": content_offset_seconds,
"commenter": {
"display_name": author_name,
"name": author_login,
"_id": str(author_id),
"type": "user",
"bio": None,
"created_at": created_at,
"updated_at": created_at,
"logo": None
},
"source": "chat",
"state": "published",
"message": {
"body": message_text,
"bits_spent": 0,
"fragments": [
{
"text": message_text,
"emoticon": None
}
],
"is_action": False,
"user_badges": [],
"user_color": user_color,
"user_notice_params": {}
},
"more_replies": False
}
comments.append(comment)
if not comments:
print(f'{Fore.RED}✗ No valid comments to convert{Style.RESET_ALL}')
return False
# Use provided video duration, or calculate from last message
print(f'{Fore.MAGENTA}[DEBUG] Input video_duration parameter: {video_duration}{Style.RESET_ALL}')
if video_duration is None:
video_duration = int(comments[-1]["content_offset_seconds"]) if comments else 0
print(f'{Fore.MAGENTA}[DEBUG] No video duration provided, calculated from chat: {video_duration}s{Style.RESET_ALL}')
else:
video_duration = int(video_duration)
print(f'{Fore.MAGENTA}[DEBUG] Using provided video duration: {video_duration}s{Style.RESET_ALL}')
# Debug output
print(f'{Fore.MAGENTA}[DEBUG] First message offset: {comments[0]["content_offset_seconds"]:.2f}s{Style.RESET_ALL}')
print(f'{Fore.MAGENTA}[DEBUG] Last message offset: {comments[-1]["content_offset_seconds"]:.2f}s{Style.RESET_ALL}')
print(f'{Fore.MAGENTA}[DEBUG] Calculated duration: {video_duration}s{Style.RESET_ALL}')
print(f'{Fore.MAGENTA}[DEBUG] Sample comment structure:{Style.RESET_ALL}')
print(f'{Fore.MAGENTA} ID: {comments[0]["_id"]}{Style.RESET_ALL}')
print(f'{Fore.MAGENTA} Offset: {comments[0]["content_offset_seconds"]}{Style.RESET_ALL}')
print(f'{Fore.MAGENTA} Message: {comments[0]["message"]["body"][:50]}{Style.RESET_ALL}')
print(f'{Fore.MAGENTA}[DEBUG] Final JSON duration will be: {video_duration}s{Style.RESET_ALL}')
print(f'{Fore.MAGENTA}[DEBUG] Comment count: {len(comments)}{Style.RESET_ALL}')
# Create final structure matching TwitchDownloaderCLI format
from datetime import datetime as dt
created_at_iso = dt.now().isoformat() + 'Z'
output_data = {
"FileInfo": {
"Version": {
"Major": 1,
"Minor": 4,
"Patch": 0
},
"CreatedAt": created_at_iso,
"UpdatedAt": "0001-01-01T00:00:00"
},
"streamer": {
"name": comments[0]["channel_id"] if comments else "",
"login": comments[0]["channel_id"] if comments else "",
"id": int(comments[0]["channel_id"]) if comments and comments[0]["channel_id"].isdigit() else 0
},
"video": {
"title": "Live Chat",
"description": None,
"id": None,
"created_at": comments[0]["created_at"] if comments else created_at_iso,
"start": 0,
"end": video_duration,
"length": video_duration,
"viewCount": 0,
"game": None,
"chapters": []
},
"comments": comments,
"embeddedData": None
}
# Write to output file
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(output_data, f, indent=2, ensure_ascii=False)
# Verify output
file_size = os.path.getsize(output_path)
print(f'{Fore.MAGENTA}[DEBUG] Converted JSON file: {file_size:,} bytes{Style.RESET_ALL}')
print(f'{Fore.GREEN}✓ Chat format converted successfully{Style.RESET_ALL}')
return True
except Exception as e:
print(f'{Fore.RED}✗ Chat format conversion failed: {e}{Style.RESET_ALL}')
import traceback
traceback.print_exc()
return False
def start_live_chat_download_with_fallback(self, username: str, vod_id: Optional[str],
json_path: str) -> tuple[Optional[subprocess.Popen], str]:
"""
Start live chat download with automatic fallback.
Tries TwitchDownloaderCLI first (if VOD ID available and not using chat_downloader as primary),
falls back to chat_downloader if that fails or if VOD ID is not available.
Args:
username: Twitch username
vod_id: Optional VOD ID (may be None if VODs disabled)
json_path: Path to save chat JSON
Returns:
tuple: (process_handle or None, method_used)
method_used is one of: 'twitch_downloader', 'chat_downloader', 'failed'
"""
print(f'\n{Fore.MAGENTA}[VERBOSE] === CHAT DOWNLOAD FALLBACK LOGIC ==={Style.RESET_ALL}')
print(f'{Fore.MAGENTA}[VERBOSE] Username: {username}{Style.RESET_ALL}')
print(f'{Fore.MAGENTA}[VERBOSE] VOD ID: {vod_id}{Style.RESET_ALL}')
print(f'{Fore.MAGENTA}[VERBOSE] Primary method: {"chat_downloader" if self.use_chat_downloader_primary else "TwitchDownloaderCLI"}{Style.RESET_ALL}')
print(f'{Fore.MAGENTA}[VERBOSE] Fallback enabled: {self.use_chat_downloader_fallback}{Style.RESET_ALL}')
# Determine primary method
use_twitch_downloader_first = (
vod_id is not None and
not self.use_chat_downloader_primary and
self.download_live_chat
)
print(f'{Fore.MAGENTA}[VERBOSE] Will try TwitchDownloaderCLI first: {use_twitch_downloader_first}{Style.RESET_ALL}')
# Try TwitchDownloaderCLI first if conditions met
if use_twitch_downloader_first:
print(f'{Fore.CYAN}Attempting live chat download with TwitchDownloaderCLI...{Style.RESET_ALL}')
print(f'{Fore.MAGENTA}[VERBOSE] Starting TwitchDownloaderCLI process...{Style.RESET_ALL}')
process = self.start_live_chat_download(vod_id, json_path)
if process is not None:
print(f'{Fore.GREEN}✓ TwitchDownloaderCLI started successfully{Style.RESET_ALL}')
print(f'{Fore.MAGENTA}[VERBOSE] Process PID: {process.pid}{Style.RESET_ALL}')
return (process, 'twitch_downloader')
else:
print(f'{Fore.YELLOW}⚠ TwitchDownloaderCLI failed to start{Style.RESET_ALL}')
print(f'{Fore.MAGENTA}[VERBOSE] Will try fallback method...{Style.RESET_ALL}')
# Try chat_downloader as fallback (or as primary)
if self.use_chat_downloader_fallback or self.use_chat_downloader_primary:
if vod_id is None:
print(f'{Fore.YELLOW}⚠ No VOD ID available - using chat_downloader fallback{Style.RESET_ALL}')
print(f'{Fore.MAGENTA}[VERBOSE] This typically means VODs are disabled on this channel{Style.RESET_ALL}')
if CHAT_DOWNLOADER_AVAILABLE and self.chat_downloader is not None:
print(f'{Fore.CYAN}Using chat_downloader as {"primary method" if self.use_chat_downloader_primary else "fallback"}{Style.RESET_ALL}')
print(f'{Fore.MAGENTA}[VERBOSE] chat_downloader is available and initialized{Style.RESET_ALL}')
# Return special marker for chat_downloader
return (None, 'chat_downloader')
else:
print(f'{Fore.RED}✗ chat_downloader not available for fallback{Style.RESET_ALL}')
print(f'{Fore.MAGENTA}[VERBOSE] CHAT_DOWNLOADER_AVAILABLE: {CHAT_DOWNLOADER_AVAILABLE}{Style.RESET_ALL}')
print(f'{Fore.MAGENTA}[VERBOSE] chat_downloader instance: {self.chat_downloader}{Style.RESET_ALL}')
print(f'{Fore.YELLOW} Install with: pip install chat-downloader{Style.RESET_ALL}')
else:
print(f'{Fore.YELLOW}⚠ Fallback disabled in configuration{Style.RESET_ALL}')
# Both methods failed or unavailable
if vod_id is None:
print(f'{Fore.RED}✗ No VOD ID and no fallback available - cannot download live chat{Style.RESET_ALL}')
print(f'{Fore.MAGENTA}[VERBOSE] All chat download methods exhausted{Style.RESET_ALL}')
return (None, 'failed')