import { constants as fsConstants } from 'node:fs';
import fs from 'node:fs/promises';
import path from 'node:path';
import { Worker } from 'bullmq';
import { config } from './config.js';
import {
  createRedisConnection,
  getSearchRecord,
  markSearchCompleted,
  markSearchFailed,
  markSearchProcessing,
  releaseActiveSearchLock,
  storeResultRecord,
} from '../../backend/src/redis-store.js';
import { parseMatcherCsv, resolvePklPath, runFaceMatcher } from './worker-utils.js';

// Single Redis connection shared by the store helpers and the BullMQ worker.
const connection = createRedisConnection(config.redisUrl);

/**
 * Verifies that the configured matcher binary exists and is executable.
 *
 * Prints operator-facing diagnostics to stderr and rethrows the underlying
 * `fs.access` error when the check fails, so startup aborts.
 *
 * @returns {Promise<void>}
 * @throws {Error} The `fs.access` error (e.g. `ENOENT`, `EACCES`).
 */
async function ensureMatcherBinaryAvailable() {
  try {
    await fs.access(config.matcherBinary, fsConstants.X_OK);
  } catch (error) {
    console.error(`FaceAI processor cannot start because the matcher binary is unavailable: ${config.matcherBinary}`);
    console.error('Ensure FACEAI_MATCHER_BINARY points to a real executable and that the processor image includes the Unix face_matcher binary.');
    if (error?.code === 'ENOENT') {
      console.error('The configured matcher path does not exist inside the processor runtime.');
    } else if (error?.code === 'EACCES') {
      console.error('The configured matcher path exists but is not executable by the processor runtime.');
    }
    throw error;
  }
}

console.log(`FaceAI processor configured matcher binary: ${config.matcherBinary}`);

/**
 * Builds one log line: `[ISO timestamp] message`, followed by the
 * JSON-serialized details when provided, terminated by a newline.
 *
 * @param {string} message - Human-readable log message.
 * @param {unknown} [details] - Optional structured payload; omitted when `undefined`.
 * @returns {string} The formatted line including the trailing newline.
 */
function formatLogLine(message, details) {
  const timestamp = new Date().toISOString();
  if (details === undefined) {
    return `[${timestamp}] ${message}\n`;
  }
  return `[${timestamp}] ${message} ${JSON.stringify(details)}\n`;
}

/**
 * Appends a formatted line to a log file, creating the parent directory
 * first if it does not exist.
 *
 * @param {string} logPath - Path of the log file to append to.
 * @param {string} message - Log message.
 * @param {unknown} [details] - Optional structured payload.
 * @returns {Promise<void>}
 */
async function appendSearchLog(logPath, message, details) {
  await fs.mkdir(path.dirname(logPath), { recursive: true });
  await fs.appendFile(logPath, formatLogLine(message, details), 'utf8');
}

/**
 * Determines the completion code for a search that finished without error.
 *
 * @param {string} logPath - Path of the matcher log file.
 * @param {number} matchCount - Number of matches parsed from the matcher output.
 * @returns {Promise<string|null>} `null` when there is at least one match,
 *   otherwise `'NO_FACES_FOUND'`.
 */
async function resolveCompletionCode(logPath, matchCount) {
  if (matchCount > 0) {
    return null;
  }

  // A missing or unreadable matcher log is treated as an empty log.
  const matcherLog = await fs.readFile(logPath, 'utf8').catch(() => '');
  if (/nessun\s+volt|no\s+faces?|no\s+face|0\s+faces?/i.test(matcherLog)) {
    return 'NO_FACES_FOUND';
  }

  // NOTE(review): this fallback returns the same code as the branch above, so
  // the log inspection currently cannot change the result. Presumably a
  // distinct "no matches" code was intended here — confirm which codes the
  // consumers of `completionCode` understand before changing it.
  return 'NO_FACES_FOUND';
}

/**
 * Persists the result record, logs completion, marks the search completed
 * and releases the user's active-search lock, in that order.
 *
 * @param {object} search - Search record as returned by `getSearchRecord`.
 * @param {string} searchId - Identifier of the search.
 * @param {string} searchLogPath - Path of the per-search worker log.
 * @param {number} matchCount - Number of matches being stored.
 * @param {Array} matches - Matches to store in the result record.
 * @param {string|null} completionCode - Completion code forwarded to the store.
 * @returns {Promise<void>}
 */
async function completeSearch(search, searchId, searchLogPath, matchCount, matches, completionCode) {
  const result = await storeResultRecord(
    connection,
    {
      raceId: search.raceId,
      raceName: search.raceName,
      userId: search.userId,
      returnUrl: search.returnUrl,
      lang: search.lang,
      matches,
    },
    config.resultTtlSeconds,
  );
  await appendSearchLog(searchLogPath, 'Completed FaceAI search', { resultId: result.id, matchCount, completionCode });
  await markSearchCompleted(connection, searchId, result.id, matchCount, config.searchTtlSeconds, { completionCode });
  await releaseActiveSearchLock(connection, search.userId, searchId);
}

