Regalamiunsorriso/faceai/apps/processor/src/worker.js

205 lines
6.4 KiB
JavaScript
Raw Normal View History

import { constants as fsConstants } from 'node:fs';
import fs from 'node:fs/promises';
import path from 'node:path';
import { Worker } from 'bullmq';
import { config } from './config.js';
import {
createRedisConnection,
getSearchRecord,
markSearchCompleted,
markSearchFailed,
markSearchProcessing,
releaseActiveSearchLock,
updateProcessorHeartbeat,
storeResultRecord
} from '../../backend/src/redis-store.js';
import { parseMatcherCsv, resolvePklPath, runFaceMatcher } from './worker-utils.js';
const connection = createRedisConnection(config.redisUrl);
async function ensureMatcherBinaryAvailable() {
try {
await fs.access(config.matcherBinary, fsConstants.X_OK);
} catch (error) {
console.error(`FaceAI processor cannot start because the matcher binary is unavailable: ${config.matcherBinary}`);
2026-04-19 11:25:25 +02:00
console.error('Ensure FACEAI_MATCHER_BINARY points to a real executable and that the processor image includes the Unix face_matcher binary.');
if (error?.code === 'ENOENT') {
console.error('The configured matcher path does not exist inside the processor runtime.');
} else if (error?.code === 'EACCES') {
console.error('The configured matcher path exists but is not executable by the processor runtime.');
}
throw error;
}
}
console.log(`FaceAI processor configured matcher binary: ${config.matcherBinary}`);
async function publishProcessorHeartbeat() {
try {
await updateProcessorHeartbeat(connection, config.processorHeartbeatTtlSeconds, {
pid: process.pid,
queueName: config.queueName,
matcherBinary: config.matcherBinary
});
} catch (error) {
console.error('Unable to publish FaceAI processor heartbeat:', error);
}
}
2026-04-12 19:31:12 +02:00
function formatLogLine(message, details) {
const timestamp = new Date().toISOString();
if (details === undefined) {
return `[${timestamp}] ${message}\n`;
}
return `[${timestamp}] ${message} ${JSON.stringify(details)}\n`;
}
async function appendSearchLog(logPath, message, details) {
await fs.mkdir(path.dirname(logPath), { recursive: true });
await fs.appendFile(logPath, formatLogLine(message, details), 'utf8');
}
async function resolveCompletionCode(logPath, matchCount) {
if (matchCount > 0) {
return null;
}
const matcherLog = await fs.readFile(logPath, 'utf8').catch(() => '');
if (/nessun\s+volt|no\s+faces?|no\s+face|0\s+faces?/i.test(matcherLog)) {
return 'NO_FACES_FOUND';
}
return 'NO_FACES_FOUND';
}
async function completeSearch(search, searchId, searchLogPath, matchCount, matches, completionCode) {
const result = await storeResultRecord(connection, {
raceId: search.raceId,
raceName: search.raceName,
userId: search.userId,
returnUrl: search.returnUrl,
lang: search.lang,
matches
}, config.resultTtlSeconds);
await appendSearchLog(searchLogPath, 'Completed FaceAI search', {
resultId: result.id,
matchCount,
completionCode
});
await markSearchCompleted(connection, searchId, result.id, matchCount, config.searchTtlSeconds, {
completionCode
});
await releaseActiveSearchLock(connection, search.userId, searchId);
}
async function processJob(job) {
const searchId = String(job.data.searchId || '');
const search = await getSearchRecord(connection, searchId);
if (!search) {
throw new Error(`Search ${searchId} not found`);
}
await markSearchProcessing(connection, searchId, config.searchTtlSeconds);
const searchDir = path.join(config.runtimeRoot, 'searches', searchId);
2026-04-12 19:31:12 +02:00
const searchLogDir = path.join(config.logRoot, 'searches', searchId);
const searchLogPath = path.join(searchLogDir, 'worker.log');
await fs.mkdir(searchDir, { recursive: true });
2026-04-12 19:31:12 +02:00
await fs.mkdir(searchLogDir, { recursive: true });
await appendSearchLog(searchLogPath, 'Starting FaceAI search', {
searchId,
raceId: search.raceId,
userId: search.userId,
selfiePath: search.selfiePath,
runtimeRoot: config.runtimeRoot,
logRoot: config.logRoot,
queueName: config.queueName
});
try {
const pklPath = await resolvePklPath({
raceId: search.raceId,
raceStorage: search.raceStorage,
pklRoot: config.pklRoot
});
2026-04-12 19:31:12 +02:00
await appendSearchLog(searchLogPath, 'Resolved PKL path', {
pklPath,
raceStorage: search.raceStorage
});
const csvPath = path.join(searchDir, 'result.csv');
2026-04-12 19:31:12 +02:00
const logPath = path.join(searchLogDir, 'matcher.log');
2026-04-12 19:31:12 +02:00
await appendSearchLog(searchLogPath, 'Running matcher', {
matcherBinary: config.matcherBinary,
csvPath,
2026-04-12 19:31:12 +02:00
matcherLogPath: logPath,
timeoutMs: config.workerTimeoutMs
});
2026-04-12 19:31:12 +02:00
try {
await runFaceMatcher({
matcherBinary: config.matcherBinary,
selfiePath: search.selfiePath,
pklPath,
csvPath,
logPath,
timeoutMs: config.workerTimeoutMs
});
} catch (error) {
if (error.message === 'face_matcher exited with code 1') {
await appendSearchLog(searchLogPath, 'Matcher reported no detectable faces', {
matcherLogPath: logPath,
selfiePath: search.selfiePath
});
await completeSearch(search, searchId, searchLogPath, 0, [], 'NO_FACES_FOUND');
return;
}
throw error;
}
const matches = await parseMatcherCsv(csvPath);
2026-04-12 19:31:12 +02:00
const completionCode = await resolveCompletionCode(logPath, matches.length);
await completeSearch(search, searchId, searchLogPath, matches.length, matches, completionCode);
} catch (error) {
2026-04-12 19:31:12 +02:00
await appendSearchLog(searchLogPath, 'FaceAI search failed', {
message: error.message,
stack: error.stack || null
});
await markSearchFailed(connection, searchId, 'PROCESSOR_ERROR', error.message, config.searchTtlSeconds);
await releaseActiveSearchLock(connection, search.userId, searchId);
throw error;
}
}
await ensureMatcherBinaryAvailable();
await publishProcessorHeartbeat();
const heartbeatTimer = setInterval(() => {
publishProcessorHeartbeat();
}, config.processorHeartbeatIntervalMs);
heartbeatTimer.unref();
const worker = new Worker(config.queueName, processJob, {
connection,
concurrency: config.workerConcurrency
});
worker.on('completed', (job) => {
console.log(`Completed FaceAI search ${job.data.searchId}`);
});
worker.on('failed', (job, error) => {
const searchId = job?.data?.searchId || 'unknown';
2026-04-12 19:31:12 +02:00
console.error(`Failed FaceAI search ${searchId}:`, error);
});
console.log(`FaceAI processor listening on queue ${config.queueName} with concurrency ${config.workerConcurrency}`);