All checks were successful
Publish Twitch Archive Container / publish (push) Successful in 7m36s
- Introduced a new docker-compose.nvidia.yml for NVIDIA GPU support. - Updated dockerstart.bat to allow optional NVIDIA runtime. - Enhanced ContentDownloader to manage chat rendering status and font settings. - Improved hardware acceleration detection in utils.py. - Added tests for hardware acceleration and chat rendering behavior. Co-authored-by: Copilot <copilot@github.com>
762 lines
30 KiB
Python
762 lines
30 KiB
Python
"""
|
|
Unit tests for Twitch Archive command-line options and configuration.
|
|
|
|
Tests focus on:
|
|
- Command-line argument parsing (via getopt simulation)
|
|
- Options and option combinations
|
|
- Configuration logic (filtering, merging, etc.)
|
|
- Mode selection logic
|
|
|
|
Excludes actual download/processing functionality.
|
|
|
|
To run these tests:
|
|
python test_twitch_archive_simple.py
|
|
or
|
|
python -m pytest test_twitch_archive_simple.py -v
|
|
"""
|
|
|
|
import unittest
|
|
import sys
|
|
import os
|
|
import json
|
|
import getopt
|
|
import tempfile
|
|
import importlib.util
|
|
from pathlib import Path
|
|
from unittest.mock import patch, MagicMock, Mock
|
|
|
|
# Add parent directory to path for imports
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
|
|
from modules.constants import DEFAULT_CONFIG
|
|
from modules.file_manager import FileManager
|
|
from modules.downloader import ContentDownloader
|
|
from modules.utils import get_ffmpeg_executable, get_twitch_downloader_executable, detect_hardware_acceleration, resolve_hwaccel_type
|
|
|
|
|
|
def load_twitch_archive_module():
    """Load the main script module for targeted regression tests.

    The script file is named ``twitch-archive.py``; the hyphen makes it
    unreachable through a normal ``import`` statement, so it is loaded from
    its file path (next to this test file) under the module name
    ``twitch_archive_main``.

    Returns:
        The executed module object.

    Raises:
        ImportError: If no import spec or loader can be built for the path.
    """
    module_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'twitch-archive.py')
    spec = importlib.util.spec_from_file_location('twitch_archive_main', module_path)

    # spec_from_file_location() may return None, and a spec may carry no
    # loader; fail with a clear message instead of an AttributeError.
    if spec is None or spec.loader is None:
        raise ImportError(f"Cannot create an import spec for {module_path}")

    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module
