import sys import signal signal.signal(signal.SIGINT, signal.SIG_IGN) import numpy as np import face_recognition import argparse import pickle import multiprocessing import os from tqdm import tqdm from pathlib import Path from datetime import datetime date_time = datetime.now().strftime("%Y%m%d_%H%M%S") default_log_filename = f"encoder_log_{date_time}.txt" default_out_filename = f"face_encodings_{date_time}.pkl" def format_time(seconds, total_images): hours, rem = divmod(seconds, 3600) minutes, seconds_final = divmod(rem, 60) time_str = "" if hours > 0: time_str += f"{int(hours)}h " if minutes > 0: time_str += f"{int(minutes)}m " time_str += f"{seconds_final:.2f}s" avg_speed = total_images / seconds return time_str, avg_speed def resolve_path(path, default): default_dirname = "output" default_filename = default if not path: resolved_path = Path(default_dirname) / default_filename return resolved_path.resolve() resolved_path = Path(path).resolve() if resolved_path.is_dir() or path.endswith(os.sep) or path.endswith('/') or not resolved_path.suffix: resolved_path = resolved_path / default_filename return resolved_path.resolve() return resolved_path def init_worker(): signal.signal(signal.SIGINT, signal.SIG_IGN) def process_image_worker(args): path, root_path = args try: image = face_recognition.load_image_file(path) encoding = face_recognition.face_encodings(image) results = [] for enc in encoding: results.append((enc, str(path.relative_to(root_path)))) return path, results, None except MemoryError: return path, None, "RAM esaurita (MemoryError). Riprova con meno core." except Exception as e: if "bad allocation" in str(e).lower() or "allocate" in str(e).lower(): return path, None, "Errore di allocazione RAM. Riduci il numero di core con -c." return path, None, str(e) def encode_images(images_dir, log, recursive=False, include_tn=False, multicore_level=3): encodings = [] filenames = [] log_path = resolve_path(log, default_log_filename) log_path.parent.mkdir(parents=True, exist_ok=True) images_dir_path = Path(images_dir).resolve() if not images_dir_path.exists() or not images_dir_path.is_dir(): print(f"Errore: La cartella {images_dir} non esiste.") with open(log_path, "w", encoding="utf-8") as log_f: log_f.write(f"--- [ERRORE] La cartella {images_dir} non esiste ---\n") sys.exit(1) extensions = ['*.jpg', '*.jpeg', '*.png', '*.bmp', '*.JPG', '*.JPEG', '*.PNG', '*.BMP'] files_to_process = [] if recursive: for ext in extensions: files_to_process.extend(images_dir_path.rglob(ext)) else: for ext in extensions: files_to_process.extend(images_dir_path.glob(ext)) files_to_process = sorted(list(set(files_to_process))) if not files_to_process: print("Nessuna immagine trovata da elaborare.") with open(log_path, "w", encoding="utf-8") as log_f: log_f.write(f"--- [INFO] Nessuna immagine trovata da elaborare ---\n") sys.exit(1) print(f"Trovate {len(files_to_process)} immagini da elaborare.") with open(log_path, "w", encoding="utf-8") as log_f: log_f.write(f"--- [INFO] Trovate {len(files_to_process)} immagini da elaborare ---\n") if not include_tn: total_images = len(files_to_process) files_to_process = [f for f in files_to_process if not f.name.lower().startswith("tn_")] print(f"Filtro-tn attivo. Rimosse {total_images - len(files_to_process)} immagini thumbnail. Rimaste {len(files_to_process)} immagini.") with open(log_path, "a", encoding="utf-8") as log_f: log_f.write(f"--- [INFO] Filtro-tn attivo. Rimosse {total_images - len(files_to_process)} immagini thumbnail. Rimaste {len(files_to_process)} immagini ---\n") else: print(f"Filtro-tn disattivato.") with open(log_path, "a", encoding="utf-8") as log_f: log_f.write(f"--- [INFO] Filtro-tn disattivato ---\n") print(f"Avvio codifica immagini da {images_dir_path}{' in modalità ricorsiva' if recursive else ''}") with open(log_path, "a", encoding="utf-8") as log_f: log_f.write(f"--- [INFO] Codifica avviata da {images_dir_path} {'in modalità ricorsiva' if recursive else ''} ---\n") total_cores = multiprocessing.cpu_count() if multicore_level == 1: cores_to_use = max(1, total_cores // 8) elif multicore_level == 2: cores_to_use = max(1, total_cores // 4) elif multicore_level == 3: cores_to_use = max(1, total_cores // 2) elif multicore_level == 4: cores_to_use = max(1, int(total_cores * (3/4))) elif multicore_level == 5: cores_to_use = max(1, total_cores - 2) print(f"Avvio elaborazione parallela: multicore impostato a livello {multicore_level}, {'utilizzato' if cores_to_use == 1 else 'utilizzati'} {cores_to_use} core su {total_cores}.") with open(log_path, "a", encoding="utf-8") as log_f: log_f.write(f"--- [INFO] Multicore impostato a livello {multicore_level}, {'utilizzato' if cores_to_use == 1 else 'utilizzati'} {cores_to_use} core su {total_cores} ---\n") tasks = [(path, images_dir_path) for path in files_to_process] with open(log_path, "a", encoding="utf-8") as log_f: log_f.write(f"\n============== [INIZIO ELABORAZIONE] ==============\n\n") pool = multiprocessing.Pool(processes=cores_to_use, initializer=init_worker) pbar = tqdm(total=len(tasks), desc="Elaborazione", unit="img", leave=True) iterator = pool.imap_unordered(process_image_worker, tasks) start_time = None try: while True: try: path, result_list, error = iterator.next(timeout=0.5) except multiprocessing.TimeoutError: continue except StopIteration: break except Exception as e: break if start_time is None: start_time = datetime.now().timestamp() if error: err_msg = f"Errore durante l'elaborazione di {path.name}: {error}" pbar.write(err_msg) with open(log_path, "a", encoding="utf-8") as log_f: log_f.write(f"--- [ERRORE] {err_msg} ---\n") elif result_list is not None: nfaces = len(result_list) msg = f"{path.relative_to(images_dir_path)} - [{nfaces:<2} {'volto' if nfaces == 1 else 'volti'}]" pbar.write(msg) with open(log_path, "a", encoding="utf-8") as log_f: log_f.write(f"{msg}\n") for enc, fname in result_list: encodings.append(enc) filenames.append(fname) pbar.update(1) except (KeyboardInterrupt, IndexError): pbar.disable = True pbar.close() print("\nInterruzione manuale rilevata. Arresto dei processi in corso...") with open(log_path, "a", encoding="utf-8") as log_f: log_f.write("\n============== [ELABORAZIONE INTERROTTA MANUALMENTE] ==============\n") pool.terminate() try: pool.join() except Exception: pass else: pool.close() pool.join() pbar.close() with open(log_path, "a", encoding="utf-8") as log_f: log_f.write("\n============== [ELABORAZIONE COMPLETATA CON SUCCESSO] ==============\n") finally: execution_time = datetime.now().timestamp() - start_time time_str, avg_speed = format_time(execution_time, len(set(filenames))) with open(log_path, "a", encoding="utf-8") as log_f: log_f.write(f"\n--- [INFO] Tempo impiegato: {time_str} ---") log_f.write(f"\n--- [INFO] Velocità media: {avg_speed:.1f} img/s ---") return encodings, filenames def save_encodings(encodings, filenames, output, log): data = {"encodings": encodings, "filenames": filenames} output_path = resolve_path(output, default_out_filename) log_path = resolve_path(log, default_log_filename) try: output_path.parent.mkdir(parents=True, exist_ok=True) with open(output_path, "wb") as f: pickle.dump(data, f) print(f"Codifica terminata, encodings salvati in {output_path}") with open(log_path, "a", encoding="utf-8") as log_f: log_f.write(f"\n--- [INFO] Codifica terminata, encodings salvati in {output_path} ---\n") except Exception as e: print(f"Errore durante il salvataggio: {e}") with open(log_path, "a", encoding="utf-8") as log_f: log_f.write(f"\n--- [ERRORE] Errore durante il salvataggio: {e} ---\n") def main(): signal.signal(signal.SIGINT, signal.default_int_handler) parser = argparse.ArgumentParser(description="VERSIONE CPU.\nGenera gli encoding, codificando le foto 'unknown'.") parser.add_argument("-i", "--images", required=True, help="Cartella contenente le foto da codificare") parser.add_argument("-o", "--out", help="Percorso del file di output contentente gli encoding. Default: './output/face_encodings_[datetime].pkl'") parser.add_argument("-l", "--log", help="Percorso del file di log. Default: './output/encoder_log_[datetime].txt'") parser.add_argument("-r", "--recursive", action="store_true", help="Cerca immagini anche nelle sottocartelle") parser.add_argument("-t", "--include-tn", action="store_true", help="Include nell'elabortazione anche le immagini thumbnail che iniziano con 'tn_'") parser.add_argument("-m", "--multicore", type=int, choices=[1, 2, 3, 4, 5], default=3, help="Livello di potenza del multicore da 1 a 5. Default: 3 (ovvero 2/3 dei core)") args = parser.parse_args() encodings, filenames = encode_images(args.images, args.log, args.recursive, args.include_tn, args.multicore) if encodings: save_encodings(encodings, filenames, args.out, args.log) if __name__ == "__main__": main()