package it.acxent.face.dispatcher; import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import it.acxent.common.StatusMsg; import it.acxent.db.DBAdapter; import it.acxent.db.ResParm; import it.acxent.face.AppStats; import it.acxent.face.Evento; import it.acxent.face.FaceScore; import it.acxent.face.Foto; import it.acxent.face.PuntoFoto; import it.acxent.face.callable.IndexFotoFaceCallable; import it.acxent.face.callable.Scoring2PuntoFotoCallable; import it.acxent.face.scoring.SmartFaceScoringQueue; import it.acxent.log.Log; import it.acxent.util.Timer; import it.acxent.util.Vectumerator; import java.io.File; import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; public class EventoProcessingDispatcher { private static final Map activeDispatchers = new ConcurrentHashMap<>(); private static boolean debug = true; private final Evento evento; private final ScheduledExecutorService scheduler; private final AtomicBoolean isRunning = new AtomicBoolean(false); private static final String TAG_THREAD_MSG_CREAPF = "0 CREA PUNTI FOTO + INDEX"; private static final String TAG_THREAD_DISPATCHER = "DISPATCHER"; private volatile boolean threadSTOP = false; public EventoProcessingDispatcher(Evento evento) { this.evento = evento; long id = evento.getId_evento(); EventoProcessingDispatcher existing = activeDispatchers.putIfAbsent(Long.valueOf(id), this); if (existing != null) throw new IllegalStateException("Dispatcher già attivo per l'evento ID " + id); this.scheduler = Executors.newScheduledThreadPool(5); } public boolean isThreadSTOP() { return this.threadSTOP; } public void startProcessingLoop() { boolean localhost = true; if (!this.isRunning.compareAndSet(false, true)) { DBAdapter.printDebug(debug, "Dispatcher già in esecuzione per evento " + this.evento.getId_evento()); return; } this.threadSTOP = false; DBAdapter.printDebug(debug, "✅ Dispatcher avviato per evento ID " + this.evento.getId_evento()); StatusMsg.updateMsgByTag(this.evento.getApFull(), "DISPATCHER", "Dispatcher avviato per evento ID " + this.evento.getId_evento()); this.scheduler.scheduleAtFixedRate(this::safeRunCreaPuntiFotoIndex0, 0L, 15L, TimeUnit.SECONDS); if (!localhost) { this.scheduler.scheduleAtFixedRate(this::safeRunDetect1, 10L, 15L, TimeUnit.SECONDS); this.scheduler.scheduleAtFixedRate(() -> ScoringManager.submitTask(new ScoringTask(this.evento.getId_evento(), ScoringType.SCORING2, this::runScoring2)), 15L, 15L, TimeUnit.SECONDS); this.scheduler.scheduleAtFixedRate(() -> ScoringManager.submitTask(new ScoringTask(this.evento.getId_evento(), ScoringType.SCORING3, this::runScoring3)), 20L, 15L, TimeUnit.SECONDS); } } private void safeRunCreaPuntiFotoIndex0() { if (this.threadSTOP) return; if (!Evento.isThreadCreaPuntiFotoAttivo()) { DBAdapter.printDebug(debug, "🟡 [" + this.evento.getId_evento() + "] Avvio processo preparatorio (crea punti foto + indicizza)"); ResParm res = this.evento.startCreaPuntiFoto(this.evento, true, 1L); if (!res.getStatus()) DBAdapter.printDebug(debug, "⚠️ ThreadCreaPuntiFoto non avviato: " + res.getMsg()); } } @Deprecated private void safeRunCreatePointsIndexRedux0() { createNewPhotoPointsIndexRedux0(); } private void safeRunDetect1() { runFaceDetection1(); } private void safeRunScoring2() { runScoring2(); } private void runScoring3Test() { DBAdapter.printDebug(debug, "🔴 [" + this.evento.getId_evento() + "] 3 Avvio scoring global TEST..."); StatusMsg.updateMsgByTag(this.evento.getApFull(), "DISPATCHER", "3 TEST Avvio scoring global per evento ID " + this.evento.getId_evento()); DBAdapter.sleepInSecond(20); DBAdapter.printDebug(debug, "🔴 [" + this.evento.getId_evento() + "] 3 test scoring FINE: "); } private void safeRunScoring3() { runScoring3Test(); } @Deprecated private void createNewPhotoPointsIndexRedux0() { DBAdapter.printDebug(debug, "🟡 [" + this.evento.getId_evento() + "] 0 Scansione nuovi punti foto, indicizzazione immagini e riduzione..."); StatusMsg.updateMsgByTag(this.evento.getApFull(), "DISPATCHER", "0 Scansione nuovi punti foto, indicizzazione immagini e riduzione per evento ID " + this.evento.getId_evento()); boolean indicizza = true; Timer timer = new Timer(); timer.start(); ResParm rp = new ResParm(true); StatusMsg.updateMsgByTag(this.evento.getApFull(), "0 CREA PUNTI FOTO + INDEX", "Thread CREA PUNTI FOTO in esecuzione ...."); String dirBase = this.evento.getPathBaseFoto() + this.evento.getPathBaseFoto(); if (new File(dirBase).exists()) { File dir = new File(dirBase); try { processaDirectoryRicorsivamente(dir, dirBase, this.evento, indicizza, this.evento.getLastUpdId_user()); StatusMsg.updateMsgByTag(this.evento.getApFull(), "0 CREA PUNTI FOTO + INDEX", "Creazione punto foto evento " + this.evento.getDescrizione() + " ok!"); } catch (Exception e) { e.printStackTrace(); } } timer.stop(); DBAdapter.printDebug(debug, "🟡 [" + this.evento.getId_evento() + "] 0 Scansione nuovi punti foto, indicizzazione immagini e riduzione FINE. durata: " + timer.getDurataHourMin() + "\n" + rp.getMsg()); StatusMsg.updateMsgByTag(this.evento.getApFull(), "0 CREA PUNTI FOTO + INDEX", "Thread CREA PUNTI FOTO FINE: " + rp.getMsg()); try { DBAdapter.sleepInMilliSecond(4000); } catch (Exception e) {} StatusMsg.deleteMsgByTag(this.evento.getApFull(), "0 CREA PUNTI FOTO + INDEX"); StatusMsg.updateMsgByTag(this.evento.getApFull(), "DISPATCHER", "0 FINE Scansione nuovi punti foto, indicizzazione immagini e riduzione per evento ID " + this.evento.getId_evento()); } private void processaDirectoryRicorsivamente(File directory, String dirBase, Evento evento, boolean indicizza, long id_users) { if (directory == null || !directory.exists() || !directory.isDirectory()) return; File[] files = directory.listFiles(); if (files == null || files.length == 0) { creazioneEIndicizzazionePuntoFoto(directory, dirBase, evento, indicizza, id_users); return; } boolean haSubDir = false; for (File file : files) { if (file.isDirectory() && !file.getName().startsWith("_")) { haSubDir = true; break; } } if (!haSubDir) { creazioneEIndicizzazionePuntoFoto(directory, dirBase, evento, indicizza, id_users); } else { for (File file : files) { if (file.isDirectory()) processaDirectoryRicorsivamente(file, dirBase, evento, indicizza, id_users); } } } private void creazioneEIndicizzazionePuntoFoto(File directory, String dirBase, Evento evento, boolean indicizza, long id_users) { String theDir = directory.getAbsolutePath(); String path = theDir.substring(dirBase.length()); StatusMsg.updateMsgByTag(evento.getApFull(), "0 CREA PUNTI FOTO + INDEX", "Creazione punto foto " + path); PuntoFoto pf = new PuntoFoto(evento.getApFull()); pf.findByEventoPath(evento.getId_evento(), path); ResParm rp = new ResParm(true); if (pf.getDBState() == 0) { pf.setId_evento(evento.getId_evento()); pf.setPathRelativoFoto(path); pf.setDescrizione(path.replace("/", "")); rp = pf.save(); } if (rp.getStatus() && indicizza) { long totFotoIndicizzate = pf.getTotFoto(); long totJpgFile = pf.getTotJpgFiles(); if (totFotoIndicizzate < totJpgFile) { StatusMsg.updateMsgByTag(evento.getApFull(), "0 CREA PUNTI FOTO + INDEX", "Indicizzazione punto foto " + path + "...."); rp.append(pf.startIndexFoto(evento.getLastUpdId_user())); } } } private void runFaceDetection1() { DBAdapter.printDebug(debug, "🔵 [" + this.evento.getId_evento() + "] 1 Avvio face detection..."); StatusMsg.updateMsgByTag(this.evento.getApFull(), "DISPATCHER", "1 Avvio face detection per evento ID " + this.evento.getId_evento()); boolean debug2 = true; String TAG_THREAD_MSG = "1 DETECT FACE GARA " + this.evento.getDescrizioneEvento() + "(" + this.evento.getId_evento() + ")"; Timer timer = new Timer(); timer.start(); ResParm rp = new ResParm(true); StringBuffer err = new StringBuffer(); int i = 0; StringBuffer errMsg = new StringBuffer(); int maxNumberOfThread = this.evento.getParm("MAX_NUMBER_OF_THREAD_INDEXING_SCALING").getNumeroInt(); if (maxNumberOfThread <= 1) maxNumberOfThread = 10; if (this.evento.getId_evento() == 0L) { rp.setMsg("Indicizzazione foto: Bean evento non caricato"); rp.setStatus(false); DBAdapter.printDebug("Indicizzazione foto: Bean evento non caricato"); } else { try { boolean detectNumbers = (this.evento.getFlgDetectNumbers() == 1L); final ThreadLocal callerTaskCount = ThreadLocal.withInitial(() -> 0); int NUMB_OF_CORES = Runtime.getRuntime().availableProcessors(); int corePoolSize = 4; int maxPoolSize = NUMB_OF_CORES / 2; long keepAliveTime = 360L; int blockingQueueSize = maxPoolSize; ThreadPoolExecutor pool = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue(blockingQueueSize), new ThreadPoolExecutor.CallerRunsPolicy() { public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { callerTaskCount.set(Integer.valueOf(callerTaskCount.get() + 1)); super.rejectedExecution(r, e); } }) { protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); if (Thread.currentThread() == Thread.currentThread()) { int count = callerTaskCount.get(); if (count > 0) callerTaskCount.set(Integer.valueOf(count - 1)); } } }; Vectumerator> vecF = new Vectumerator(); Foto foto = new Foto(this.evento.getApFull()); Foto rowFoto = new Foto(); Vectumerator vec = foto.findByEventoFaceScoringDone(this.evento.getId_evento(), 0L); long currentFotoSenzaVisi = 0L; long totFotoSenzaVisi = (long)vec.getTotNumberOfRecords(); while (currentFotoSenzaVisi != totFotoSenzaVisi) { DBAdapter.printDebug(debug2, TAG_THREAD_MSG + " INIZIO TOTFOTOSENZA VISI= " + TAG_THREAD_MSG + " current, prima del ciclo quindi dal 2 sicurmente più grande: " + totFotoSenzaVisi); currentFotoSenzaVisi = totFotoSenzaVisi; i = 0; try { while (vec.hasMoreElements() && !this.threadSTOP) { rowFoto = (Foto)vec.nextElement(); i++; if (rowFoto.getNumOfFaces() == 0L) { rowFoto.setId_faceDetectionMethod(0L); Thread.sleep(20L); IndexFotoFaceCallable ifc = new IndexFotoFaceCallable(rowFoto, detectNumbers, false); vecF.add(pool.submit(ifc)); if (i % 20 == 0) StatusMsg.updateMsgByTag(this.evento.getApFull(), TAG_THREAD_MSG, "max thread: " + maxNumberOfThread + " - SUBMIT ...index face foto - " + rowFoto.getFileName() + " - " + foto.getPuntoFoto().getDescrizioneCompleta() + " - " + i + 1 + " su " + vec.getTotNumberOfRecords() + " method: " + rowFoto.getFaceDetectionMethodReal().getDescrizione() + " eta: " + timer.getEta((long)vec.getTotNumberOfRecords(), (long)(i + 1))); continue; } if (i % 20 == 0) StatusMsg.updateMsgByTag(this.evento.getApFull(), TAG_THREAD_MSG, "max thread: " + maxNumberOfThread + " - SKIP ...index face foto - " + rowFoto.getFileName() + " - " + foto.getPuntoFoto().getDescrizioneCompleta() + " - " + i + 1 + " su " + vec.getTotNumberOfRecords() + " method: " + rowFoto.getFaceDetectionMethodReal().getDescrizione() + " eta: " + timer.getEta((long)vec.getTotNumberOfRecords(), (long)(i + 1))); } } catch (Exception e) { DBAdapter.printDebug("ECCEZIONE! " + e.getMessage()); errMsg.append("\nDetect face fallita sul file " + rowFoto.getFileName() + "
" + e.getMessage()); } rp.setMsg("Detect face completata.
Numero foto indicizzate:" + i + "
- Errori rilevati: " + errMsg.toString()); rp.setStatus(true); for (int j = 0; j < vecF.getTotNumberOfRecords(); j++) { Future currentFuture = (Future)vecF.nextElement(); rp = currentFuture.get(); if (!rp.getStatus()) { errMsg.append(rp.getErrMsg() + "\n"); } else { rp.append(rowFoto.updateCurrentFlgFaceScoringDone(1L)); } DBAdapter.printDebug(debug2, currentFuture.get().getMsg() + " ended"); if (i % 20 == 0) StatusMsg.updateMsgByTag(this.evento.getApFull(), TAG_THREAD_MSG, "max thread: " + maxNumberOfThread + " - face detected on foto " + j + " su " + vecF.getTotNumberOfRecords() + " eta: " + timer.getEta((long)vecF.getTotNumberOfRecords(), (long)(j + 1))); } vec = foto.findByEventoFaceScoringDone(this.evento.getId_evento(), 0L); totFotoSenzaVisi = (long)vec.getTotNumberOfRecords(); StatusMsg.deleteMsgByTag(this.evento.getApFull(), TAG_THREAD_MSG); DBAdapter.printDebug(debug2, TAG_THREAD_MSG + "FINE CICLO.. CONTROLLO FOTO SENZA VISI: TOTFOTOSENZA VISI= " + TAG_THREAD_MSG + " current,: " + totFotoSenzaVisi); } DBAdapter.printDebug(debug2, TAG_THREAD_MSG + "USCITO CICLO SENZA VISI.. : TOTFOTOSENZA VISI= " + TAG_THREAD_MSG + " current,: " + totFotoSenzaVisi); pool.shutdown(); } catch (Exception e) { err.append("ERRORE!! " + e.getMessage()); err.append("\n"); DBAdapter.printDebug(true, TAG_THREAD_MSG + "ERRORE!!!!! exc " + TAG_THREAD_MSG); e.printStackTrace(); } } timer.stop(); DBAdapter.printDebug(debug2, TAG_THREAD_MSG + " DURATA: " + TAG_THREAD_MSG + " " + timer.getDurataHourMin() + "\nerr: " + rp.getMsg()); StatusMsg.updateMsgByTag(this.evento.getApFull(), TAG_THREAD_MSG, "DETECT face foto concluso: DURATA: " + timer.getDurata() + "\nRisultato: " + rp.getMsg()); Log.addAltro(this.evento.getApFull(), "127.0.0.1", 1L, " runFaceDetection1: FINE!!! :\n" + timer.getDurata() + "\nRisultato: " + rp.getMsg() + "\nerr: " + errMsg.toString()); try { DBAdapter.sleepInMilliSecond(1000); } catch (Exception e) { e.printStackTrace(); } StatusMsg.deleteMsgByTag(this.evento.getApFull(), TAG_THREAD_MSG); DBAdapter.printDebug(debug, "🔵 [" + this.evento.getId_evento() + "] 1 FINE face detection"); StatusMsg.updateMsgByTag(this.evento.getApFull(), "DISPATCHER", "1 FINE face detection per evento ID " + this.evento.getId_evento()); } private void runScoring2() { DBAdapter.printDebug(debug, "🟠 [" + this.evento.getId_evento() + "] 2 Avvio scoring near..."); StatusMsg.updateMsgByTag(this.evento.getApFull(), "DISPATCHER", "2 Avvio scoring near per evento ID " + this.evento.getId_evento()); boolean debug2 = true; String TAG_THREAD_MSG = "2 SCORING Ev.: " + this.evento.getId_evento(); Timer timer = new Timer(); timer.start(); ResParm rp = new ResParm(true); long fotoCount = 0L; long faceCount = 0L; long faceTrovataCount = 0L; StringBuffer errMsg = new StringBuffer(); String targetDir = this.evento.getParm("PATHFOTO_FACE").getTesto() + this.evento.getParm("PATHFOTO_FACE").getTesto(); File targetDirFile = new File(targetDir); if (!targetDirFile.exists()) targetDirFile.mkdirs(); FaceScore fs = new FaceScore(this.evento.getApFull()); Vectumerator vecPf = this.evento.getPuntiFoto(0, 0); DBAdapter.printDebug(debug2, TAG_THREAD_MSG + " inizio ... vecpf.size: " + TAG_THREAD_MSG); int maxNumberOfThreadPf = this.evento.getParm("MAX_NUMBER_OF_THREAD_PF_SCORING_LVL_1").getNumeroInt(); if (maxNumberOfThreadPf < 1) maxNumberOfThreadPf = vecPf.getTotNumberOfRecords(); ExecutorService pool = Executors.newFixedThreadPool(maxNumberOfThreadPf); Vectumerator> vecF = new Vectumerator(); Foto foto = new Foto(this.evento.getApFull()); while (vecPf.hasMoreElements() && !this.threadSTOP) { PuntoFoto puntofoto = (PuntoFoto)vecPf.nextElement(); DBAdapter.printDebug(debug2, TAG_THREAD_MSG + " current pf: " + TAG_THREAD_MSG + " " + puntofoto.getId_puntoFoto() + " ....."); long numFotoPfDaElaborare = foto.getNumFotoFaceByPuntoFotoFaceScoringDone(puntofoto.getId_puntoFoto(), 1L); DBAdapter.printDebug(debug2, TAG_THREAD_MSG + " current pf: " + TAG_THREAD_MSG + " " + puntofoto.getId_puntoFoto() + "===> fotoface da elaborare: " + puntofoto.getDescrizione()); if (numFotoPfDaElaborare <= 0L) continue; DBAdapter.printDebug(debug2, TAG_THREAD_MSG + " PF:" + TAG_THREAD_MSG + " evento id: " + puntofoto.getDescrizione() + " fotoface da elaborare: " + this.evento.getId_evento()); fotoCount = 0L; Scoring2PuntoFotoCallable ifc = new Scoring2PuntoFotoCallable(this, puntofoto, TAG_THREAD_MSG + " pf: " + TAG_THREAD_MSG); DBAdapter.printDebug(debug2, "Submit pf " + puntofoto.getId_puntoFoto()); vecF.add(pool.submit(ifc)); puntofoto.setFlgStatoScoring(11L); puntofoto.save(); } try { for (int j = 0; j < vecF.getTotNumberOfRecords(); j++) { Future currentFuture = (Future)vecF.nextElement(); rp = currentFuture.get(); DBAdapter.printDebug(debug2, "Future " + j + "/" + vecF.getTotNumberOfRecords() + ": rp: " + rp.getMsg()); if (!rp.getStatus()) errMsg.append(rp.getErrMsg() + "\n"); DBAdapter.printDebug(currentFuture.get().getMsg() + " ended"); StatusMsg.updateMsgByTag(this.evento.getApFull(), TAG_THREAD_MSG, "max thread: " + maxNumberOfThreadPf + " reading future..... " + j + 1 + "/" + vecF.getTotNumberOfRecords() + 1); PuntoFoto pfFinito = (PuntoFoto)currentFuture.get().getReturnObj(); if (!this.threadSTOP) { pfFinito.setFlgStatoScoring(1L); } else { pfFinito.setFlgStatoScoring(0L); } pfFinito.save(); } } catch (Exception e) { e.printStackTrace(); rp.setStatus(false); rp.setMsg("Eccezione vecfuture: " + e.getMessage()); } if (rp.getStatus()) { faceTrovataCount = fs.getTotoFaceScoreByEvento(this.evento.getId_evento()); if (this.threadSTOP) { rp.setMsg("Evento Scoring STOP MANUALE!.
Numero foto:" + fotoCount + " facce: " + faceCount + " facce trovate: " + faceTrovataCount + "
- Errori rilevati: " + errMsg.toString()); } else { rp.setMsg("Evento Scoring completato.
Numero foto: " + fotoCount + " facce: " + faceCount + " facce trovate: " + faceTrovataCount + "
- Errori rilevati: " + errMsg.toString()); } rp.setStatus(true); } pool.shutdown(); timer.stop(); DBAdapter.printDebug(true, TAG_THREAD_MSG + " DURATA: " + TAG_THREAD_MSG + " " + timer.getDurata()); StatusMsg.updateMsgByTag(this.evento.getApFull(), TAG_THREAD_MSG, "Concluso fase 1: DURATA: " + timer.getDurata() + "\nRisultato: " + rp.getMsg()); try { DBAdapter.sleepInMilliSecond(4000); } catch (Exception e) {} StatusMsg.deleteMsgByTag(this.evento.getApFull(), TAG_THREAD_MSG); Log.addAltro(this.evento.getApFull(), "127.0.0.1", 1L, " fase1ScoringPuntifoto: FINE!!! :\n" + timer.getDurata() + "\nRisultato: " + rp.getMsg()); DBAdapter.printDebug(debug, "🟠 [" + this.evento.getId_evento() + "] 2 FINE scoring: " + rp.getMsg()); StatusMsg.updateMsgByTag(this.evento.getApFull(), "DISPATCHER", "2 FINE scoring near per evento ID " + this.evento.getId_evento() + " " + rp.getMsg()); } private void runScoring3() { ResParm rp = new ResParm(true); try { DBAdapter.printDebug(debug, "🔴 [" + this.evento.getId_evento() + "] 3 Avvio scoring global..."); StatusMsg.updateMsgByTag(this.evento.getApFull(), "DISPATCHER", "3 Avvio scoring global per evento ID " + this.evento.getId_evento()); boolean debug2 = false; boolean debug3 = true; DBAdapter.printDebug(debug2, "# 1 # START"); String TAG_THREAD_MSG = "3 SCORING Ev.: " + this.evento.getId_evento(); SmartFaceScoringQueue smartFaceScoringQueue = SmartFaceScoringQueue.getInstance(this.evento.getApFull()); Foto foto = new Foto(this.evento.getApFull()); long fotoQuery = foto.getNumFotoFaceByEventoFaceScoringDone(this.evento.getId_evento(), 2L); DBAdapter.printDebug(debug2, "fotoQuery: " + fotoQuery); if (fotoQuery <= 0L) { rp.setStatus(true); rp.setMsg("Non ci sono face query. Niente da fare!"); DBAdapter.printDebug(debug2, "FACCIO NIENTE"); } else { DBAdapter.printDebug(debug2, "VADO AVANTI"); StringBuffer errMsg = new StringBuffer(); int priority = 1; int STEP_STATUS_MSG = 100; Timer timer = new Timer(); Timer timerTarget = new Timer(); long currentTargetTime = 0L; double currentTargetVelox = 0.0D, actualTargetVelox = 0.0D; String targetDir = this.evento.getParm("PATHFOTO_FACE").getTesto() + this.evento.getParm("PATHFOTO_FACE").getTesto(); File targetDirFile = new File(targetDir); if (!targetDirFile.exists()) targetDirFile.mkdirs(); double confDetectLevel = this.evento.getDetectFaceConfidentThresold(); long dysType = this.evento.getParm("ZOO_YUNET_SCORING_DYS_TYPE").getNumeroLong(); int targetUniti = 0; int targetProcessati = 0; FaceScore fs = new FaceScore(this.evento.getApFull()); StatusMsg.updateMsgByTag(this.evento.getApFull(), TAG_THREAD_MSG, "1. QUERY TARGET X EVENTO......"); DBAdapter.printDebug(debug2, "1. QUERY TARGET X EVENTO......"); Vectumerator vecFsTarget = fs.findTargetByEvento(this.evento.getId_evento(), 2L); DBAdapter.printDebug(debug2, "1.1 vecFsTarget.getTotNumberOfRecords(): " + vecFsTarget.getTotNumberOfRecords() + " threadSTOP: " + this.threadSTOP); timer.start(); int i = 0; JsonArray jsonQueryArray = new JsonArray(); JsonArray jsonTargetDoneArray = new JsonArray(); JsonObject jsonData = new JsonObject(); jsonData.addProperty("dis_type", Long.valueOf(dysType)); jsonData.addProperty("conf_threshold", Double.valueOf(confDetectLevel)); FaceScore currentFs = null; JsonObject jsonTarget = null; String eta = "...."; String puntoFotoDesc = ""; long l_id_fotoTarget = 0L; while (vecFsTarget.hasMoreElements() && !this.threadSTOP && !Thread.currentThread().isInterrupted()) { currentFs = (FaceScore)vecFsTarget.nextElement(); puntoFotoDesc = currentFs.getFotoFace2().getFoto().getPuntoFoto().getDescrizione(); DBAdapter.printDebug(false, "2. creazione query globale: current punto foto:" + puntoFotoDesc + " (" + i + "/" + vecFsTarget.getTotNumberOfRecords() + ") currentFs.getId_fotoFace(): " + currentFs.getId_fotoFace() + " scoring_done_val: " + currentFs.getFotoFace().getFoto().getFlgFaceScoringDone()); if (i % STEP_STATUS_MSG == 0) StatusMsg.updateMsgByTag(this.evento.getApFull(), TAG_THREAD_MSG, "2. creazione query globale: current punto foto:" + puntoFotoDesc + " (" + i + "/" + vecFsTarget.getTotNumberOfRecords() + ")"); JsonObject jsonQueryRow = new JsonObject(); jsonQueryRow.addProperty("id", Long.valueOf(currentFs.getId_fotoFace())); jsonQueryRow.addProperty("id_foto", Long.valueOf(currentFs.getFotoFace().getId_foto())); jsonQueryRow.addProperty("path", currentFs.getFotoFace().getFacePath()); jsonQueryRow.addProperty("md5", currentFs.getFotoFace().getMd5()); jsonQueryRow.addProperty("type", "face"); i++; if (currentFs.getFotoFace().getFoto().getFlgFaceScoringDone() == 3L) { jsonTargetDoneArray.add((JsonElement)jsonQueryRow); DBAdapter.printDebug(debug2, "ciclo creo query e targetDone. aggiunto targetDone"); } if (currentFs.getFotoFace().getFoto().getFlgFaceScoringDone() == 2L) { jsonQueryArray.add((JsonElement)jsonQueryRow); DBAdapter.printDebug(debug2, "ciclo creo query e targetDone. aggiunto jsonQueryArray"); } } DBAdapter.printDebug(debug2, "fine while"); JsonArray jsonTargetArray = new JsonArray(); for (JsonElement elem : (Iterable)jsonTargetDoneArray) jsonTargetArray.add(elem); for (JsonElement elem : (Iterable)jsonQueryArray) jsonTargetArray.add(elem); DBAdapter.printDebug(debug2, "jsonTargetDoneArray: " + jsonTargetDoneArray.size()); DBAdapter.printDebug(debug2, "jsonQueryArray: " + jsonQueryArray.size()); DBAdapter.printDebug(debug2, "jsonTargetArray: " + jsonTargetArray.size()); long totConfrontiFatti = 0L; long totConfrontiTarget = 0L; long totMd5Toremove = 0L; double mediaMd5ToRemove = 0.0D; i = 0; long l_id_curretFoto = 0L; while (jsonTargetArray.size() > 0 && !this.threadSTOP) { timerTarget.start(); i++; totConfrontiTarget = (long)jsonTargetArray.size(); totConfrontiFatti += (long)jsonTargetArray.size(); StatusMsg.updateMsgByTag(this.evento.getApFull(), TAG_THREAD_MSG, "3 MAIN. actualTargetVelox: " + this.evento.getNf().format(actualTargetVelox) + " targ./sec currentTargetTime: " + currentTargetTime + " sec. currentTargetVelox: " + this.evento.getNf().format(currentTargetVelox) + " conf/sec punto foto:" + puntoFotoDesc + " (" + i + "/" + jsonTargetArray.size() + ") target uniti: " + targetUniti + " mediaMd5ToRemove: " + mediaMd5ToRemove + " ..... ETA: " + eta + "\n" + smartFaceScoringQueue.getStatus()); AppStats.setScoring7CurrentTargetVelox(currentTargetVelox); AppStats.setScoring7ETA(eta); targetProcessati++; jsonTarget = jsonTargetArray.get(0).getAsJsonObject(); jsonData.add("target", (JsonElement)jsonTarget); l_id_fotoTarget = jsonTarget.get("id_foto").getAsLong(); long l_id_fotoFacequery1 = jsonQueryArray.get(0).getAsJsonObject().get("id").getAsLong(); if (l_id_fotoFacequery1 == jsonTarget.get("id").getAsLong()) { jsonQueryArray.remove(0); DBAdapter.printDebug(debug2, "# 2 # tolto il target da jsonQueryArray perchè stesso id_fotoFace di target (corretto se non ci sono global2 done) l_id_fotoFacequery1: " + l_id_fotoFacequery1); } DBAdapter.printDebug(debug2, "# 2.1 # scoring target: " + currentFs.getId_fotoFace() + " jsonTargetArray.size(): " + jsonTargetArray.size()); JsonArray jsonQueryCopy = jsonQueryArray.deepCopy(); JsonObject jsonTargetCopy = jsonTarget.deepCopy(); DBAdapter.printDebug(debug2, "# 3 # inizio scoring - " + smartFaceScoringQueue.getStatus()); Vectumerator> vecF = smartFaceScoringQueue.faceScoring(this.evento, jsonQueryCopy, jsonTargetCopy, confDetectLevel, priority); DBAdapter.printDebug(debug2, "# 4 # future arrivato"); DBAdapter.printDebug(debug2, "# 5 # vecF.getTotNumberOfRecords(): " + vecF.getTotNumberOfRecords()); DBAdapter.printDebug(debug2, "# 6 # FUTURE start vecF.getTotNumberOfRecords():" + vecF.getTotNumberOfRecords()); Set md5ToRemove = new HashSet<>(); for (int fidx = 0; fidx < vecF.getTotNumberOfRecords(); fidx++) { Future currentFuture = (Future)vecF.nextElement(); if (currentFuture == null) { rp.setStatus(false); rp.setMsg("Errore: Future null per indice " + fidx); DBAdapter.printDebug(debug2, "# 7 # null Errore: Future null per indice " + fidx); } else { try { rp = currentFuture.get(45L, TimeUnit.SECONDS); } catch (TimeoutException e) { DBAdapter.printDebug(debug2, "# 8 # Errore Timeout su future index " + fidx + ", ignorando."); } catch (InterruptedException e) { DBAdapter.printDebug(debug2, "# 9 # Errore Interruzione rilevata, terminazione del ciclo."); Thread.currentThread().interrupt(); } catch (ExecutionException e) { DBAdapter.printDebug(debug2, "# 10 # Errore durante l'esecuzione del task " + fidx + ": " + e.getMessage()); e.printStackTrace(); } catch (Exception e) { DBAdapter.printDebug(debug2, "# 11 # Errore generico su future index " + fidx + ": " + e.getMessage()); e.printStackTrace(); } if (!rp.getStatus()) { DBAdapter.printDebug((debug || debug2), "# 12 # errore future: " + rp.getMsg()); errMsg.append(rp.getErrMsg() + "\n"); DBAdapter.printDebug(debug2, "# 13 # errore future: " + rp.getMsg()); StatusMsg.updateMsgByTag(this.evento.getApFull(), TAG_THREAD_MSG, "4 FUTURE. currentTargetTime: " + currentTargetTime + " sec. punto foto:" + puntoFotoDesc + " (" + i + "/" + jsonTargetArray.size() + ") target uniti: " + targetUniti + " mediaMd5ToRemove: " + mediaMd5ToRemove + " future: " + fidx + "/" + vecF.getTotNumberOfRecords() + " - " + eta + "\n" + smartFaceScoringQueue.getStatus()); AppStats.setScoring7CurrentTargetVelox(currentTargetVelox); AppStats.setScoring7ETA(eta); } else { HashSet hsFs = (HashSet)rp.getReturnObj(); if (hsFs != null) { if (debug || debug2) DBAdapter.printDebug(debug2, "# 14 # FS UNITI.... LI TOLGO DA HASHMAP se LSH SIZE>0 ===> CURRENT LSH SIZE: " + hsFs.size()); int hsIdx = 0; for (FaceScore fsTargetSul2 : hsFs) { StatusMsg.updateMsgByTag(this.evento.getApFull(), TAG_THREAD_MSG, "5 CLEAN MD5. actualTargetVelox: " + this.evento.getNf().format(actualTargetVelox) + " targ./sec currentTargetTime: " + currentTargetTime + " sec. currentTargetVelox: " + this.evento.getNf().format(currentTargetVelox) + " conf/sec punto foto:" + puntoFotoDesc + " (" + i + "/" + jsonTargetArray.size() + ") target uniti: " + targetUniti + "/" + targetProcessati + " mediaMd5ToRemove: " + mediaMd5ToRemove + " future: " + fidx + "/" + vecF.getTotNumberOfRecords() + " unisci target " + hsIdx + "/" + hsFs.size() + " - " + eta + "\n" + smartFaceScoringQueue.getStatus()); rp = fs.unisciTarget(jsonTarget.get("id").getAsLong(), fsTargetSul2.getId_fotoFace2()); targetUniti++; md5ToRemove.add(fsTargetSul2.getFotoFace2().getMd5()); hsIdx++; } } } } } DBAdapter.printDebug(debug2, "# 15 # . ciclo future finito faccio shutdown "); if (md5ToRemove.size() > 0) { JsonArray newJsonQuery = new JsonArray(); DBAdapter.printDebug(debug2, "# 16 # MD5 DA RIMUOVERE:" + md5ToRemove.size() + " ...... : creo newJsonQuery e lo copio su jsonQuery "); for (int j = jsonQueryArray.size() - 1; j >= 0; j--) { JsonObject item = jsonQueryArray.get(j).getAsJsonObject(); String currentMd5 = item.get("md5").getAsString(); if (!md5ToRemove.contains(currentMd5)) { newJsonQuery.add((JsonElement)item); DBAdapter.printDebug(false, "# 19 # newJsonQuery.add jsonQuery size:" + jsonQueryArray.size() + " newJsonQuery size : " + newJsonQuery.size()); } else { DBAdapter.printDebug(debug2, "# 20 # rimosso : " + currentMd5); } } jsonQueryArray = newJsonQuery.deepCopy(); } else { DBAdapter.printDebug(debug2, "# 21 # NIENTE DA RIMUOVERE estraggo target e tolgo primo elemento a query "); } if (l_id_curretFoto == 0L) { l_id_curretFoto = l_id_fotoTarget; } else if (l_id_curretFoto != l_id_fotoTarget) { foto.findByPrimaryKey(l_id_curretFoto); if (foto.getFlgFaceScoringDone() != 3L) { foto.updateCurrentFlgFaceScoringDone(3L); DBAdapter.printDebug(true, "# XXXX # updateCurrentFlgFaceScoringDone FACE_SCORING_DONE_GLOBAL_3 FOTO " + foto.getFileName() + " id: " + foto.getId_foto()); } else { DBAdapter.printDebug(true, "# YYYY # updateCurrentFlgFaceScoringDone NON AGGIORNATO PERCHE' IL SUO STATO E' : " + foto.getFaceScoringDone() + " FOTO " + foto.getFileName() + " id: " + foto.getId_foto()); } l_id_curretFoto = l_id_fotoTarget; } jsonTargetArray = new JsonArray(); if (jsonTargetDoneArray.size() > 0) for (int j = i; j < jsonTargetDoneArray.size(); j++) jsonTargetArray.add(jsonTargetDoneArray.get(j)); for (JsonElement elem : (Iterable)jsonQueryArray) jsonTargetArray.add(elem); totMd5Toremove += (long)md5ToRemove.size(); mediaMd5ToRemove = (double)(totMd5Toremove / (long)i); DBAdapter.printDebug(debug2, "# 22 # fine ciclo rimozione md5! totMd5Toremove: " + totMd5Toremove + " cicli i: " + i + " mediaMd5ToRemove: " + mediaMd5ToRemove); DBAdapter.printDebug(debug2, "# 23 # nuova query ripulita dei target trovati. jsonQueryArray.size: " + jsonQueryArray.size()); DBAdapter.printDebug(debug2, "# 23.1 # nuova TARGET ripulita dei target trovati. jsonTargetArray.size: " + jsonTargetArray.size()); DBAdapter.printDebug(debug2, "# 24 # FUTURE STOP. faccio shutdown "); smartFaceScoringQueue.poolShutdown(); DBAdapter.printDebug(debug2, "# 25 #. shutdown concluso "); DBAdapter.printDebug(debug2, "# 26 # tot confronti fatti:" + totConfrontiFatti); timerTarget.stop(); if (currentTargetTime == 0L) { currentTargetTime = timerTarget.getDurataSecLong(); currentTargetVelox = (double)totConfrontiTarget / (double)timerTarget.getDurataMilliSec() * 1000.0D; } long numeroQueryRimanenti = (long)jsonQueryArray.size(); long stimaConfrontiDaFare = Evento.stimaConfrontiMergeAggressivo(numeroQueryRimanenti, (double)numeroQueryRimanenti); eta = timer.getEta(stimaConfrontiDaFare, totConfrontiFatti); DBAdapter.printDebug(debug3, "# ETA # numeroQueryRimanenti: " + numeroQueryRimanenti + " numeroQueryRimanenti: " + numeroQueryRimanenti + " stimaConfrontiDaFare: " + stimaConfrontiDaFare + "currentTargetVelox: " + currentTargetVelox + " actualTargetVelox: " + actualTargetVelox + " currentTargetTime: " + currentTargetTime); actualTargetVelox = (double)totConfrontiFatti / (double)timer.getDurataMilliSec() * 1000.0D; DBAdapter.printDebug(debug2, "# 27 # FUTURE STOP currentTargetVelox: " + currentTargetVelox + " actualTargetVelox: " + actualTargetVelox + " currentTargetTime: " + currentTargetTime); } if (l_id_curretFoto != 0L) { foto.findByPrimaryKey(l_id_curretFoto); if (foto.getFlgFaceScoringDone() != 3L) { foto.updateCurrentFlgFaceScoringDone(3L); DBAdapter.printDebug(true, "# XXXX L # updateCurrentFlgFaceScoringDone FACE_SCORING_DONE_GLOBAL_3 FOTO " + foto.getFileName() + " id: " + foto.getId_foto()); } else { DBAdapter.printDebug(true, "# YYYY L # updateCurrentFlgFaceScoringDone NON AGGIORNATO PERCHE' IL SUO STATO E' : " + foto.getFaceScoringDone() + " FOTO " + foto.getFileName() + " id: " + foto.getId_foto()); } l_id_curretFoto = l_id_fotoTarget; } timer.stop(); DBAdapter.printDebug(debug2, "# 28 # FINE!!!: " + timer.getDurata()); StatusMsg.deleteMsgByTag(this.evento.getApFull(), TAG_THREAD_MSG); Log.addAltro(this.evento.getApFull(), "127.0.0.1", 1L, " fase7ScoringTargetSuTargetPerEvento: FINE!!! Target uniti/processati:" + targetUniti + "/" + targetProcessati + " mediaMd5ToRemove: " + mediaMd5ToRemove + " currentTargetVelox: " + this.evento.getNf().format(currentTargetVelox) + " conf/sec currentTargetTime: " + currentTargetTime + " sec.\n" + timer.getDurata() + "\nRisultato: " + rp.getMsg()); } } catch (Exception e) { DBAdapter.printDebug("# 99 # ECCEZIONE SCORING 3!!!!! " + e.getMessage()); e.printStackTrace(); rp.setStatus(false); rp.setException(e); } DBAdapter.printDebug(debug, "🔴 [" + this.evento.getId_evento() + "] 3 scoring FINE: " + rp.getMsg()); StatusMsg.updateMsgByTag(this.evento.getApFull(), "DISPATCHER", "3 FINE scoring global per evento ID " + this.evento.getId_evento() + " " + rp.getMsg()); } public static void shutdown(Evento evento) { EventoProcessingDispatcher dispatcher = activeDispatchers.remove(Long.valueOf(evento.getId_evento())); if (dispatcher != null) dispatcher.shutdown(); } private void shutdown() { if (this.isRunning.compareAndSet(true, false)) { this.threadSTOP = true; this.scheduler.shutdown(); activeDispatchers.remove(Long.valueOf(this.evento.getId_evento())); DBAdapter.printDebug(debug, "🛑 Dispatcher arrestato per evento ID " + this.evento.getId_evento()); StatusMsg.deleteMsgByTag(this.evento.getApFull(), "DISPATCHER"); } } public static boolean isRunningForEvento(long idEvento) { return activeDispatchers.containsKey(Long.valueOf(idEvento)); } public static void shutdownAll() { for (EventoProcessingDispatcher d : activeDispatchers.values()) d.shutdown(); } public static void main(String[] args) { Evento ev = new Evento(); EventoProcessingDispatcher d = new EventoProcessingDispatcher(ev); d.startProcessingLoop(); Executors.newSingleThreadScheduledExecutor().schedule(() -> ev.setFlgProcessing(0L), 30L, TimeUnit.SECONDS); } public Evento getEvento() { return this.evento; } }