|
|
|
|
|
|
class TestCommandLineArgumentParsing(unittest.TestCase):
    """Test command-line argument parsing logic using getopt directly.

    Every test feeds an argv list through ``getopt.getopt`` using one shared
    option specification (``SHORT_OPTS`` / ``LONG_OPTS``), so a new option
    only has to be added in a single place.
    """

    # Short options; a trailing ':' marks an option that takes a value.
    SHORT_OPTS = "hu:q:a:v:c:m:r:d:n:"

    # Long options; a trailing '=' marks an option that takes a value.
    # NOTE(review): presumably mirrors the spec used by twitch-archive.py --
    # confirm the two stay in sync.
    LONG_OPTS = [
        "help", "username=", "quality=", "ttv-lol=", "vod=", "chat=",
        "metadata=", "upload=", "delete=", "notifications=", "legacy", "verbose",
        "chat-only", "healthcheck", "rclone-smoke-test",
        "use-chat-downloader-primary", "no-chat-downloader-fallback",
    ]

    def _parse(self, argv):
        """Run ``argv`` through getopt with the shared option spec.

        Args:
            argv: List of command-line tokens (without the program name).

        Returns:
            The ``(opts, args)`` pair produced by ``getopt.getopt``.

        Raises:
            getopt.GetoptError: If ``argv`` contains an unrecognised option.
        """
        return getopt.getopt(argv, self.SHORT_OPTS, self.LONG_OPTS)

    def test_help_short_option(self):
        """Test -h option parsing."""
        opts, _ = self._parse(['-h'])

        # Should parse successfully
        self.assertEqual(len(opts), 1)
        self.assertEqual(opts[0][0], '-h')

    def test_help_long_option(self):
        """Test --help option parsing."""
        opts, _ = self._parse(['--help'])

        self.assertEqual(len(opts), 1)
        self.assertEqual(opts[0][0], '--help')

    def test_username_short_option(self):
        """Test -u username option parsing."""
        opts, _ = self._parse(['-u', 'teststreamer'])

        self.assertEqual(len(opts), 1)
        self.assertEqual(opts[0], ('-u', 'teststreamer'))

    def test_username_long_option(self):
        """Test --username option parsing."""
        opts, _ = self._parse(['--username', 'teststreamer'])

        self.assertEqual(len(opts), 1)
        self.assertEqual(opts[0], ('--username', 'teststreamer'))

    def test_verbose_option(self):
        """Test --verbose option parsing."""
        opts, _ = self._parse(['--verbose'])

        self.assertEqual(len(opts), 1)
        self.assertEqual(opts[0], ('--verbose', ''))

    def test_chat_only_option(self):
        """Test --chat-only option parsing."""
        opts, _ = self._parse(['--chat-only'])

        self.assertEqual(len(opts), 1)
        self.assertEqual(opts[0], ('--chat-only', ''))

    def test_rclone_smoke_test_option(self):
        """Test --rclone-smoke-test option parsing."""
        opts, _ = self._parse(['--rclone-smoke-test'])

        self.assertEqual(len(opts), 1)
        self.assertEqual(opts[0], ('--rclone-smoke-test', ''))

    def test_healthcheck_option(self):
        """Test --healthcheck option parsing."""
        opts, _ = self._parse(['--healthcheck'])

        self.assertEqual(len(opts), 1)
        self.assertEqual(opts[0], ('--healthcheck', ''))

    def test_legacy_option(self):
        """Test --legacy option parsing."""
        opts, _ = self._parse(['--legacy'])

        self.assertEqual(len(opts), 1)
        self.assertEqual(opts[0], ('--legacy', ''))

    def test_chat_downloader_options(self):
        """Test chat downloader option parsing."""
        opts, _ = self._parse(['--use-chat-downloader-primary', '--no-chat-downloader-fallback'])

        self.assertEqual(len(opts), 2)
        self.assertEqual(opts[0], ('--use-chat-downloader-primary', ''))
        self.assertEqual(opts[1], ('--no-chat-downloader-fallback', ''))

    def test_legacy_quality_option(self):
        """Test -q quality option parsing."""
        opts, _ = self._parse(['-q', '720p'])

        self.assertEqual(len(opts), 1)
        self.assertEqual(opts[0], ('-q', '720p'))

    def test_legacy_boolean_options(self):
        """Test legacy boolean option parsing."""
        opts, _ = self._parse(['-v', '1', '-c', '0', '-m', '1'])

        self.assertEqual(len(opts), 3)
        self.assertEqual(opts[0], ('-v', '1'))
        self.assertEqual(opts[1], ('-c', '0'))
        self.assertEqual(opts[2], ('-m', '1'))

    def test_invalid_option(self):
        """Test that invalid option raises error."""
        with self.assertRaises(getopt.GetoptError):
            self._parse(['--invalid-option'])

    def test_option_combination_username_verbose(self):
        """Test combining -u and --verbose options."""
        opts, _ = self._parse(['-u', 'testuser', '--verbose'])

        self.assertEqual(len(opts), 2)
        self.assertEqual(opts[0], ('-u', 'testuser'))
        self.assertEqual(opts[1], ('--verbose', ''))

    def test_option_combination_all_test_flags(self):
        """Test combining all test-related flags."""
        opts, _ = self._parse(
            ['-u', 'testuser', '--verbose', '--chat-only', '--use-chat-downloader-primary']
        )

        self.assertEqual(len(opts), 4)
        opt_dict = dict(opts)
        self.assertEqual(opt_dict['-u'], 'testuser')
        self.assertIn('--verbose', opt_dict)
        self.assertIn('--chat-only', opt_dict)
        self.assertIn('--use-chat-downloader-primary', opt_dict)

    def test_option_combination_legacy_mode_with_overrides(self):
        """Test legacy mode with multiple overrides."""
        opts, _ = self._parse(['--legacy', '-q', '720p', '-v', '1', '-c', '1', '-m', '0'])

        self.assertEqual(len(opts), 5)
        opt_dict = dict(opts)
        self.assertIn('--legacy', opt_dict)
        self.assertEqual(opt_dict['-q'], '720p')
        self.assertEqual(opt_dict['-v'], '1')
        self.assertEqual(opt_dict['-c'], '1')
        self.assertEqual(opt_dict['-m'], '0')
