From e297e484ee286934bd3c9d712d4e5b7585f5d81e Mon Sep 17 00:00:00 2001 From: MaddoScientisto Date: Wed, 1 Apr 2026 08:12:53 +0200 Subject: [PATCH] First Commit --- face_encoder_cpu_u.py | 248 +++++++++++++++++++++++++++++++++++++ face_encoder_cpu_w.py | 275 ++++++++++++++++++++++++++++++++++++++++++ face_encoder_gpu.py | 207 +++++++++++++++++++++++++++++++ face_matcher.py | 174 ++++++++++++++++++++++++++ 4 files changed, 904 insertions(+) create mode 100644 face_encoder_cpu_u.py create mode 100644 face_encoder_cpu_w.py create mode 100644 face_encoder_gpu.py create mode 100644 face_matcher.py diff --git a/face_encoder_cpu_u.py b/face_encoder_cpu_u.py new file mode 100644 index 0000000..5c85e71 --- /dev/null +++ b/face_encoder_cpu_u.py @@ -0,0 +1,248 @@ +import sys +import signal + +signal.signal(signal.SIGINT, signal.SIG_IGN) + +import numpy as np +import face_recognition +import argparse +import pickle +import multiprocessing +import os +from tqdm import tqdm +from pathlib import Path +from datetime import datetime + +date_time = datetime.now().strftime("%Y%m%d_%H%M%S") +default_log_filename = f"encoder_log_{date_time}.txt" +default_out_filename = f"face_encodings_{date_time}.pkl" + +def format_time(seconds, total_images): + hours, rem = divmod(seconds, 3600) + minutes, seconds_final = divmod(rem, 60) + + time_str = "" + if hours > 0: time_str += f"{int(hours)}h " + if minutes > 0: time_str += f"{int(minutes)}m " + time_str += f"{seconds_final:.2f}s" + + avg_speed = total_images / seconds + + return time_str, avg_speed + +def resolve_path(path, default): + default_dirname = "output" + default_filename = default + + if not path: + resolved_path = Path(default_dirname) / default_filename + return resolved_path.resolve() + + resolved_path = Path(path).resolve() + + if resolved_path.is_dir() or path.endswith(os.sep) or path.endswith('/') or not resolved_path.suffix: + resolved_path = resolved_path / default_filename + return resolved_path.resolve() + + return resolved_path + +def init_worker(): + signal.signal(signal.SIGINT, signal.SIG_IGN) + +def process_image_worker(args): + path, root_path = args + try: + image = face_recognition.load_image_file(path) + encoding = face_recognition.face_encodings(image) + results = [] + for enc in encoding: + results.append((enc, str(path.relative_to(root_path)))) + + return path, results, None + + except MemoryError: + return path, None, "RAM esaurita (MemoryError). Riprova con meno core." + + except Exception as e: + if "bad allocation" in str(e).lower() or "allocate" in str(e).lower(): + return path, None, "Errore di allocazione RAM. Riduci il numero di core con -c." + + return path, None, str(e) + +def encode_images(images_dir, log, recursive=False, include_tn=False, multicore_level=3): + encodings = [] + filenames = [] + + log_path = resolve_path(log, default_log_filename) + log_path.parent.mkdir(parents=True, exist_ok=True) + + images_dir_path = Path(images_dir).resolve() + if not images_dir_path.exists() or not images_dir_path.is_dir(): + print(f"Errore: La cartella {images_dir} non esiste.") + with open(log_path, "w", encoding="utf-8") as log_f: + log_f.write(f"--- [ERRORE] La cartella {images_dir} non esiste ---\n") + sys.exit(1) + + extensions = ['*.jpg', '*.jpeg', '*.png', '*.bmp', '*.JPG', '*.JPEG', '*.PNG', '*.BMP'] + files_to_process = [] + + if recursive: + for ext in extensions: + files_to_process.extend(images_dir_path.rglob(ext)) + else: + for ext in extensions: + files_to_process.extend(images_dir_path.glob(ext)) + + files_to_process = sorted(list(set(files_to_process))) + + if not files_to_process: + print("Nessuna immagine trovata da elaborare.") + with open(log_path, "w", encoding="utf-8") as log_f: + log_f.write(f"--- [INFO] Nessuna immagine trovata da elaborare ---\n") + sys.exit(1) + print(f"Trovate {len(files_to_process)} immagini da elaborare.") + with open(log_path, "w", encoding="utf-8") as log_f: + log_f.write(f"--- [INFO] Trovate {len(files_to_process)} immagini da elaborare ---\n") + + if not include_tn: + total_images = len(files_to_process) + files_to_process = [f for f in files_to_process if not f.name.lower().startswith("tn_")] + print(f"Filtro-tn attivo. Rimosse {total_images - len(files_to_process)} immagini thumbnail. Rimaste {len(files_to_process)} immagini.") + with open(log_path, "a", encoding="utf-8") as log_f: + log_f.write(f"--- [INFO] Filtro-tn attivo. Rimosse {total_images - len(files_to_process)} immagini thumbnail. Rimaste {len(files_to_process)} immagini ---\n") + else: + print(f"Filtro-tn disattivato.") + with open(log_path, "a", encoding="utf-8") as log_f: + log_f.write(f"--- [INFO] Filtro-tn disattivato ---\n") + + print(f"Avvio codifica immagini da {images_dir_path}{' in modalità ricorsiva' if recursive else ''}") + with open(log_path, "a", encoding="utf-8") as log_f: + log_f.write(f"--- [INFO] Codifica avviata da {images_dir_path} {'in modalità ricorsiva' if recursive else ''} ---\n") + + total_cores = multiprocessing.cpu_count() + + if multicore_level == 1: + cores_to_use = max(1, total_cores // 8) + elif multicore_level == 2: + cores_to_use = max(1, total_cores // 4) + elif multicore_level == 3: + cores_to_use = max(1, total_cores // 2) + elif multicore_level == 4: + cores_to_use = max(1, int(total_cores * (3/4))) + elif multicore_level == 5: + cores_to_use = max(1, total_cores - 2) + + print(f"Avvio elaborazione parallela: multicore impostato a livello {multicore_level}, {'utilizzato' if cores_to_use == 1 else 'utilizzati'} {cores_to_use} core su {total_cores}.") + with open(log_path, "a", encoding="utf-8") as log_f: + log_f.write(f"--- [INFO] Multicore impostato a livello {multicore_level}, {'utilizzato' if cores_to_use == 1 else 'utilizzati'} {cores_to_use} core su {total_cores} ---\n") + + tasks = [(path, images_dir_path) for path in files_to_process] + + with open(log_path, "a", encoding="utf-8") as log_f: + log_f.write(f"\n============== [INIZIO ELABORAZIONE] ==============\n\n") + + pool = multiprocessing.Pool(processes=cores_to_use, initializer=init_worker) + pbar = tqdm(total=len(tasks), desc="Elaborazione", unit="img", leave=True) + iterator = pool.imap_unordered(process_image_worker, tasks) + + start_time = None + + try: + while True: + try: + path, result_list, error = iterator.next(timeout=0.5) + except multiprocessing.TimeoutError: + continue + except StopIteration: + break + except Exception as e: + break + + if start_time is None: + start_time = datetime.now().timestamp() + + if error: + err_msg = f"Errore durante l'elaborazione di {path.name}: {error}" + pbar.write(err_msg) + with open(log_path, "a", encoding="utf-8") as log_f: + log_f.write(f"--- [ERRORE] {err_msg} ---\n") + + elif result_list is not None: + nfaces = len(result_list) + msg = f"{path.relative_to(images_dir_path)} - [{nfaces:<2} {'volto' if nfaces == 1 else 'volti'}]" + pbar.write(msg) + + with open(log_path, "a", encoding="utf-8") as log_f: + log_f.write(f"{msg}\n") + + for enc, fname in result_list: + encodings.append(enc) + filenames.append(fname) + + pbar.update(1) + + except (KeyboardInterrupt, IndexError): + pbar.disable = True + pbar.close() + print("\nInterruzione manuale rilevata. Arresto dei processi in corso...") + with open(log_path, "a", encoding="utf-8") as log_f: + log_f.write("\n============== [ELABORAZIONE INTERROTTA MANUALMENTE] ==============\n") + + pool.terminate() + try: + pool.join() + except Exception: + pass + else: + pool.close() + pool.join() + pbar.close() + with open(log_path, "a", encoding="utf-8") as log_f: + log_f.write("\n============== [ELABORAZIONE COMPLETATA CON SUCCESSO] ==============\n") + + finally: + execution_time = datetime.now().timestamp() - start_time + time_str, avg_speed = format_time(execution_time, len(set(filenames))) + with open(log_path, "a", encoding="utf-8") as log_f: + log_f.write(f"\n--- [INFO] Tempo impiegato: {time_str} ---") + log_f.write(f"\n--- [INFO] Velocità media: {avg_speed:.1f} img/s ---") + + return encodings, filenames + +def save_encodings(encodings, filenames, output, log): + data = {"encodings": encodings, "filenames": filenames} + output_path = resolve_path(output, default_out_filename) + log_path = resolve_path(log, default_log_filename) + + try: + output_path.parent.mkdir(parents=True, exist_ok=True) + with open(output_path, "wb") as f: + pickle.dump(data, f) + print(f"Codifica terminata, encodings salvati in {output_path}") + + with open(log_path, "a", encoding="utf-8") as log_f: + log_f.write(f"\n--- [INFO] Codifica terminata, encodings salvati in {output_path} ---\n") + except Exception as e: + print(f"Errore durante il salvataggio: {e}") + with open(log_path, "a", encoding="utf-8") as log_f: + log_f.write(f"\n--- [ERRORE] Errore durante il salvataggio: {e} ---\n") + +def main(): + signal.signal(signal.SIGINT, signal.default_int_handler) + + parser = argparse.ArgumentParser(description="VERSIONE CPU.\nGenera gli encoding, codificando le foto 'unknown'.") + parser.add_argument("-i", "--images", required=True, help="Cartella contenente le foto da codificare") + parser.add_argument("-o", "--out", help="Percorso del file di output contentente gli encoding. Default: './output/face_encodings_[datetime].pkl'") + parser.add_argument("-l", "--log", help="Percorso del file di log. Default: './output/encoder_log_[datetime].txt'") + parser.add_argument("-r", "--recursive", action="store_true", help="Cerca immagini anche nelle sottocartelle") + parser.add_argument("-t", "--include-tn", action="store_true", help="Include nell'elabortazione anche le immagini thumbnail che iniziano con 'tn_'") + parser.add_argument("-m", "--multicore", type=int, choices=[1, 2, 3, 4, 5], default=3, help="Livello di potenza del multicore da 1 a 5. Default: 3 (ovvero 2/3 dei core)") + args = parser.parse_args() + + encodings, filenames = encode_images(args.images, args.log, args.recursive, args.include_tn, args.multicore) + + if encodings: + save_encodings(encodings, filenames, args.out, args.log) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/face_encoder_cpu_w.py b/face_encoder_cpu_w.py new file mode 100644 index 0000000..9e567fa --- /dev/null +++ b/face_encoder_cpu_w.py @@ -0,0 +1,275 @@ +import os +import signal + +signal.signal(signal.SIGINT, signal.SIG_IGN) + +import time +import sys +import signal +import numpy as np +import psutil +import face_recognition +import multiprocessing +import argparse +import pickle +from pathlib import Path +from tqdm import tqdm +from datetime import datetime + +date_time = datetime.now().strftime("%Y%m%d_%H%M%S") +default_log_filename = f"encoder_log_{date_time}.txt" +default_out_filename = f"face_encodings_{date_time}.pkl" + +def format_time(seconds, total_images): + hours, rem = divmod(seconds, 3600) + minutes, seconds_final = divmod(rem, 60) + + time_str = "" + if hours > 0: time_str += f"{int(hours)}h " + if minutes > 0: time_str += f"{int(minutes)}m " + time_str += f"{seconds_final:.2f}s" + + avg_speed = total_images / seconds + + return time_str, avg_speed + +def resolve_path(path, default): + default_dirname = "output" + default_filename = default + + if not path: + resolved_path = Path(default_dirname) / default_filename + return resolved_path.resolve() + + resolved_path = Path(path).resolve() + + if resolved_path.is_dir() or path.endswith(os.sep) or path.endswith('/') or not resolved_path.suffix: + resolved_path = resolved_path / default_filename + return resolved_path.resolve() + + return resolved_path + +def get_safe_cores(requested_cores): + available_ram_gb = (psutil.virtual_memory().available / (1024**3)) * (7/8) + + ram_per_process = 0.8 + + max_ram_cores = int(available_ram_gb // ram_per_process) + safe_cores = max(1, min(requested_cores, max_ram_cores)) + + return safe_cores, available_ram_gb + +def init_worker(): + signal.signal(signal.SIGINT, signal.SIG_IGN) + if hasattr(signal, 'SIGBREAK'): + signal.signal(signal.SIGBREAK, signal.SIG_IGN) + + dummy_image = np.zeros((100, 100, 3), dtype=np.uint8) + _ = face_recognition.face_locations(dummy_image, model='hog') + +def process_image_worker(args): + path, root_path = args + try: + image = face_recognition.load_image_file(path) + encoding = face_recognition.face_encodings(image) + results = [] + for enc in encoding: + results.append((enc, str(path.relative_to(root_path)))) + + return path, results, None + except MemoryError: + return path, None, "RAM esaurita (MemoryError). Riprova con meno core." + except Exception as e: + if "bad allocation" in str(e).lower() or "allocate" in str(e).lower(): + return path, None, "Errore di allocazione RAM. Riduci il numero di core con -c." + return path, None, str(e) + +def encode_images(images_dir, log, recursive=False, include_tn=False, multicore_level=3): + start_time = None + + encodings = [] + filenames = [] + + log_path = resolve_path(log, default_log_filename) + log_path.parent.mkdir(parents=True, exist_ok=True) + + images_dir_path = Path(images_dir).resolve() + if not images_dir_path.exists() or not images_dir_path.is_dir(): + print(f"Errore: La cartella {images_dir} non esiste.") + with open(log_path, "w", encoding="utf-8") as log_f: + log_f.write(f"--- [ERRORE] La cartella {images_dir} non esiste ---\n") + sys.exit(1) + + extensions = ['*.jpg', '*.jpeg', '*.png', '*.bmp', '*.JPG', '*.JPEG', '*.PNG', '*.BMP'] + files_to_process = [] + + if recursive: + for ext in extensions: + files_to_process.extend(images_dir_path.rglob(ext)) + else: + for ext in extensions: + files_to_process.extend(images_dir_path.glob(ext)) + + files_to_process = sorted(list(set(files_to_process))) + + if not files_to_process: + print("Nessuna immagine trovata da elaborare.") + with open(log_path, "w", encoding="utf-8") as log_f: + log_f.write(f"--- [INFO] Nessuna immagine trovata da elaborare ---\n") + sys.exit(1) + print(f"Trovate {len(files_to_process)} immagini da elaborare.") + with open(log_path, "w", encoding="utf-8") as log_f: + log_f.write(f"--- [INFO] Trovate {len(files_to_process)} immagini da elaborare ---\n") + + if not include_tn: + total_images = len(files_to_process) + files_to_process = [f for f in files_to_process if not f.name.lower().startswith("tn_")] + print(f"Filtro-tn attivo. Rimosse {total_images - len(files_to_process)} immagini thumbnail. Rimaste {len(files_to_process)} immagini.") + with open(log_path, "a", encoding="utf-8") as log_f: + log_f.write(f"--- [INFO] Filtro-tn attivo. Rimosse {total_images - len(files_to_process)} immagini thumbnail. Rimaste {len(files_to_process)} immagini ---\n") + else: + print(f"Filtro-tn disattivato.") + with open(log_path, "a", encoding="utf-8") as log_f: + log_f.write(f"--- [INFO] Filtro-tn disattivato ---\n") + + print(f"Avvio codifica immagini da {images_dir_path}{' in modalità ricorsiva' if recursive else ''}") + with open(log_path, "a", encoding="utf-8") as log_f: + log_f.write(f"--- [INFO] Codifica avviata da {images_dir_path} {'in modalità ricorsiva' if recursive else ''} ---\n") + + total_cores = multiprocessing.cpu_count() + + if multicore_level == 1: + requested_cores = max(1, total_cores // 8) + elif multicore_level == 2: + requested_cores = max(1, total_cores // 4) + elif multicore_level == 3: + requested_cores = max(1, total_cores // 2) + elif multicore_level == 4: + requested_cores = max(1, int(total_cores * (3/4))) + elif multicore_level == 5: + requested_cores = max(1, total_cores - 2) + + cores_to_use, ram_free = get_safe_cores(requested_cores) + if cores_to_use < requested_cores: + print("\n" + "#" * 80) + print("OTTIMIZZAZIONE AUTOMATICA RISORSE") + print(f"Rilevati {ram_free:.1f} GB di RAM disponibili.") + print("Ricalibrazione del numero di core per garantire la massima sicurezza e stabilità.") + print(f"Core ridotti da {requested_cores} a {cores_to_use}.") + print("#" * 80 + "\n") + with open(log_path, "a", encoding="utf-8") as log_f: + log_f.write(f"--- [ATTENZIONE] Ricalibrazione risorse eseguita per impedire errori durante l'elaborazione parallela. Core ridotti da {requested_cores} a {cores_to_use} ---\n") + + print(f"Avvio elaborazione parallela: multicore impostato a livello {multicore_level}, {'utilizzato' if cores_to_use == 1 else 'utilizzati'} {cores_to_use} core su {total_cores}.") + with open(log_path, "a", encoding="utf-8") as log_f: + log_f.write(f"--- [INFO] Multicore impostato a livello {multicore_level}, {'utilizzato' if cores_to_use == 1 else 'utilizzati'} {cores_to_use} core su {total_cores} ---\n") + + tasks = [(path, images_dir_path) for path in files_to_process] + + with open(log_path, "a", encoding="utf-8") as log_f: + log_f.write(f"\n============== [INIZIO ELABORAZIONE] ==============\n\n") + + pool = multiprocessing.Pool(processes=cores_to_use, initializer=init_worker) + pbar = tqdm(total=len(tasks), desc="Elaborazione", unit="img", leave=True) + start_time = None + + time.sleep(1) + signal.signal(signal.SIGINT, signal.default_int_handler) + + try: + for path, result_list, error in pool.imap_unordered(process_image_worker, tasks): + if start_time is None: + start_time = datetime.now().timestamp() + + if error: + err_msg = f"Errore durante l'elaborazione di {path.name}: {error}" + pbar.write(err_msg) + with open(log_path, "a", encoding="utf-8") as log_f: + log_f.write(f"--- [ERRORE] {err_msg} ---\n") + + elif result_list is not None: + nfaces = len(result_list) + msg = f"{path.relative_to(images_dir_path)} - [{nfaces:<2} {'volto' if nfaces == 1 else 'volti'}]" + pbar.write(msg) + + with open(log_path, "a", encoding="utf-8") as log_f: + log_f.write(f"{msg}\n") + + for enc, fname in result_list: + encodings.append(enc) + filenames.append(fname) + + pbar.update(1) + + except KeyboardInterrupt: + signal.signal(signal.SIGINT, signal.SIG_IGN) + pbar.disable = True + pbar.close() + print("\nInterruzione manuale rilevata. Arresto dei processi in corso...") + with open(log_path, "a", encoding="utf-8") as log_f: + log_f.write("\n============== [ELABORAZIONE INTERROTTA MANUALMENTE] ==============\n") + + pool.terminate() + pool.join() + + except Exception as e: + print(f"[ERRORE] {e}") + + else: + pool.close() + pool.join() + pbar.close() + with open(log_path, "a", encoding="utf-8") as log_f: + log_f.write("\n============== [ELABORAZIONE COMPLETATA CON SUCCESSO] ==============\n") + + finally: + if start_time is not None: + execution_time = datetime.now().timestamp() - start_time + time_str, avg_speed = format_time(execution_time, len(set(filenames))) + with open(log_path, "a", encoding="utf-8") as log_f: + log_f.write(f"\n--- [INFO] Tempo impiegato: {time_str} ---") + log_f.write(f"\n--- [INFO] Velocità media: {avg_speed:.1f} img/s ---") + + return encodings, filenames + +def save_encodings(encodings, filenames, output, log): + data = {"encodings": encodings, "filenames": filenames} + output_path = resolve_path(output, default_out_filename) + log_path = resolve_path(log, default_log_filename) + + try: + output_path.parent.mkdir(parents=True, exist_ok=True) + with open(output_path, "wb") as f: + pickle.dump(data, f) + print(f"Codifica terminata, encodings salvati in {output_path}") + + with open(log_path, "a", encoding="utf-8") as log_f: + log_f.write(f"\n--- [INFO] Codifica terminata, encodings salvati in {output_path} ---\n") + except Exception as e: + print(f"Errore durante il salvataggio: {e}") + with open(log_path, "a", encoding="utf-8") as log_f: + log_f.write(f"\n--- [ERRORE] Errore durante il salvataggio: {e} ---\n") + +def main(): + parser = argparse.ArgumentParser(description="VERSIONE CPU.\nGenera gli encoding, codificando le foto 'unknown'.") + parser.add_argument("-i", "--images", required=True, help="Cartella contenente le foto da codificare") + parser.add_argument("-o", "--out", help="Percorso del file di output contentente gli encoding. Default: './output/face_encodings_[datetime].pkl'") + parser.add_argument("-l", "--log", help="Percorso del file di log. Default: './output/encoder_log_[datetime].txt'") + parser.add_argument("-r", "--recursive", action="store_true", help="Cerca immagini anche nelle sottocartelle") + parser.add_argument("-t", "--include-tn", action="store_true", help="Include nell'elabortazione anche le immagini thumbnail che iniziano con 'tn_'") + parser.add_argument("-m", "--multicore", type=int, choices=[1, 2, 3, 4, 5], default=3, help="Livello di potenza del multicore da 1 a 5. Default: 3 (ovvero 2/3 dei core)") + args = parser.parse_args() + + encodings, filenames = encode_images(args.images, args.log, args.recursive, args.include_tn, args.multicore) + + if encodings: + save_encodings(encodings, filenames, args.out, args.log) + +if __name__ == "__main__": + multiprocessing.freeze_support() + try: + main() + except KeyboardInterrupt: + os._exit(0) + except Exception as e: + os._exit(0) \ No newline at end of file diff --git a/face_encoder_gpu.py b/face_encoder_gpu.py new file mode 100644 index 0000000..8bf1f23 --- /dev/null +++ b/face_encoder_gpu.py @@ -0,0 +1,207 @@ +import sys +import signal + +signal.signal(signal.SIGINT, signal.SIG_IGN) + +import numpy as np +import dlib +import os +import face_recognition +import argparse +import pickle +from tqdm import tqdm +from pathlib import Path +from datetime import datetime + +date_time = datetime.now().strftime("%Y%m%d_%H%M%S") +default_log_filename = f"encoder_log_{date_time}.txt" +default_out_filename = f"face_encodings_{date_time}.pkl" + +def format_time(seconds, total_images): + hours, rem = divmod(seconds, 3600) + minutes, seconds_final = divmod(rem, 60) + + time_str = "" + if hours > 0: time_str += f"{int(hours)}h " + if minutes > 0: time_str += f"{int(minutes)}m " + time_str += f"{seconds_final:.2f}s" + + avg_speed = total_images / seconds + + return time_str, avg_speed + +def resolve_path(path, default): + default_dirname = "output" + default_filename = default + + if not path: + resolved_path = Path(default_dirname) / default_filename + return resolved_path.resolve() + + resolved_path = Path(path).resolve() + + if resolved_path.is_dir() or path.endswith(os.sep) or path.endswith('/') or not resolved_path.suffix: + resolved_path = resolved_path / default_filename + return resolved_path.resolve() + + return resolved_path + +def encode_images(images_dir, log, recursive=False, include_tn=False): + encodings = [] + filenames = [] + + log_path = resolve_path(log, default_log_filename) + log_path.parent.mkdir(parents=True, exist_ok=True) + + if not dlib.DLIB_USE_CUDA: + print("\n" + "#" * 80) + print("ERRORE CRITICO: GPU non rilevata.") + print("Il programma è configurato per funzionare esclusivamente con CUDA.") + print("#" * 80 + "\n") + with open(log_path, "w", encoding="utf-8") as log_f: + log_f.write(f"--- [ERRORE] GPU non rilevata ---\n") + sys.exit(1) + + model_type = "cnn" + print("Modalità GPU (CUDA) rilevata e attivata correttamente.") + with open(log_path, "w", encoding="utf-8") as log_f: + log_f.write(f"--- [INFO] Modalità GPU (CUDA) rilevata e attivata correttamente ---\n") + + images_dir_path = Path(images_dir).resolve() + if not images_dir_path.exists() or not images_dir_path.is_dir(): + print(f"Errore: La cartella {images_dir_path} non esiste.") + with open(log_path, "a", encoding="utf-8") as log_f: + log_f.write(f"--- [ERRORE] La cartella {images_dir_path} non esiste ---\n") + sys.exit(1) + + + extensions = ['*.jpg', '*.jpeg', '*.png', '*.bmp', '*.JPG', '*.JPEG', '*.PNG', '*.BMP'] + files_to_process = [] + + if recursive: + for ext in extensions: + files_to_process.extend(images_dir_path.rglob(ext)) + else: + for ext in extensions: + files_to_process.extend(images_dir_path.glob(ext)) + + files_to_process = sorted(list(set(files_to_process))) + + if not files_to_process: + print("Nessuna immagine trovata da elaborare.") + with open(log_path, "a", encoding="utf-8") as log_f: + log_f.write(f"--- [INFO] Nessuna immagine trovata da elaborare ---\n") + sys.exit(1) + print(f"Trovate {len(files_to_process)} immagini da elaborare.") + with open(log_path, "w", encoding="utf-8") as log_f: + log_f.write(f"--- [INFO] Trovate {len(files_to_process)} immagini da elaborare ---\n") + + if not include_tn: + total_images = len(files_to_process) + files_to_process = [f for f in files_to_process if not f.name.lower().startswith("tn_")] + print(f"Filtro-tn attivo. Rimosse {total_images - len(files_to_process)} immagini thumbnail. Rimaste {len(files_to_process)} immagini.") + with open(log_path, "a", encoding="utf-8") as log_f: + log_f.write(f"--- [INFO] Filtro-tn attivo. Rimosse {total_images - len(files_to_process)} immagini thumbnail. Rimaste {len(files_to_process)} immagini ---\n") + else: + print(f"Filtro-tn disattivato.") + with open(log_path, "a", encoding="utf-8") as log_f: + log_f.write(f"--- [INFO] Filtro-tn disattivato ---\n") + + print(f"Avvio codifica immagini da {images_dir_path}{' in modalità ricorsiva' if recursive else ''})") + with open(log_path, "a", encoding="utf-8") as log_f: + log_f.write(f"--- [INFO] Codifica avviata da {images_dir_path} {'in modalità ricorsiva' if recursive else ''} ---\n") + + with open(log_path, "a", encoding="utf-8") as log_f: + log_f.write(f"\n============== [INIZIO ELABORAZIONE] ==============\n\n") + + dummy_image = np.zeros((100, 100, 3), dtype=np.uint8) + _ = face_recognition.face_locations(dummy_image, model=model_type) + pbar = tqdm(total=len(files_to_process), desc="Elaborazione immagini", unit="img") + start_time = datetime.now().timestamp() + + signal.signal(signal.SIGINT, signal.default_int_handler) + + try: + for path in files_to_process: + try: + image = face_recognition.load_image_file(path) + + face_locations = face_recognition.face_locations(image, model=model_type) + face_encodings = face_recognition.face_encodings(image, known_face_locations=face_locations) + + for face_encoding in face_encodings: + encodings.append(face_encoding) + filenames.append(str(path.relative_to(images_dir_path))) + + nfaces = len(face_locations) + msg = f"{path.relative_to(images_dir_path)} - [{nfaces:<2} {'volto' if nfaces == 1 else 'volti'}]" + pbar.write(msg) + + with open(log_path, "a", encoding="utf-8") as log_f: + log_f.write(f"{msg}\n") + + except Exception as e: + err_msg = f"Errore durante l'elaborazione di {path.name}: {e}" + pbar.write(err_msg) + + with open(log_path, "a", encoding="utf-8") as log_f: + log_f.write(f"--- [ERRORE] {err_msg} ---\n") + + pbar.update(1) + + except KeyboardInterrupt: + pbar.disable = True + pbar.close() + + print("\nInterruzione manuale dell'elaborazione, salvataggio dei dati finora elaborati...") + + with open(log_path, "a", encoding="utf-8") as log_f: + log_f.write("\n============== [ELABORAZIONE INTERROTTA MANUALMENTE] ==============\n") + + else: + pbar.close() + with open(log_path, "a", encoding="utf-8") as log_f: + log_f.write("\n============== [ELABORAZIONE COMPLETATA CON SUCCESSO] ==============\n") + + finally: + execution_time = datetime.now().timestamp() - start_time + time_str, avg_speed = format_time(execution_time, len(set(filenames))) + with open(log_path, "a", encoding="utf-8") as log_f: + log_f.write(f"\n--- [INFO] Tempo impiegato: {time_str} ---") + log_f.write(f"\n--- [INFO] Velocità media: {avg_speed:.1f} img/s ---") + + return encodings, filenames + +def save_encodings(encodings, filenames, output, log): + data = {"encodings": encodings, "filenames": filenames} + output_path = resolve_path(output, default_out_filename) + log_path = resolve_path(log, default_log_filename) + + try: + output_path.parent.mkdir(parents=True, exist_ok=True) + with open(output_path, "wb") as f: + pickle.dump(data, f) + print(f"Codifica terminata, encodings salvati in {output_path}") + with open(log_path, "a", encoding="utf-8") as log_f: + log_f.write(f"\n--- [INFO] Codifica terminata, encodings salvati in {output_path} ---\n") + except Exception as e: + print(f"Errore di salvataggio: {e}") + with open(log_path, "a", encoding="utf-8") as log_f: + log_f.write(f"\n--- [ERRORE] Errore durante il salvataggio: {e} ---\n") + +def main(): + parser = argparse.ArgumentParser(description="VERSIONE GPU [CUDA].\nGenera gli encoding, codificando le foto 'unknown'.") + parser.add_argument("-i", "--images", required=True, help="Cartella contenente le foto da codificare") + parser.add_argument("-o", "--out", help="Percorso del file di output contentente gli encoding. Default: './output/face_encodings_[datetime].pkl'") + parser.add_argument("-l", "--log", help="Percorso del file di log. Default: './output/encoder_log_[datetime].txt'") + parser.add_argument("-r", "--recursive", action="store_true", help="Cerca immagini anche nelle sottocartelle") + parser.add_argument("-t", "--include-tn", action="store_true", help="Include nell'elabortazione anche le immagini thumbnail che iniziano con 'tn_'") + args = parser.parse_args() + + encodings, filenames = encode_images(args.images, args.log, args.recursive, args.include_tn) + + if encodings: + save_encodings(encodings, filenames, args.out, args.log) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/face_matcher.py b/face_matcher.py new file mode 100644 index 0000000..6458cb1 --- /dev/null +++ b/face_matcher.py @@ -0,0 +1,174 @@ +import sys +import signal + +signal.signal(signal.SIGINT, signal.SIG_IGN) + +import face_recognition +import os +import argparse +import pickle +import csv +import numpy as np +from pathlib import Path +from datetime import datetime + +date_time = datetime.now().strftime("%Y%m%d_%H%M%S") +default_log_filename = f"matcher_log_{date_time}.txt" +default_out_filename = f"result_{date_time}.csv" + +def resolve_path(path, default): + default_dirname = "output" + default_filename = default + + if not path: + resolved_path = Path(default_dirname) / default_filename + return resolved_path.resolve() + + resolved_path = Path(path).resolve() + + if resolved_path.is_dir() or path.endswith(os.sep) or path.endswith('/') or not resolved_path.suffix: + resolved_path = resolved_path / default_filename + return resolved_path.resolve() + + return resolved_path + +def load_encodings(encodings, log): + encodings_path = Path(encodings).resolve() + log_path = resolve_path(log, default_log_filename) + log_path.parent.mkdir(parents=True, exist_ok=True) + + try: + with open(encodings_path, "rb") as f: + data = pickle.load(f) + print(f"Dati caricati correttamente da '{encodings_path}'") + with open(log_path, "w", encoding="utf-8") as log_f: + log_f.write(f"--- [INFO] Encodings caricati correttamente da '{encodings_path}' ---\n") + return data["encodings"], data["filenames"] + except FileNotFoundError: + print(f"Errore: Il file '{encodings_path}' non esiste.") + with open(log_path, "w", encoding="utf-8") as log_f: + log_f.write(f"--- [ERRORE] Il file '{encodings_path}' non esiste ---\n") + sys.exit(1) + +def encode_image(image, log): + image_path = Path(image).resolve() + log_path = resolve_path(log, default_log_filename) + log_path.parent.mkdir(parents=True, exist_ok=True) + + if not image_path.is_file(): + print(f"Errore: Il file {image_path} non esiste.") + with open(log_path, "a", encoding="utf-8") as log_f: + log_f.write(f"--- [ERRORE] Il file {image_path} non esiste ---\n") + return None + + print(f"Elaborazione di: {image_path}...") + with open(log_path, "a", encoding="utf-8") as log_f: + log_f.write(f"--- [INFO] Match avviato per {image_path} ---\n") + + image = face_recognition.load_image_file(image_path) + image_encoding = face_recognition.face_encodings(image) + + if image_encoding: + return image_encoding[0] + else: + print(f"Nessun volto trovato in {image_path}") + with open(log_path, "a", encoding="utf-8") as log_f: + log_f.write(f"--- [INFO] Nessun volto trovato in {image_path} ---\n") + sys.exit(1) + +def match_faces(image_encoding, encodings, filenames, log): + log_path = resolve_path(log, default_log_filename) + log_path.parent.mkdir(parents=True, exist_ok=True) + + tolerance = 0.5 + results = [] + + if not encodings: + return results + + face_distances = face_recognition.face_distance(encodings, image_encoding) + + with open(log_path, "a", encoding="utf-8") as log_f: + log_f.write(f"\n============== [INIZIO ELABORAZIONE] ==============\n") + + current_file = None + face_index = 0 + total_matches = 0 + + try: + for i, distance in enumerate(face_distances): + filename = filenames[i] + + if filename != current_file: + current_file = filename + face_index = 1 + print() + log_f.write("\n") + else: + face_index += 1 + + is_match = distance < tolerance + status = '✓' if is_match else '' + somiglianza = max(0, 100 - (distance * 100)) + + msg = f"Volto {face_index:<2} in {filename} - [Somiglianza: {somiglianza:>5.1f}%] {status}" + print(msg) + log_f.write(f"{msg}\n") + + if is_match: + total_matches += 1 + if filename not in results: + results.append(filename) + + print("\nMatch completato con successo.") + log_f.write(f"\n============== [ELABORAZIONE COMPLETATA CON SUCCESSO] ==============\n\n") + + except KeyboardInterrupt: + print("\nInterruzione manuale dell'elaborazione, salvataggio dei dati finora elaborati...") + log_f.write("\n============== [ELABORAZIONE INTERROTTA MANUALMENTE] ==============\n") + + finally: + print(f"Trovati {total_matches} match su {len(encodings)} confronti") + print(f"Volto trovato in {len(results)} foto") + log_f.write(f"--- [INFO] Trovati {total_matches} match su {len(encodings)} confronti ---\n") + log_f.write(f"--- [INFO] Volto trovato in {len(results)} foto ---\n") + + return results + +def save_output(results, output, log): + output_path = resolve_path(output, default_out_filename) + log_path = resolve_path(log, default_log_filename) + log_path.parent.mkdir(parents=True, exist_ok=True) + try: + output_path.parent.mkdir(parents=True, exist_ok=True) + with open(output_path, "w", newline="", encoding="utf-8") as csvfile: + writer = csv.writer(csvfile) + formatted_data = [[f] for f in results] + writer.writerows(formatted_data) + print(f"Risultati salvati in {output_path.resolve()}") + with open(log_path, "a", encoding="utf-8") as log_f: + log_f.write(f"--- [INFO] Risultati salvati in {output_path.resolve()} ---\n") + except Exception as e: + print(f"Errore durante il salvataggio: {e}") + with open(log_path, "a", encoding="utf-8") as log_f: + log_f.write(f"--- [ERRORE] Errore durante il salvataggio: {e} ---\n") + +def main(): + signal.signal(signal.SIGINT, signal.default_int_handler) + + parser = argparse.ArgumentParser(description="Associa ad un volto noto un set di immagini in cui compare (tramite encodings precalcolati)") + parser.add_argument("-i", "--image", required=True, help="Percorso dell'immagine del volto da riconoscere") + parser.add_argument("-e", "--encodings", required=True, help="Percorso del file contenente gli encodings delle foto 'unknown'.") + parser.add_argument("-o", "--out", help="Percorso del file/cartella di output. Default: './output/result_[datetime].csv'") + parser.add_argument("-l", "--log", help="Percorso del file di log. Default: './output/matcher_log_[datetime].txt'") + args = parser.parse_args() + + encodings, filenames = load_encodings(args.encodings, args.log) + image_encoding = encode_image(args.image, args.log) + + if image_encoding is not None and encodings and filenames: + matches = match_faces(image_encoding, encodings, filenames, args.log) + save_output(matches, args.out, args.log) + +if __name__ == "__main__": + main() \ No newline at end of file