TwitchDownloader/modules/file_manager.py

290 lines
12 KiB
Python

"""
Cloud storage and file management for Twitch Archive.
"""
import os
import json
import pathlib
import subprocess
from typing import List
from colorama import Fore, Style
from .constants import PREFIX_LIVE, PREFIX_VOD, PREFIX_CHAT, PREFIX_METADATA, PREFIX_MERGED
from .utils import get_bin_path
class FileManager:
"""Handles file operations, cloud uploads, and cleanup."""
def __init__(self, root_path: str, username: str, config: dict):
"""
Initialize the file manager.
Args:
root_path: Root directory for archives
username: Twitch username
config: Configuration dictionary
"""
self.root_path = pathlib.Path(root_path)
self.username = username
self.upload_cloud = config.get('uploadCloud', True)
self.upload_pre_merge_video = config.get('uploadPreMergeVideo', True)
self.upload_merged_video = config.get('uploadMergedVideo', True)
self.upload_chat_video = config.get('uploadChatVideo', True)
self.delete_files = config.get('deleteFiles', False)
self.clean_raw = config.get('cleanRaw', True)
self.download_vod = config.get('downloadVOD', True)
self.download_chat = config.get('downloadCHAT', True)
self.download_metadata = config.get('downloadMETADATA', True)
self.rclone_path = config.get('rclone_path', 'remote:path')
# Initialize paths
self.raw_path = self.root_path / username / "video" / "raw"
self.video_path = self.root_path / username / "video"
self.chat_json_path = self.root_path / username / "chat" / "json"
self.chat_mp4_path = self.root_path / username / "chat"
self.metadata_path = self.root_path / username / "metadata"
self.log_file = self.root_path / ".log"
def initialize_directories(self) -> None:
"""Create all necessary directory structures."""
for path in [self.raw_path, self.video_path, self.chat_json_path,
self.chat_mp4_path, self.metadata_path]:
path.mkdir(parents=True, exist_ok=True)
# Create log file if it doesn't exist
if not self.log_file.exists():
self.log_file.touch()
def is_stream_processed(self, stream_id: str) -> bool:
"""
Check if a stream has already been processed.
Args:
stream_id: Unique identifier for the stream
Returns:
bool: True if already processed, False otherwise
"""
with open(self.log_file, 'r', encoding='utf-8') as f:
return stream_id in f.read()
def mark_stream_processed(self, stream_id: str) -> None:
"""Add stream to log file to prevent re-processing."""
with open(self.log_file, 'a', encoding='utf-8') as f:
f.write(f"{stream_id}\n")
def save_metadata(self, vod_info: dict, filename_base: str) -> None:
"""
Save VOD metadata to JSON file.
Args:
vod_info: VOD metadata from Twitch API
filename_base: Base filename (without extension)
"""
if not self.download_metadata:
return
metadata_path = self.metadata_path / f"{PREFIX_METADATA}{filename_base}.json"
with open(metadata_path, 'w', encoding='utf-8') as f:
json.dump(vod_info, f, ensure_ascii=False, indent=4)
print(f'{Fore.GREEN}✓ Metadata saved{Style.RESET_ALL}')
def clean_raw_file(self, raw_path: str) -> None:
"""
Delete raw .ts file if configured.
Args:
raw_path: Path to raw file
"""
if self.clean_raw and os.path.exists(raw_path):
print(f'{Fore.YELLOW}Deleting raw .ts file...{Style.RESET_ALL}')
os.remove(raw_path)
def upload_to_cloud(self, filename_base: str, notification_callback=None) -> bool:
"""
Upload archived files to cloud storage using rclone.
Args:
filename_base: Base filename (without prefixes/extensions)
notification_callback: Optional callback to send notifications
Returns:
bool: True if upload succeeded or is disabled, False if failed
"""
if not self.upload_cloud:
return True
print(f'\n{Fore.CYAN}Uploading to cloud storage...{Style.RESET_ALL}')
if notification_callback:
notification_callback(f'☁ Uploading - {filename_base}', 'Uploading files to cloud storage')
# Create list of files to upload
bin_path = get_bin_path()
upload_list_path = os.path.join(bin_path, 'temp', 'upload.txt')
# Ensure temp directory exists
os.makedirs(os.path.dirname(upload_list_path), exist_ok=True)
files_to_upload = []
# Build files list relative to root_path so rclone can read them with --files-from
# Metadata and chat JSON
files_to_upload.append(os.path.join(self.username, 'metadata', f"{PREFIX_METADATA}{filename_base}.json"))
files_to_upload.append(os.path.join(self.username, 'chat', 'json', f"{PREFIX_CHAT}{filename_base}.json"))
# Pre-merge videos (raw .ts in video/raw, mp4/mp3 in video)
if self.upload_pre_merge_video:
files_to_upload.extend([
os.path.join(self.username, 'video', 'raw', f"{PREFIX_LIVE}{filename_base}.ts"),
os.path.join(self.username, 'video', f"{PREFIX_LIVE}{filename_base}.mp4"),
os.path.join(self.username, 'video', f"{PREFIX_LIVE}{filename_base}.mp3"),
os.path.join(self.username, 'video', 'raw', f"{PREFIX_VOD}{filename_base}.ts"),
os.path.join(self.username, 'video', f"{PREFIX_VOD}{filename_base}.mp4"),
os.path.join(self.username, 'video', f"{PREFIX_VOD}{filename_base}.mp3")
])
# Merged videos (in video folder)
if self.upload_merged_video:
files_to_upload.extend([
os.path.join(self.username, 'video', f"{PREFIX_MERGED}{filename_base}.mp4"),
os.path.join(self.username, 'video', f"{PREFIX_MERGED}{filename_base}.mp3"),
os.path.join(self.username, 'video', f"{PREFIX_MERGED}{PREFIX_VOD}{filename_base}.mp4"),
os.path.join(self.username, 'video', f"{PREFIX_MERGED}{PREFIX_VOD}{filename_base}.mp3")
])
# Standalone chat video (in chat folder)
if self.upload_chat_video:
files_to_upload.append(os.path.join(self.username, 'chat', f"{PREFIX_CHAT}{filename_base}.mp4"))
with open(upload_list_path, 'w') as f:
f.write('\n'.join(files_to_upload))
# Run rclone using --files-from so the listed paths (relative to root_path) are uploaded.
try:
cmd = [
'rclone', 'copy',
str(self.root_path.resolve()),
self.rclone_path,
'--files-from', upload_list_path
]
# Stream rclone output to console so user can see progress/errors
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
if proc.stdout:
for line in proc.stdout:
print(line, end='')
proc.wait()
result = proc.returncode
# Clean up upload list
if os.path.exists(upload_list_path):
os.remove(upload_list_path)
if result == 0:
print(f'{Fore.GREEN}✓ Upload complete{Style.RESET_ALL}')
if notification_callback:
notification_callback(f'✓ Upload Success - {filename_base}', 'All files uploaded successfully')
return True
else:
print(f'{Fore.RED}✗ Upload failed (exit code: {result}){Style.RESET_ALL}')
print(f'{Fore.YELLOW}Files preserved locally due to upload failure{Style.RESET_ALL}')
if notification_callback:
notification_callback(f'✗ Upload Failed - {filename_base}',
f'Upload failed with code {result}. Files preserved locally.')
return False
except Exception as e:
print(f'{Fore.RED}✗ Upload error: {str(e)}{Style.RESET_ALL}')
return False
def delete_local_files(self, filename_base: str, live_raw_path: str,
live_proc_path: str, notification_callback=None) -> None:
"""
Delete local archive files after successful upload.
Only deletes files that were configured to be uploaded.
Args:
filename_base: Base filename (without prefixes/extensions)
live_raw_path: Path to live raw file
live_proc_path: Path to live processed file
notification_callback: Optional callback to send notifications
"""
print(f'\n{Fore.RED}{"=" * 60}{Style.RESET_ALL}')
print(f'{Fore.RED}⚠ DELETING LOCAL FILES{Style.RESET_ALL}')
print(f'{Fore.RED}{"=" * 60}{Style.RESET_ALL}\n')
if notification_callback:
notification_callback(f'🗑 Deleting - {filename_base}',
'Deleting local files after successful upload')
files_to_delete: List[str] = []
# Live files (only if pre-merge videos are uploaded)
if self.upload_pre_merge_video:
if not self.clean_raw and os.path.exists(live_raw_path):
files_to_delete.append(live_raw_path)
if os.path.exists(live_proc_path):
files_to_delete.append(live_proc_path)
# VOD files (only if pre-merge videos are uploaded)
if self.download_vod and self.upload_pre_merge_video:
vod_raw = self.raw_path / f"{PREFIX_VOD}{filename_base}.ts"
vod_mp4 = self.video_path / f"{PREFIX_VOD}{filename_base}.mp4"
vod_mp3 = self.video_path / f"{PREFIX_VOD}{filename_base}.mp3"
if not self.clean_raw and vod_raw.exists():
files_to_delete.append(str(vod_raw))
if vod_mp4.exists():
files_to_delete.append(str(vod_mp4))
if vod_mp3.exists():
files_to_delete.append(str(vod_mp3))
# Merged video files (only if merged videos are uploaded)
if self.upload_merged_video:
merged_live_mp4 = self.video_path / f"{PREFIX_MERGED}{filename_base}.mp4"
merged_live_mp3 = self.video_path / f"{PREFIX_MERGED}{filename_base}.mp3"
merged_vod_mp4 = self.video_path / f"{PREFIX_MERGED}{PREFIX_VOD}{filename_base}.mp4"
merged_vod_mp3 = self.video_path / f"{PREFIX_MERGED}{PREFIX_VOD}{filename_base}.mp3"
if merged_live_mp4.exists():
files_to_delete.append(str(merged_live_mp4))
if merged_live_mp3.exists():
files_to_delete.append(str(merged_live_mp3))
if merged_vod_mp4.exists():
files_to_delete.append(str(merged_vod_mp4))
if merged_vod_mp3.exists():
files_to_delete.append(str(merged_vod_mp3))
# Chat files
if self.download_chat:
chat_json = self.chat_json_path / f"{PREFIX_CHAT}{filename_base}.json"
# Always delete JSON (it's always uploaded)
if chat_json.exists():
files_to_delete.append(str(chat_json))
# Only delete chat MP4 if chat videos are uploaded
if self.upload_chat_video:
chat_mp4 = self.chat_mp4_path / f"{PREFIX_CHAT}{filename_base}.mp4"
if chat_mp4.exists():
files_to_delete.append(str(chat_mp4))
# Metadata files (always uploaded)
if self.download_metadata:
metadata = self.metadata_path / f"{PREFIX_METADATA}{filename_base}.json"
if metadata.exists():
files_to_delete.append(str(metadata))
# Delete all files
for filepath in files_to_delete:
try:
print(f'{Fore.RED} Deleting: {os.path.basename(filepath)}{Style.RESET_ALL}')
os.remove(filepath)
except Exception as e:
print(f'{Fore.YELLOW} ⚠ Failed to delete {filepath}: {e}{Style.RESET_ALL}')
print(f'{Fore.RED}\n✓ Cleanup complete{Style.RESET_ALL}')