|
|
|
|
|
|
class TestOptionLogicProcessing(unittest.TestCase):
    """Checks for the decisions derived from already-parsed options.

    These tests re-create the decision expressions locally; they do not call
    into the main script.
    """

    @staticmethod
    def _should_use_legacy(use_legacy_mode, legacy_config_exists,
                           specific_streamer, global_config_exists):
        """Evaluate the legacy-mode decision simulated by the tests below.

        Legacy mode is chosen when it is requested explicitly, or when a
        legacy config exists while neither a specific streamer nor a global
        config is present.
        """
        if use_legacy_mode:
            return True
        return legacy_config_exists and not specific_streamer and not global_config_exists

    def test_boolean_conversion_true(self):
        """The string '1' converts to True via int() then bool()."""
        raw = '1'
        self.assertTrue(bool(int(raw)))

    def test_boolean_conversion_false(self):
        """The string '0' converts to False via int() then bool()."""
        raw = '0'
        self.assertFalse(bool(int(raw)))

    def test_chat_only_auto_enables_verbose(self):
        """Chat-only mode switches verbose on even when it started off."""
        chat_only_mode = True
        verbose_mode = False

        # Same rule as the simulated main(): chat-only forces verbose.
        verbose_mode = True if chat_only_mode else verbose_mode

        self.assertTrue(verbose_mode)

    def test_default_chat_downloader_fallback(self):
        """The chat downloader fallback is enabled unless explicitly disabled."""
        fallback_enabled = True  # Default value
        self.assertTrue(fallback_enabled)

    def test_mode_selection_legacy_with_config_json(self):
        """Only a legacy config present: legacy mode is selected."""
        decision = self._should_use_legacy(
            use_legacy_mode=False,
            legacy_config_exists=True,
            specific_streamer=None,
            global_config_exists=False,
        )
        self.assertTrue(decision)

    def test_mode_selection_multi_streamer_with_global_json(self):
        """A global config present: legacy mode is not selected."""
        decision = self._should_use_legacy(
            use_legacy_mode=False,
            legacy_config_exists=True,
            specific_streamer=None,
            global_config_exists=True,
        )
        self.assertFalse(decision)

    def test_mode_selection_multi_streamer_with_username_flag(self):
        """A specific streamer given (-u): legacy mode is not selected."""
        decision = self._should_use_legacy(
            use_legacy_mode=False,
            legacy_config_exists=True,
            specific_streamer='testuser',
            global_config_exists=False,
        )
        self.assertFalse(decision)

    def test_mode_selection_explicit_legacy_flag(self):
        """An explicit --legacy request wins regardless of the other inputs."""
        decision = self._should_use_legacy(
            use_legacy_mode=True,
            legacy_config_exists=False,
            specific_streamer=None,
            global_config_exists=True,
        )
        self.assertTrue(decision)
