TwitchDownloader/modules/stream_monitor.py
MaddoScientisto f97e0200d6 Refactor downloader and file manager for improved rclone integration and add healthcheck and smoke test options
- Renamed download flags in ContentDownloader for clarity.
- Enhanced FileManager with methods to build upload paths and verify existing files for rclone uploads.
- Updated StreamProcessor to return success status for stream processing.
- Added rclone smoke test and healthcheck functions to validate configuration and tool availability.
- Improved environment variable handling with a utility function.
- Updated TwitchArchive to incorporate new rclone verification and processing logic.
- Added unit tests for new functionality and refactored existing tests for clarity and coverage.

Co-authored-by: Copilot <copilot@github.com>
2026-04-25 11:54:03 +02:00

171 lines
6.4 KiB
Python

"""
Stream monitoring and API interaction for Twitch Archive.
"""
import os
import sys
from typing import Dict, Optional, Any
import requests
from colorama import Fore, Style
from .constants import TWITCH_OAUTH_URL, TWITCH_API_URL, TWITCH_GQL_URL, TWITCH_GQL_CLIENT_ID
from .utils import get_env_value
class StreamMonitor:
"""Handles Twitch API interactions for monitoring stream status."""
def __init__(self, username: str):
"""
Initialize the stream monitor.
Args:
username: Twitch username to monitor
"""
self.username = username
self._oauth_token = None
def get_oauth_token(self) -> str:
"""
Get OAuth token from Twitch API.
Uses CLIENT-ID and CLIENT-SECRET from environment variables.
Returns:
str: OAuth access token
Raises:
SystemExit: If authentication fails
"""
if self._oauth_token:
return self._oauth_token
try:
client_id = get_env_value('CLIENT-ID', 'CLIENT_ID')
client_secret = get_env_value('CLIENT-SECRET', 'CLIENT_SECRET')
url = f"{TWITCH_OAUTH_URL}?client_id={client_id}&client_secret={client_secret}&grant_type=client_credentials"
response = requests.post(url, timeout=15)
response.raise_for_status()
self._oauth_token = response.json()['access_token']
return self._oauth_token
except requests.exceptions.RequestException as e:
print(f'{Fore.RED}✗ ERROR: Failed to authenticate with Twitch API{Style.RESET_ALL}')
print(f'{Fore.YELLOW} {str(e)}{Style.RESET_ALL}')
print(f'{Fore.CYAN} → Check your CLIENT-ID and CLIENT-SECRET in the .env file{Style.RESET_ALL}')
sys.exit(1)
except KeyError:
print(f'{Fore.RED}✗ ERROR: Invalid response from Twitch API{Style.RESET_ALL}')
print(f'{Fore.CYAN} → Verify your CLIENT-ID and CLIENT-SECRET are correct{Style.RESET_ALL}')
sys.exit(1)
def validate_username(self) -> bool:
"""
Validate that the configured Twitch username exists.
Returns:
bool: True if username exists, False otherwise
Raises:
SystemExit: If username is invalid or doesn't exist
"""
try:
url = f'{TWITCH_API_URL}/users?login={self.username}'
headers = {
"Authorization": f"Bearer {self.get_oauth_token()}",
"Client-ID": get_env_value('CLIENT-ID', 'CLIENT_ID')
}
response = requests.get(url, headers=headers, timeout=15)
response.raise_for_status()
data = response.json()
if not data.get('data'):
print(f'{Fore.RED}✗ ERROR: Twitch user "{self.username}" not found{Style.RESET_ALL}')
print(f'{Fore.CYAN} → Check the username in your config file{Style.RESET_ALL}')
sys.exit(1)
print(f'{Fore.GREEN}✓ Username "{self.username}" validated{Style.RESET_ALL}')
return True
except requests.exceptions.RequestException as e:
print(f'{Fore.RED}✗ ERROR: Could not validate username{Style.RESET_ALL}')
print(f'{Fore.YELLOW} {str(e)}{Style.RESET_ALL}')
sys.exit(1)
def check_stream_status(self) -> Optional[Dict[str, Any]]:
"""
Check if the configured user is currently live.
Returns:
dict: Stream information if live, None if offline
Raises:
SystemExit: If API request fails
"""
query = f'query{{user(login: "{self.username}") {{stream{{archiveVideo{{id}}title createdAt}}}}}}'
try:
response = requests.post(
TWITCH_GQL_URL,
json={'query': query},
headers={"Client-ID": TWITCH_GQL_CLIENT_ID},
timeout=15
)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
print(f'{Fore.RED}✗ ERROR: Failed to check stream status{Style.RESET_ALL}')
print(f'{Fore.YELLOW} {str(e)}{Style.RESET_ALL}')
sys.exit(1)
def is_user_live(self) -> bool:
"""
Check if the configured user is currently live.
Returns:
bool: True if user is live, False if offline
Raises:
Exception: If API request fails (caller should handle)
"""
query = f'query{{user(login: "{self.username}") {{stream{{id title}}}}}}'
try:
response = requests.post(
TWITCH_GQL_URL,
json={'query': query},
headers={"Client-ID": TWITCH_GQL_CLIENT_ID},
timeout=15
)
response.raise_for_status()
data = response.json()
stream_data = data.get('data', {}).get('user', {}).get('stream')
return stream_data is not None
except requests.exceptions.RequestException as e:
# Don't exit, let caller handle this
raise Exception(f"Failed to check if user is live: {str(e)}")
def get_latest_vod(self) -> Optional[Dict[str, Any]]:
"""
Get the most recent VOD for the configured user.
Returns:
dict: VOD information, or None if no VODs found
"""
query = f'query {{user(login: "{self.username}") {{videos(first: 1) {{edges {{node {{id title description recordedAt lengthSeconds animatedPreviewURL previewThumbnailURL(height: 1280, width: 720) thumbnailURLs(height: 1280, width: 720)}}}}}}}}}}'
try:
response = requests.post(
TWITCH_GQL_URL,
json={'query': query},
headers={"Client-ID": TWITCH_GQL_CLIENT_ID},
timeout=15
)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
print(f'{Fore.YELLOW}⚠ Warning: Could not fetch latest VOD{Style.RESET_ALL}')
print(f'{Fore.YELLOW} {str(e)}{Style.RESET_ALL}')
return None