Regalamiunsorriso/faceai/apps/processor/src/worker.js

82 lines
2.5 KiB
JavaScript
Raw Normal View History

import fs from 'node:fs/promises';
import path from 'node:path';
import { Worker } from 'bullmq';
import { config } from './config.js';
import {
createRedisConnection,
getSearchRecord,
markSearchCompleted,
markSearchFailed,
markSearchProcessing,
releaseActiveSearchLock,
storeResultRecord
} from '../../backend/src/redis-store.js';
import { parseMatcherCsv, resolvePklPath, runFaceMatcher } from './worker-utils.js';
// Redis connection built from config.redisUrl. The same object is handed to the
// redis-store helpers inside processJob and to the BullMQ Worker options below.
const connection = createRedisConnection(config.redisUrl);
/**
 * BullMQ processor for a single FaceAI search job.
 *
 * Looks up the search record, marks it as processing, runs the face matcher
 * with the search's selfie and the PKL path resolved for its race, parses the
 * matcher CSV, stores a result record and marks the search completed.
 *
 * Once the search record has been loaded, every exit path — success or
 * failure — releases the user's active-search lock. Any failure after that
 * point is recorded via markSearchFailed (best effort) and the original error
 * is rethrown so the queue sees the job as failed.
 *
 * @param {{ data: { searchId?: unknown } }} job - Queue job; `data.searchId`
 *   identifies the search record to process.
 * @returns {Promise<void>}
 * @throws {Error} If the search record does not exist, or the error raised by
 *   whichever processing step failed.
 */
async function processJob(job) {
  const searchId = String(job.data.searchId || '');
  const search = await getSearchRecord(connection, searchId);
  if (!search) {
    // Without a record there is no userId, hence no lock to release here.
    throw new Error(`Search ${searchId} not found`);
  }

  try {
    // Kept inside the try so that a failure here still marks the search as
    // failed and releases the lock instead of leaving it stuck.
    await markSearchProcessing(connection, searchId, config.searchTtlSeconds);

    const searchDir = path.join(config.runtimeRoot, 'searches', searchId);
    await fs.mkdir(searchDir, { recursive: true });

    const pklPath = await resolvePklPath({
      raceId: search.raceId,
      pklRoot: config.pklRoot,
      fallbackPklRoot: config.fallbackPklRoot,
    });
    const csvPath = path.join(searchDir, 'result.csv');
    const logPath = path.join(searchDir, 'matcher.log');

    await runFaceMatcher({
      matcherBinary: config.matcherBinary,
      selfiePath: search.selfiePath,
      pklPath,
      csvPath,
      logPath,
      timeoutMs: config.workerTimeoutMs,
    });

    const matches = await parseMatcherCsv(csvPath);
    const result = await storeResultRecord(
      connection,
      {
        raceId: search.raceId,
        raceName: search.raceName,
        userId: search.userId,
        returnUrl: search.returnUrl,
        lang: search.lang,
        matches,
      },
      config.resultTtlSeconds,
    );

    await markSearchCompleted(connection, searchId, result.id, matches.length, config.searchTtlSeconds);
  } catch (error) {
    // Tolerate non-Error throwables when building the failure reason.
    const reason = error?.message ?? String(error);
    try {
      await markSearchFailed(connection, searchId, 'PROCESSOR_ERROR', reason, config.searchTtlSeconds);
    } catch (markError) {
      // Recording the failure is best effort: it must not hide the original
      // error nor prevent the lock release below.
      console.error(`Could not mark FaceAI search ${searchId} as failed: ${markError?.message ?? markError}`);
    }
    throw error;
  } finally {
    try {
      await releaseActiveSearchLock(connection, search.userId, searchId);
    } catch (releaseError) {
      // A release failure is logged rather than thrown: it must not replace
      // the processing error, and must not turn a completed search into a
      // failed job.
      console.error(
        `Could not release active search lock for FaceAI search ${searchId}: ${releaseError?.message ?? releaseError}`,
      );
    }
  }
}
// Queue consumer: runs processJob for each job on config.queueName, with up to
// config.workerConcurrency jobs handled in parallel.
const worker = new Worker(config.queueName, processJob, {
  connection,
  concurrency: config.workerConcurrency,
});

worker.on('completed', (job) => {
  console.log(`Completed FaceAI search ${job.data.searchId}`);
});

worker.on('failed', (job, error) => {
  // `job` is accessed defensively because it may be missing on this event.
  const searchId = job?.data?.searchId || 'unknown';
  console.error(`Failed FaceAI search ${searchId}: ${error.message}`);
});

// An 'error' event with no listener makes the EventEmitter throw, which would
// take the whole processor down; log worker-level errors instead.
worker.on('error', (error) => {
  console.error(`FaceAI processor worker error: ${error?.message ?? error}`);
});

console.log(`FaceAI processor listening on queue ${config.queueName} with concurrency ${config.workerConcurrency}`);