|
|
|
|
|
|
class TestConfigLogic(unittest.TestCase):
    """Checks for configuration filtering, merging and default structure.

    Apart from the DEFAULT_CONFIG key check, these tests re-create the
    filtering/merging rules locally rather than calling ConfigManager.
    """

    def test_comment_filtering_logic(self):
        """Keys starting with an underscore are dropped from a user config."""
        raw_config = {
            '_comment': 'This is a comment',
            'quality': '720p',
            '_note': 'Another comment',
            'username': 'testuser',
        }

        # Same rule the tests attribute to ConfigManager: skip '_'-prefixed keys.
        kept = {}
        for key, value in raw_config.items():
            if key.startswith('_'):
                continue
            kept[key] = value

        for dropped_key in ('_comment', '_note'):
            self.assertNotIn(dropped_key, kept)
        for kept_key in ('quality', 'username'):
            self.assertIn(kept_key, kept)
        self.assertEqual(kept['quality'], '720p')

    def test_schema_filtering_logic(self):
        """The '$schema' key is dropped alongside underscore-prefixed keys."""
        raw_config = {
            '$schema': './config.schema.json',
            'quality': '720p',
            'username': 'testuser',
        }

        kept = {}
        for key, value in raw_config.items():
            if key.startswith('_') or key == '$schema':
                continue
            kept[key] = value

        self.assertNotIn('$schema', kept)
        self.assertIn('quality', kept)
        self.assertIn('username', kept)

    def test_config_merging_logic(self):
        """Streamer-level values override global ones; the rest is inherited."""
        global_config = {
            'quality': '720p',
            'downloadVOD': True,
            'downloadCHAT': False,
            'username': 'default',
        }
        # Only 'username' and 'quality' are overridden; the download flags
        # are expected to come through from the global config.
        streamer_config = {
            'username': 'specificstreamer',
            'quality': 'source',
        }

        combined = {**global_config, **streamer_config}

        # Overridden values
        self.assertEqual(combined['quality'], 'source')
        self.assertEqual(combined['username'], 'specificstreamer')
        # Inherited values
        self.assertTrue(combined['downloadVOD'])
        self.assertFalse(combined['downloadCHAT'])

    def test_default_config_structure(self):
        """DEFAULT_CONFIG exposes every key this test suite expects."""
        expected_keys = (
            'username',
            'quality',
            'downloadVOD',
            'downloadCHAT',
            'downloadMETADATA',
            'uploadCloud',
            'deleteFiles',
            'notifications',
            'refresh',
        )
        for key in expected_keys:
            self.assertIn(key, DEFAULT_CONFIG)

    def test_enabled_flag_filtering_logic(self):
        """Streamers are kept unless 'enabled' is explicitly falsy."""
        roster = [
            {'username': 'streamer1', 'enabled': True},
            {'username': 'streamer2', 'enabled': True},
            {'username': 'streamer3', 'enabled': False},
            {'username': 'streamer4'},  # No enabled field
        ]

        active = []
        for entry in roster:
            # A missing 'enabled' field counts as enabled.
            if entry.get('enabled', True):
                active.append(entry['username'])

        self.assertIn('streamer1', active)
        self.assertIn('streamer2', active)
        self.assertNotIn('streamer3', active)
        self.assertIn('streamer4', active)  # Default enabled

    def test_default_streamer_config_structure(self):
        """A freshly built default streamer config has the expected fields."""
        username = 'newstreamer'
        # Shape the test assumes create_default_streamer_config produces.
        fresh_config = {
            "$schema": "../streamer.schema.json",
            "username": username,
            "enabled": True,
        }

        self.assertEqual(fresh_config['username'], username)
        self.assertTrue(fresh_config['enabled'])
        self.assertIn('$schema', fresh_config)
|
|
|
|
|
|
class TestFileManagerUploadPaths(unittest.TestCase):
    """Checks how FileManager builds the relative paths used for uploads."""

    def test_build_upload_relative_paths_uses_forward_slashes(self):
        """Rclone --files-from entries must use POSIX separators on Windows."""
        upload_config = {
            'uploadCloud': True,
            'uploadPreMergeVideo': True,
            'uploadMergedVideo': True,
            'uploadChatVideo': True,
        }

        with tempfile.TemporaryDirectory() as workspace:
            file_manager = FileManager(
                root_path=workspace,
                username='testuser',
                config=upload_config,
            )

            produced = file_manager._build_upload_relative_paths('20260424_12h00m00s')

            # Something must be produced, and no entry may contain a backslash.
            self.assertTrue(produced)
            self.assertTrue(all('\\' not in entry for entry in produced))
            self.assertIn('testuser/metadata/METADA_20260424_12h00m00s.json', produced)
            self.assertIn('testuser/chat/json/CHAT_20260424_12h00m00s.json', produced)
|
|
|
|
|
|
class TestDownloaderConfiguration(unittest.TestCase):
    """Regression checks for how ContentDownloader stores its config."""

    def test_download_vod_method_not_shadowed_by_boolean_flag(self):
        """Config booleans must not overwrite callable downloader methods."""
        download_flags = {
            'downloadVOD': True,
            'downloadCHAT': True,
            'downloadLiveCHAT': True,
        }

        downloader = ContentDownloader(
            twitch_downloader_path='TwitchDownloaderCLI',
            ffmpeg_path='ffmpeg',
            config=download_flags,
        )

        # The method stays callable; the flag is exposed under a separate name.
        self.assertTrue(callable(downloader.download_vod))
        self.assertTrue(downloader.download_vod_enabled)
