TwitchDownloader/modules/file_manager.py

237 lines
9.2 KiB
Python

"""
Cloud storage and file management for Twitch Archive.
"""
import os
import json
import pathlib
import subprocess
from typing import List
from colorama import Fore, Style
from .constants import PREFIX_LIVE, PREFIX_VOD, PREFIX_CHAT, PREFIX_METADATA
from .utils import get_bin_path
class FileManager:
"""Handles file operations, cloud uploads, and cleanup."""
def __init__(self, root_path: str, username: str, config: dict):
"""
Initialize the file manager.
Args:
root_path: Root directory for archives
username: Twitch username
config: Configuration dictionary
"""
self.root_path = pathlib.Path(root_path)
self.username = username
self.upload_cloud = config.get('uploadCloud', True)
self.delete_files = config.get('deleteFiles', False)
self.clean_raw = config.get('cleanRaw', True)
self.download_vod = config.get('downloadVOD', True)
self.download_chat = config.get('downloadCHAT', True)
self.download_metadata = config.get('downloadMETADATA', True)
self.rclone_path = config.get('rclone_path', 'remote:path')
# Initialize paths
self.raw_path = self.root_path / username / "video" / "raw"
self.video_path = self.root_path / username / "video"
self.chat_json_path = self.root_path / username / "chat" / "json"
self.chat_mp4_path = self.root_path / username / "chat"
self.metadata_path = self.root_path / username / "metadata"
self.log_file = self.root_path / ".log"
def initialize_directories(self) -> None:
"""Create all necessary directory structures."""
for path in [self.raw_path, self.video_path, self.chat_json_path,
self.chat_mp4_path, self.metadata_path]:
path.mkdir(parents=True, exist_ok=True)
# Create log file if it doesn't exist
if not self.log_file.exists():
self.log_file.touch()
def is_stream_processed(self, stream_id: str) -> bool:
"""
Check if a stream has already been processed.
Args:
stream_id: Unique identifier for the stream
Returns:
bool: True if already processed, False otherwise
"""
with open(self.log_file, 'r', encoding='utf-8') as f:
return stream_id in f.read()
def mark_stream_processed(self, stream_id: str) -> None:
"""Add stream to log file to prevent re-processing."""
with open(self.log_file, 'a', encoding='utf-8') as f:
f.write(f"{stream_id}\n")
def save_metadata(self, vod_info: dict, filename_base: str) -> None:
"""
Save VOD metadata to JSON file.
Args:
vod_info: VOD metadata from Twitch API
filename_base: Base filename (without extension)
"""
if not self.download_metadata:
return
metadata_path = self.metadata_path / f"{PREFIX_METADATA}{filename_base}.json"
with open(metadata_path, 'w', encoding='utf-8') as f:
json.dump(vod_info, f, ensure_ascii=False, indent=4)
print(f'{Fore.GREEN}✓ Metadata saved{Style.RESET_ALL}')
def clean_raw_file(self, raw_path: str) -> None:
"""
Delete raw .ts file if configured.
Args:
raw_path: Path to raw file
"""
if self.clean_raw and os.path.exists(raw_path):
print(f'{Fore.YELLOW}Deleting raw .ts file...{Style.RESET_ALL}')
os.remove(raw_path)
def upload_to_cloud(self, filename_base: str, notification_callback=None) -> bool:
"""
Upload archived files to cloud storage using rclone.
Args:
filename_base: Base filename (without prefixes/extensions)
notification_callback: Optional callback to send notifications
Returns:
bool: True if upload succeeded or is disabled, False if failed
"""
if not self.upload_cloud:
return True
print(f'\n{Fore.CYAN}Uploading to cloud storage...{Style.RESET_ALL}')
if notification_callback:
notification_callback(f'☁ Uploading - {filename_base}', 'Uploading files to cloud storage')
# Create list of files to upload
bin_path = get_bin_path()
upload_list_path = os.path.join(bin_path, 'temp', 'upload.txt')
# Ensure temp directory exists
os.makedirs(os.path.dirname(upload_list_path), exist_ok=True)
files_to_upload = [
f"{PREFIX_LIVE}{filename_base}.ts",
f"{PREFIX_LIVE}{filename_base}.mp4",
f"{PREFIX_LIVE}{filename_base}.mp3",
f"{PREFIX_VOD}{filename_base}.ts",
f"{PREFIX_VOD}{filename_base}.mp4",
f"{PREFIX_VOD}{filename_base}.mp3",
f"{PREFIX_METADATA}{filename_base}.json",
f"{PREFIX_CHAT}{filename_base}.json",
f"{PREFIX_CHAT}{filename_base}.mp4"
]
with open(upload_list_path, 'w') as f:
f.write('\n'.join(files_to_upload))
# Run rclone
try:
result = subprocess.call([
'rclone', 'copy',
str(self.root_path.resolve()),
self.rclone_path,
'--include-from', upload_list_path
])
# Clean up upload list
if os.path.exists(upload_list_path):
os.remove(upload_list_path)
if result == 0:
print(f'{Fore.GREEN}✓ Upload complete{Style.RESET_ALL}')
if notification_callback:
notification_callback(f'✓ Upload Success - {filename_base}', 'All files uploaded successfully')
return True
else:
print(f'{Fore.RED}✗ Upload failed (exit code: {result}){Style.RESET_ALL}')
print(f'{Fore.YELLOW}Files preserved locally due to upload failure{Style.RESET_ALL}')
if notification_callback:
notification_callback(f'✗ Upload Failed - {filename_base}',
f'Upload failed with code {result}. Files preserved locally.')
return False
except Exception as e:
print(f'{Fore.RED}✗ Upload error: {str(e)}{Style.RESET_ALL}')
return False
def delete_local_files(self, filename_base: str, live_raw_path: str,
live_proc_path: str, notification_callback=None) -> None:
"""
Delete local archive files after successful upload.
Args:
filename_base: Base filename (without prefixes/extensions)
live_raw_path: Path to live raw file
live_proc_path: Path to live processed file
notification_callback: Optional callback to send notifications
"""
print(f'\n{Fore.RED}{"=" * 60}{Style.RESET_ALL}')
print(f'{Fore.RED}⚠ DELETING LOCAL FILES{Style.RESET_ALL}')
print(f'{Fore.RED}{"=" * 60}{Style.RESET_ALL}\n')
if notification_callback:
notification_callback(f'🗑 Deleting - {filename_base}',
'Deleting local files after successful upload')
files_to_delete: List[str] = []
# Live files
if not self.clean_raw and os.path.exists(live_raw_path):
files_to_delete.append(live_raw_path)
if os.path.exists(live_proc_path):
files_to_delete.append(live_proc_path)
# VOD files
if self.download_vod:
vod_raw = self.raw_path / f"{PREFIX_VOD}{filename_base}.ts"
vod_mp4 = self.video_path / f"{PREFIX_VOD}{filename_base}.mp4"
vod_mp3 = self.video_path / f"{PREFIX_VOD}{filename_base}.mp3"
if not self.clean_raw and vod_raw.exists():
files_to_delete.append(str(vod_raw))
if vod_mp4.exists():
files_to_delete.append(str(vod_mp4))
if vod_mp3.exists():
files_to_delete.append(str(vod_mp3))
# Chat files
if self.download_chat:
chat_json = self.chat_json_path / f"{PREFIX_CHAT}{filename_base}.json"
chat_mp4 = self.chat_mp4_path / f"{PREFIX_CHAT}{filename_base}.mp4"
if chat_json.exists():
files_to_delete.append(str(chat_json))
if chat_mp4.exists():
files_to_delete.append(str(chat_mp4))
# Metadata files
if self.download_metadata:
metadata = self.metadata_path / f"{PREFIX_METADATA}{filename_base}.json"
if metadata.exists():
files_to_delete.append(str(metadata))
# Delete all files
for filepath in files_to_delete:
try:
print(f'{Fore.RED} Deleting: {os.path.basename(filepath)}{Style.RESET_ALL}')
os.remove(filepath)
except Exception as e:
print(f'{Fore.YELLOW} ⚠ Failed to delete {filepath}: {e}{Style.RESET_ALL}')
print(f'{Fore.RED}\n✓ Cleanup complete{Style.RESET_ALL}')