All checks were successful
Publish FaceAI Container / publish (push) Successful in 13m22s
- Added configuration options for audit database path and retention days in backend and processor. - Integrated audit logging in server and worker processes to track search requests, completions, and failures. - Created utility functions for reading and parsing audit logs in end-to-end tests. - Updated Docker Compose files to include audit database configuration. - Added new tests to verify audit log entries for successful and no-results searches.
241 lines
No EOL
7.6 KiB
JavaScript
241 lines
No EOL
7.6 KiB
JavaScript
import { constants as fsConstants } from 'node:fs';
|
|
import fs from 'node:fs/promises';
|
|
import path from 'node:path';
|
|
import { Worker } from 'bullmq';
|
|
import { config } from './config.js';
|
|
import { createAuditStore } from '../../backend/src/audit-store.js';
|
|
import {
|
|
createRedisConnection,
|
|
getSearchRecord,
|
|
markSearchCompleted,
|
|
markSearchFailed,
|
|
markSearchProcessing,
|
|
releaseActiveSearchLock,
|
|
updateProcessorHeartbeat,
|
|
storeResultRecord
|
|
} from '../../backend/src/redis-store.js';
|
|
import { parseMatcherCsv, resolvePklPath, runFaceMatcher } from './worker-utils.js';
|
|
|
|
const connection = createRedisConnection(config.redisUrl);
|
|
const auditStore = createAuditStore({ dbPath: config.auditDbPath, retentionDays: config.auditRetentionDays });
|
|
|
|
async function ensureMatcherBinaryAvailable() {
|
|
try {
|
|
await fs.access(config.matcherBinary, fsConstants.X_OK);
|
|
} catch (error) {
|
|
console.error(`FaceAI processor cannot start because the matcher binary is unavailable: ${config.matcherBinary}`);
|
|
console.error('Ensure FACEAI_MATCHER_BINARY points to a real executable and that the processor image includes the Unix face_matcher binary.');
|
|
if (error?.code === 'ENOENT') {
|
|
console.error('The configured matcher path does not exist inside the processor runtime.');
|
|
} else if (error?.code === 'EACCES') {
|
|
console.error('The configured matcher path exists but is not executable by the processor runtime.');
|
|
}
|
|
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
console.log(`FaceAI processor configured matcher binary: ${config.matcherBinary}`);
|
|
if (config.matcherTolerance !== null) {
|
|
console.log(`FaceAI processor configured matcher tolerance: ${config.matcherTolerance}`);
|
|
}
|
|
|
|
async function publishProcessorHeartbeat() {
|
|
try {
|
|
await updateProcessorHeartbeat(connection, config.processorHeartbeatTtlSeconds, {
|
|
pid: process.pid,
|
|
queueName: config.queueName,
|
|
matcherBinary: config.matcherBinary
|
|
});
|
|
} catch (error) {
|
|
console.error('Unable to publish FaceAI processor heartbeat:', error);
|
|
}
|
|
}
|
|
|
|
function formatLogLine(message, details) {
|
|
const timestamp = new Date().toISOString();
|
|
if (details === undefined) {
|
|
return `[${timestamp}] ${message}\n`;
|
|
}
|
|
|
|
return `[${timestamp}] ${message} ${JSON.stringify(details)}\n`;
|
|
}
|
|
|
|
async function appendSearchLog(logPath, message, details) {
|
|
await fs.mkdir(path.dirname(logPath), { recursive: true });
|
|
await fs.appendFile(logPath, formatLogLine(message, details), 'utf8');
|
|
}
|
|
|
|
async function resolveCompletionCode(logPath, matchCount) {
|
|
if (matchCount > 0) {
|
|
return null;
|
|
}
|
|
|
|
const matcherLog = await fs.readFile(logPath, 'utf8').catch(() => '');
|
|
if (/nessun\s+volt|no\s+faces?|no\s+face|0\s+faces?/i.test(matcherLog)) {
|
|
return 'NO_FACES_FOUND';
|
|
}
|
|
|
|
return 'NO_FACES_FOUND';
|
|
}
|
|
|
|
async function completeSearch(search, searchId, searchLogPath, matchCount, matches, completionCode) {
|
|
const result = await storeResultRecord(connection, {
|
|
raceId: search.raceId,
|
|
raceName: search.raceName,
|
|
userId: search.userId,
|
|
returnUrl: search.returnUrl,
|
|
lang: search.lang,
|
|
matches
|
|
}, config.resultTtlSeconds);
|
|
|
|
await appendSearchLog(searchLogPath, 'Completed FaceAI search', {
|
|
resultId: result.id,
|
|
matchCount,
|
|
completionCode
|
|
});
|
|
|
|
const completedSearch = await markSearchCompleted(connection, searchId, result.id, matchCount, config.searchTtlSeconds, {
|
|
completionCode
|
|
});
|
|
auditStore.markSearchCompleted({
|
|
searchId,
|
|
user: { id: search.userId },
|
|
race: {
|
|
id: search.raceId,
|
|
name: search.raceName,
|
|
storage: search.raceStorage
|
|
},
|
|
resultId: result.id,
|
|
matchCount,
|
|
matches,
|
|
completionCode,
|
|
completedAt: completedSearch?.completedAt || Date.now()
|
|
});
|
|
await releaseActiveSearchLock(connection, search.userId, searchId);
|
|
}
|
|
|
|
async function processJob(job) {
|
|
const searchId = String(job.data.searchId || '');
|
|
const search = await getSearchRecord(connection, searchId);
|
|
if (!search) {
|
|
throw new Error(`Search ${searchId} not found`);
|
|
}
|
|
|
|
await markSearchProcessing(connection, searchId, config.searchTtlSeconds);
|
|
|
|
const searchDir = path.join(config.runtimeRoot, 'searches', searchId);
|
|
const searchLogDir = path.join(config.logRoot, 'searches', searchId);
|
|
const searchLogPath = path.join(searchLogDir, 'worker.log');
|
|
await fs.mkdir(searchDir, { recursive: true });
|
|
await fs.mkdir(searchLogDir, { recursive: true });
|
|
|
|
await appendSearchLog(searchLogPath, 'Starting FaceAI search', {
|
|
searchId,
|
|
raceId: search.raceId,
|
|
userId: search.userId,
|
|
selfiePath: search.selfiePath,
|
|
runtimeRoot: config.runtimeRoot,
|
|
logRoot: config.logRoot,
|
|
queueName: config.queueName
|
|
});
|
|
|
|
try {
|
|
const pklPath = await resolvePklPath({
|
|
raceId: search.raceId,
|
|
raceStorage: search.raceStorage,
|
|
pklRoot: config.pklRoot
|
|
});
|
|
|
|
await appendSearchLog(searchLogPath, 'Resolved PKL path', {
|
|
pklPath,
|
|
raceStorage: search.raceStorage
|
|
});
|
|
|
|
const csvPath = path.join(searchDir, 'result.csv');
|
|
const logPath = path.join(searchLogDir, 'matcher.log');
|
|
|
|
await appendSearchLog(searchLogPath, 'Running matcher', {
|
|
matcherBinary: config.matcherBinary,
|
|
matcherTolerance: config.matcherTolerance,
|
|
csvPath,
|
|
matcherLogPath: logPath,
|
|
timeoutMs: config.workerTimeoutMs
|
|
});
|
|
|
|
try {
|
|
await runFaceMatcher({
|
|
matcherBinary: config.matcherBinary,
|
|
matcherTolerance: config.matcherTolerance,
|
|
selfiePath: search.selfiePath,
|
|
pklPath,
|
|
csvPath,
|
|
logPath,
|
|
timeoutMs: config.workerTimeoutMs
|
|
});
|
|
} catch (error) {
|
|
if (error.message === 'face_matcher exited with code 1') {
|
|
await appendSearchLog(searchLogPath, 'Matcher reported no detectable faces', {
|
|
matcherLogPath: logPath,
|
|
selfiePath: search.selfiePath
|
|
});
|
|
await completeSearch(search, searchId, searchLogPath, 0, [], 'NO_FACES_FOUND');
|
|
return;
|
|
}
|
|
|
|
throw error;
|
|
}
|
|
|
|
const matches = await parseMatcherCsv(csvPath);
|
|
const completionCode = await resolveCompletionCode(logPath, matches.length);
|
|
await completeSearch(search, searchId, searchLogPath, matches.length, matches, completionCode);
|
|
} catch (error) {
|
|
await appendSearchLog(searchLogPath, 'FaceAI search failed', {
|
|
message: error.message,
|
|
stack: error.stack || null
|
|
});
|
|
const failedSearch = await markSearchFailed(connection, searchId, 'PROCESSOR_ERROR', error.message, config.searchTtlSeconds);
|
|
auditStore.markSearchFailed({
|
|
searchId,
|
|
user: { id: search.userId },
|
|
race: {
|
|
id: search.raceId,
|
|
name: search.raceName,
|
|
storage: search.raceStorage
|
|
},
|
|
errorCode: 'PROCESSOR_ERROR',
|
|
errorMessage: error.message,
|
|
completedAt: failedSearch?.completedAt || Date.now(),
|
|
payload: {
|
|
stack: error.stack || null
|
|
}
|
|
});
|
|
await releaseActiveSearchLock(connection, search.userId, searchId);
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
await ensureMatcherBinaryAvailable();
|
|
await publishProcessorHeartbeat();
|
|
|
|
const heartbeatTimer = setInterval(() => {
|
|
publishProcessorHeartbeat();
|
|
}, config.processorHeartbeatIntervalMs);
|
|
|
|
heartbeatTimer.unref();
|
|
|
|
const worker = new Worker(config.queueName, processJob, {
|
|
connection,
|
|
concurrency: config.workerConcurrency
|
|
});
|
|
|
|
worker.on('completed', (job) => {
|
|
console.log(`Completed FaceAI search ${job.data.searchId}`);
|
|
});
|
|
|
|
worker.on('failed', (job, error) => {
|
|
const searchId = job?.data?.searchId || 'unknown';
|
|
console.error(`Failed FaceAI search ${searchId}:`, error);
|
|
});
|
|
|
|
console.log(`FaceAI processor listening on queue ${config.queueName} with concurrency ${config.workerConcurrency}`); |