|
|
|
|
|
|
class TestLinuxToolResolution(unittest.TestCase):
    """Ensure Linux containers prefer their own installed toolchain."""

    def test_linux_prefers_system_ffmpeg(self):
        """The path reported by shutil.which is returned for ffmpeg on Linux."""
        with patch('modules.utils.shutil.which') as which_mock:
            which_mock.return_value = '/usr/bin/ffmpeg'

            self.assertEqual(get_ffmpeg_executable('linux'), '/usr/bin/ffmpeg')

    def test_linux_prefers_system_twitch_downloader(self):
        """The path reported by shutil.which is returned for TwitchDownloaderCLI."""
        with patch('modules.utils.shutil.which') as which_mock:
            which_mock.return_value = '/usr/local/bin/TwitchDownloaderCLI'

            self.assertEqual(
                get_twitch_downloader_executable('linux'),
                '/usr/local/bin/TwitchDownloaderCLI',
            )

    def test_linux_auto_hwaccel_falls_back_to_software_without_runtime(self):
        """With no tool on PATH and no existing paths, 'auto' resolves to 'none'."""
        # Entered in the same order the stacked decorators used: which, then exists.
        with patch('modules.utils.shutil.which', return_value=None):
            with patch('modules.utils.os.path.exists', return_value=False):
                detected = detect_hardware_acceleration('auto', 'linux')

                self.assertEqual(detected, 'none')
                self.assertEqual(resolve_hwaccel_type(detected, 'linux'), 'none')

    def test_linux_auto_hwaccel_uses_nvenc_when_nvidia_runtime_visible(self):
        """When shutil.which reports nvidia-smi, 'auto' is detected as 'nvenc'."""
        with patch('modules.utils.shutil.which', return_value='/usr/bin/nvidia-smi'):
            detected = detect_hardware_acceleration('auto', 'linux')

            self.assertEqual(detected, 'nvenc')
|
|
|
|
|
|
class TestChatRenderBehavior(unittest.TestCase):
    """Regression tests for chat rendering defaults and retry behavior."""

    @patch('modules.downloader.sys.platform', 'linux')
    @patch('modules.downloader.subprocess.Popen')
    def test_linux_chat_render_uses_container_safe_font(self, mock_popen):
        """With the platform patched to 'linux', the render command names 'DejaVu Sans'."""
        chat_payload = {
            'comments': [
                {
                    'message': {'body': 'hello world ' * 20},
                    'commenter': {'display_name': 'tester'}
                }
            ]
        }
        recorded = {}

        class FakeProcess:
            """Stand-in for the Popen handle returned to the downloader."""

            def __init__(self, output_path: str):
                self.output_path = output_path
                self.stdout = iter(['rendering'])

            def wait(self):
                # Leave a 2048-byte output file behind, then report exit code 0.
                with open(self.output_path, 'wb') as sink:
                    sink.write(b'x' * 2048)
                return 0

        with tempfile.TemporaryDirectory() as workdir:
            chat_json = os.path.join(workdir, 'chat.json')
            chat_video = os.path.join(workdir, 'chat.mp4')

            with open(chat_json, 'w', encoding='utf-8') as sink:
                json.dump(chat_payload, sink)

            def spawn(cmd, **_kwargs):
                # Remember the command so it can be inspected after the call.
                recorded['cmd'] = cmd
                return FakeProcess(chat_video)

            mock_popen.side_effect = spawn

            downloader = ContentDownloader('TwitchDownloaderCLI', 'ffmpeg', {'downloadCHAT': True})
            rendered = downloader.render_chat(chat_json, chat_video, '-c:v libx264 "{save_path}"')

            self.assertTrue(rendered)
            self.assertIn('DejaVu Sans', recorded['cmd'])