/**
 * Records a failed search: appends to the worker log, marks the search
 * failed and releases the user's active-search lock.
 *
 * Every step is best-effort. A failure in one step (for example an unwritable
 * log directory) is reported on stderr and must not prevent the later steps,
 * otherwise the active-search lock would never be released.
 *
 * @param {object} search - Search record as returned by `getSearchRecord`.
 * @param {string} searchId - Identifier of the search.
 * @param {string} searchLogPath - Path of the per-search worker log.
 * @param {Error} error - The error that caused the search to fail.
 * @returns {Promise<void>} Never rejects.
 */
async function recordSearchFailure(search, searchId, searchLogPath, error) {
  const cleanupSteps = [
    [
      'append the failure log entry',
      () => appendSearchLog(searchLogPath, 'FaceAI search failed', { message: error?.message, stack: error?.stack || null }),
    ],
    [
      'mark the search as failed',
      () => markSearchFailed(connection, searchId, 'PROCESSOR_ERROR', error?.message, config.searchTtlSeconds),
    ],
    [
      'release the active search lock',
      () => releaseActiveSearchLock(connection, search.userId, searchId),
    ],
  ];

  // Run sequentially to keep the original ordering of side effects.
  for (const [label, step] of cleanupSteps) {
    try {
      await step();
    } catch (cleanupError) {
      console.error(`FaceAI search ${searchId}: could not ${label}:`, cleanupError);
    }
  }
}

/**
 * BullMQ job processor for a single FaceAI search.
 *
 * Loads the search record, marks it as processing, runs the matcher against
 * the resolved PKL file and stores the parsed matches. On any failure after
 * the record was loaded, the search is marked failed, the active-search lock
 * is released and the original error is rethrown so the job fails.
 *
 * @param {{ data: { searchId?: string } }} job - Queue job carrying the search id.
 * @returns {Promise<void>}
 * @throws {Error} When the search record is missing or processing fails.
 */
async function processJob(job) {
  const searchId = String(job.data.searchId || '');
  const search = await getSearchRecord(connection, searchId);
  if (!search) {
    throw new Error(`Search ${searchId} not found`);
  }

  const searchDir = path.join(config.runtimeRoot, 'searches', searchId);
  const searchLogDir = path.join(config.logRoot, 'searches', searchId);
  const searchLogPath = path.join(searchLogDir, 'worker.log');

  try {
    // Setup runs inside the guarded region so that a failure here still marks
    // the search failed and releases the lock.
    await markSearchProcessing(connection, searchId, config.searchTtlSeconds);
    await fs.mkdir(searchDir, { recursive: true });
    await fs.mkdir(searchLogDir, { recursive: true });
    await appendSearchLog(searchLogPath, 'Starting FaceAI search', {
      searchId,
      raceId: search.raceId,
      userId: search.userId,
      selfiePath: search.selfiePath,
      runtimeRoot: config.runtimeRoot,
      logRoot: config.logRoot,
      queueName: config.queueName,
    });

    const pklPath = await resolvePklPath({
      raceId: search.raceId,
      raceStorage: search.raceStorage,
      pklRoot: config.pklRoot,
    });
    await appendSearchLog(searchLogPath, 'Resolved PKL path', { pklPath, raceStorage: search.raceStorage });

    const csvPath = path.join(searchDir, 'result.csv');
    const logPath = path.join(searchLogDir, 'matcher.log');
    await appendSearchLog(searchLogPath, 'Running matcher', {
      matcherBinary: config.matcherBinary,
      csvPath,
      matcherLogPath: logPath,
      timeoutMs: config.workerTimeoutMs,
    });

    try {
      await runFaceMatcher({
        matcherBinary: config.matcherBinary,
        selfiePath: search.selfiePath,
        pklPath,
        csvPath,
        logPath,
        timeoutMs: config.workerTimeoutMs,
      });
    } catch (error) {
      // Exit code 1 is handled as "no detectable faces": the search completes
      // with zero matches instead of failing.
      if (error?.message === 'face_matcher exited with code 1') {
        await appendSearchLog(searchLogPath, 'Matcher reported no detectable faces', {
          matcherLogPath: logPath,
          selfiePath: search.selfiePath,
        });
        await completeSearch(search, searchId, searchLogPath, 0, [], 'NO_FACES_FOUND');
        return;
      }
      throw error;
    }

    const matches = await parseMatcherCsv(csvPath);
    const completionCode = await resolveCompletionCode(logPath, matches.length);
    await completeSearch(search, searchId, searchLogPath, matches.length, matches, completionCode);
  } catch (error) {
    await recordSearchFailure(search, searchId, searchLogPath, error);
    throw error;
  }
}

// Refuse to start consuming jobs when the matcher binary cannot be executed.
await ensureMatcherBinaryAvailable();

const worker = new Worker(config.queueName, processJob, {
  connection,
  concurrency: config.workerConcurrency,
});

worker.on('completed', (job) => {
  console.log(`Completed FaceAI search ${job.data.searchId}`);
});

worker.on('failed', (job, error) => {
  const searchId = job?.data?.searchId || 'unknown';
  console.error(`Failed FaceAI search ${searchId}:`, error);
});

// BullMQ recommends always attaching an 'error' listener to a Worker.
worker.on('error', (error) => {
  console.error('FaceAI processor worker error:', error);
});

console.log(`FaceAI processor listening on queue ${config.queueName} with concurrency ${config.workerConcurrency}`);