|
|
|
|
|
|
class TestMultiStreamerCleanupRegression(unittest.TestCase):
    """Regression tests for multi-streamer conversion and cleanup behavior."""

    def setUp(self):
        # Each test works against a freshly loaded copy of the main script.
        self.module = load_twitch_archive_module()

    @staticmethod
    def _fake_record(_stream_info, raw_path):
        """Recorder stub: write a 4096-byte raw file and report success."""
        with open(raw_path, 'wb') as sink:
            sink.write(b'x' * 4096)
        return True

    @staticmethod
    def _sample_stream_info():
        """Return a fresh stream-info dict as passed to _process_stream."""
        return {
            'title': 'Test',
            'createdAt': '2026-04-25T09:14:01Z'
        }

    def _build_archiver(self, temp_dir: str, upload_cloud: bool = True):
        """Build a MagicMock archiver whose file manager paths live in temp_dir.

        Args:
            temp_dir: Directory under which the raw/video/chat folders are created.
            upload_cloud: Value assigned to the archiver's ``uploadCloud`` flag.

        Returns:
            The configured MagicMock.
        """
        archiver = MagicMock()

        settings = {
            'username': 'maddoscientist0',
            'os_type': 'linux',
            'quality': 'best',
            'downloadLiveCHAT': False,
            'downloadCHAT': False,
            'downloadVOD': False,
            'downloadMETADATA': False,
            'mergeVideoChat': False,
            'mergeChatLayout': 'side-by-side',
            'onlyRaw': False,
            'vodTimeout': 0,
            'shutdown_requested': False,
            'deleteFiles': True,
            'uploadCloud': upload_cloud,
        }
        for attribute, value in settings.items():
            setattr(archiver, attribute, value)

        collaborators = (
            'notification_manager',
            'recorder',
            'processor',
            'downloader',
            'stream_monitor',
            'file_manager',
        )
        for collaborator in collaborators:
            setattr(archiver, collaborator, MagicMock())

        base = Path(temp_dir)
        folders = {
            'raw_path': 'raw',
            'video_path': 'video',
            'chat_json_path': 'chat_json',
            'chat_mp4_path': 'chat',
        }
        for attribute, folder in folders.items():
            setattr(archiver.file_manager, attribute, base / folder)
        for attribute in folders:
            os.makedirs(getattr(archiver.file_manager, attribute), exist_ok=True)

        return archiver

    def test_process_stream_keeps_raw_when_conversion_fails(self):
        """Failed conversion: clean_raw_file is never called, delete_local_files once."""
        manager = self.module.TwitchArchiveManager(specific_streamer='maddoscientist0')

        with tempfile.TemporaryDirectory() as workdir:
            archiver = self._build_archiver(workdir)
            archiver.processor.process_raw_stream.return_value = False
            archiver.file_manager.upload_to_cloud.return_value = True
            archiver.recorder.record.side_effect = self._fake_record

            manager._process_stream(archiver, self._sample_stream_info(), 'stream-id')

            archiver.file_manager.clean_raw_file.assert_not_called()
            archiver.file_manager.delete_local_files.assert_called_once()

    def test_process_stream_only_deletes_rendered_files_after_real_upload(self):
        """uploadCloud off: raw file cleaned once, local files kept, no LIVE_ upload name."""
        manager = self.module.TwitchArchiveManager(specific_streamer='maddoscientist0')

        with tempfile.TemporaryDirectory() as workdir:
            archiver = self._build_archiver(workdir, upload_cloud=False)
            archiver.processor.process_raw_stream.return_value = True
            archiver.file_manager.upload_to_cloud.return_value = True
            archiver.recorder.record.side_effect = self._fake_record

            manager._process_stream(archiver, self._sample_stream_info(), 'stream-id')

            archiver.file_manager.clean_raw_file.assert_called_once()
            archiver.file_manager.delete_local_files.assert_not_called()

            # First positional argument of the upload call is the filename base.
            upload_call = archiver.file_manager.upload_to_cloud.call_args
            filename_base = upload_call.args[0]
            self.assertFalse(filename_base.startswith('LIVE_'))

    def test_process_stream_preserves_files_when_chat_render_fails(self):
        """Failed chat render: neither clean_raw_file nor delete_local_files is called."""
        manager = self.module.TwitchArchiveManager(specific_streamer='maddoscientist0')

        with tempfile.TemporaryDirectory() as workdir:
            archiver = self._build_archiver(workdir)
            archiver.downloadCHAT = True
            archiver.downloadVOD = True
            archiver.vodTimeout = 30
            archiver.processor.process_raw_stream.return_value = True
            archiver.file_manager.upload_to_cloud.return_value = True
            archiver.downloader.last_chat_render_attempted = False
            archiver.downloader.last_chat_render_succeeded = False

            def failed_chat_render(*_args, **_kwargs):
                # Record that a render was attempted and did not succeed.
                archiver.downloader.last_chat_render_attempted = True
                archiver.downloader.last_chat_render_succeeded = False
                return False

            archiver.recorder.record.side_effect = self._fake_record
            archiver.downloader.download_and_render_chat.side_effect = failed_chat_render

            vod_node = {
                'id': '2756589076',
                'title': 'Test',
                'recordedAt': '2026-04-25T09:14:01Z'
            }
            archiver.stream_monitor.get_latest_vod.return_value = {
                'data': {'user': {'videos': {'edges': [{'node': vod_node}]}}}
            }

            manager._process_stream(archiver, self._sample_stream_info(), 'stream-id')

            archiver.file_manager.clean_raw_file.assert_not_called()
            archiver.file_manager.delete_local_files.assert_not_called()
|
|
|
|
|
|
if __name__ == '__main__':
    # Print a banner, then run every test in this module with verbose output.
    banner = "=" * 70
    print(banner)
    print("TWITCH ARCHIVE - Unit Tests for Options and Configuration")
    print(banner)
    print()
    unittest.main(verbosity=2)
|