feat: add processor service with Redis-backed job queue

- Introduced a new `processor` service in the Docker Compose setup to handle face matching jobs.
- Configured Redis as a job queue and state management system for processing searches.
- Updated the backend to enqueue jobs and manage user locks using Redis.
- Added environment variables for Redis configuration and runtime paths.
- Created technical design documentation for the processor service outlining architecture, queue model, and search lifecycle.
- Updated package.json and package-lock.json to include dependencies for BullMQ and ioredis in the processor workspace.
- Added sample PKL files for local testing in the `test_pkl` directory.
This commit is contained in:
MaddoScientisto 2026-04-11 17:53:22 +02:00
commit bbb9c193ce
20 changed files with 1313 additions and 108 deletions

View file

@ -14,5 +14,13 @@ export const config = {
: process.env.NODE_ENV !== 'production',
localLegacyStaticRoot: process.env.FACEAI_LOCAL_LEGACY_STATIC_ROOT || defaultLocalLegacyRoot,
sharedSecret: process.env.FACEAI_SHARED_SECRET || 'change-me',
sessionCookieName: process.env.FACEAI_SESSION_COOKIE || 'rus_faceai_session'
sessionCookieName: process.env.FACEAI_SESSION_COOKIE || 'rus_faceai_session',
redisUrl: process.env.FACEAI_REDIS_URL || 'redis://redis:6379',
queueName: process.env.FACEAI_QUEUE_NAME || 'faceai-searches',
runtimeRoot: process.env.FACEAI_RUNTIME_ROOT || '/data/runtime',
uploadRoot: process.env.FACEAI_UPLOAD_ROOT || path.join(process.env.FACEAI_RUNTIME_ROOT || '/data/runtime', 'uploads'),
searchTtlSeconds: Number(process.env.FACEAI_SEARCH_TTL_SECONDS || 24 * 60 * 60),
resultTtlSeconds: Number(process.env.FACEAI_RESULT_TTL_SECONDS || 24 * 60 * 60),
rateLimitWindowSeconds: Number(process.env.FACEAI_RATE_LIMIT_WINDOW_SECONDS || 10 * 60),
rateLimitMaxRequests: Number(process.env.FACEAI_RATE_LIMIT_MAX_REQUESTS || 5)
};

View file

@ -0,0 +1,9 @@
export function normalizeMatches(result) {
return (result.matches || []).map((match) => ({
id: match.photoId,
label: match.label || match.photoId,
checkpoint: match.checkpoint || '-',
thumb: match.thumb || match.photoId,
score: match.score ?? null
}));
}

View file

@ -0,0 +1,11 @@
import { Queue } from 'bullmq';
let queue = null;
export function getSearchQueue({ queueName, connection }) {
if (!queue) {
queue = new Queue(queueName, { connection });
}
return queue;
}

View file

@ -0,0 +1,138 @@
import Redis from 'ioredis';
import { randomId } from './auth.js';
export function createRedisConnection(redisUrl) {
return new Redis(redisUrl, {
maxRetriesPerRequest: null,
enableReadyCheck: true
});
}
function searchKey(searchId) {
return `faceai:search:${searchId}`;
}
function resultKey(resultId) {
return `faceai:result:${resultId}`;
}
function activeSearchKey(userId) {
return `faceai:active-search:user:${userId}`;
}
function rateLimitKey(userId) {
return `faceai:rate-limit:${userId}`;
}
export async function incrementRateLimit(redis, userId, windowSeconds) {
const key = rateLimitKey(userId);
const count = await redis.incr(key);
if (count === 1) {
await redis.expire(key, windowSeconds);
}
return count;
}
export async function acquireActiveSearchLock(redis, userId, searchId, ttlSeconds) {
const result = await redis.set(activeSearchKey(userId), searchId, 'EX', ttlSeconds, 'NX');
return result === 'OK';
}
export async function releaseActiveSearchLock(redis, userId, searchId) {
const key = activeSearchKey(userId);
const current = await redis.get(key);
if (current === String(searchId)) {
await redis.del(key);
}
}
export async function getActiveSearchId(redis, userId) {
return redis.get(activeSearchKey(userId));
}
export async function createSearchRecord(redis, payload, ttlSeconds) {
const searchId = randomId('search');
const record = {
id: searchId,
status: 'queued',
resultId: null,
matchCount: 0,
errorCode: null,
errorMessage: null,
createdAt: Date.now(),
startedAt: null,
completedAt: null,
...payload
};
await redis.set(searchKey(searchId), JSON.stringify(record), 'EX', ttlSeconds);
return record;
}
export async function saveSearchRecord(redis, record, ttlSeconds) {
await redis.set(searchKey(record.id), JSON.stringify(record), 'EX', ttlSeconds);
return record;
}
export async function getSearchRecord(redis, searchId) {
const raw = await redis.get(searchKey(searchId));
return raw ? JSON.parse(raw) : null;
}
async function updateSearchRecord(redis, searchId, updater, ttlSeconds) {
const current = await getSearchRecord(redis, searchId);
if (!current) {
return null;
}
const next = updater(current);
await redis.set(searchKey(searchId), JSON.stringify(next), 'EX', ttlSeconds);
return next;
}
export async function markSearchProcessing(redis, searchId, ttlSeconds = 24 * 60 * 60) {
return updateSearchRecord(redis, searchId, (current) => ({
...current,
status: 'processing',
startedAt: Date.now(),
errorCode: null,
errorMessage: null
}), ttlSeconds);
}
export async function markSearchCompleted(redis, searchId, resultId, matchCount, ttlSeconds) {
return updateSearchRecord(redis, searchId, (current) => ({
...current,
status: 'completed',
resultId,
matchCount,
completedAt: Date.now()
}), ttlSeconds);
}
export async function markSearchFailed(redis, searchId, errorCode, errorMessage, ttlSeconds) {
return updateSearchRecord(redis, searchId, (current) => ({
...current,
status: 'failed',
errorCode,
errorMessage,
completedAt: Date.now()
}), ttlSeconds);
}
export async function storeResultRecord(redis, payload, ttlSeconds) {
const resultId = randomId('result');
const record = {
id: resultId,
createdAt: Date.now(),
...payload
};
await redis.set(resultKey(resultId), JSON.stringify(record), 'EX', ttlSeconds);
return record;
}
export async function getResultRecord(redis, resultId) {
const raw = await redis.get(resultKey(resultId));
return raw ? JSON.parse(raw) : null;
}

View file

@ -1,16 +1,49 @@
import express from 'express';
import cors from 'cors';
import cookieParser from 'cookie-parser';
import multer from 'multer';
import fs from 'node:fs';
import fsp from 'node:fs/promises';
import path from 'node:path';
import { fileURLToPath } from 'node:url';
import { config } from './config.js';
import { signPayload, verifySignedPayload } from './auth.js';
import { createSession, createSearch, completeSearch, getResult, getSearch, getSession, mockCatalog } from './store.js';
import { createSession, getSession, mockCatalog } from './store.js';
import {
acquireActiveSearchLock,
createRedisConnection,
createSearchRecord,
getActiveSearchId,
getResultRecord,
getSearchRecord,
incrementRateLimit,
saveSearchRecord
} from './redis-store.js';
import { getSearchQueue } from './queue.js';
import { normalizeMatches } from './matcher-results.js';
const __dirname = path.dirname(fileURLToPath(import.meta.url));
const frontendDist = path.resolve(__dirname, '../../frontend/dist');
const app = express();
const redis = createRedisConnection(config.redisUrl);
const searchQueue = getSearchQueue({ queueName: config.queueName, connection: redis });
await fsp.mkdir(config.uploadRoot, { recursive: true });
const upload = multer({
storage: multer.diskStorage({
destination: (req, file, cb) => {
const pendingRoot = path.join(config.uploadRoot, 'pending');
fsp.mkdir(pendingRoot, { recursive: true })
.then(() => cb(null, pendingRoot))
.catch((error) => cb(error));
},
filename: (req, file, cb) => {
const safeName = file.originalname.replace(/[^a-zA-Z0-9._-]/g, '_');
cb(null, `${Date.now()}_${safeName}`);
}
})
});
app.use(cookieParser());
app.use(express.json());
@ -42,6 +75,25 @@ function requireSession(req, res, next) {
next();
}
async function enforceSearchRateLimit(req, res, next) {
const userId = req.faceaiSession?.user?.id;
if (!userId) {
res.status(401).json({ error: 'Not authenticated with FaceAI' });
return;
}
const count = await incrementRateLimit(redis, userId, config.rateLimitWindowSeconds);
if (count > config.rateLimitMaxRequests) {
res.status(429).json({
error: 'Too many search attempts. Please try again later.',
code: 'RATE_LIMITED'
});
return;
}
next();
}
function issueHandoffToken({ raceId, raceSlug, lang, returnUrl }) {
const race = mockCatalog[raceId] || { id: raceId, slug: raceSlug || `race-${raceId}`, name: raceSlug || `Race ${raceId}` };
@ -185,7 +237,7 @@ app.get('/dev/legacy/launch', (req, res) => {
res.redirect(`${config.frontendUrl}/auth/callback?token=${encodeURIComponent(token)}`);
});
app.get('/dev/legacy/return', (req, res) => {
app.get('/dev/legacy/return', async (req, res) => {
try {
const token = String(req.query.token || '');
const payload = verifySignedPayload(token, config.sharedSecret);
@ -193,12 +245,17 @@ app.get('/dev/legacy/return', (req, res) => {
throw new Error('Wrong token type');
}
const result = getResult(String(req.query.resultId || payload.resultId));
const result = await getResultRecord(redis, String(req.query.resultId || payload.resultId));
if (!result || result.userId !== payload.userId) {
throw new Error('Result not found');
}
res.type('html').send(renderLegacyRacePage({ raceId: result.raceId, lang: result.lang || 'it', result }));
const normalizedResult = {
...result,
matches: normalizeMatches(result)
};
res.type('html').send(renderLegacyRacePage({ raceId: result.raceId, lang: result.lang || 'it', result: normalizedResult }));
} catch (error) {
res.status(400).type('html').send(`<h1>Return handoff failed</h1><p>${escapeHtml(error.message)}</p>`);
}
@ -247,33 +304,86 @@ app.get('/api/session', requireSession, (req, res) => {
res.json(req.faceaiSession);
});
app.post('/api/searches', requireSession, (req, res) => {
const raceId = String(req.body.raceId || req.faceaiSession.race.id);
const selfieName = String(req.body.selfieName || 'selfie.jpg');
app.post('/api/searches', requireSession, enforceSearchRateLimit, upload.single('selfie'), async (req, res) => {
try {
const raceId = String(req.body.raceId || req.faceaiSession.race.id);
const userId = String(req.faceaiSession.user.id);
const activeSearchId = await getActiveSearchId(redis, userId);
const search = createSearch({
raceId,
selfieName,
user: req.faceaiSession.user,
returnUrl: req.faceaiSession.returnUrl,
lang: req.faceaiSession.lang
});
if (activeSearchId) {
res.status(409).json({
error: 'There is already an operation being processed.',
code: 'ACTIVE_SEARCH_EXISTS',
activeSearchId
});
return;
}
setTimeout(() => {
completeSearch(search.id);
}, 3500);
if (!req.file) {
res.status(400).json({
error: 'Choose a selfie before starting the search.',
code: 'MISSING_SELFIE'
});
return;
}
res.status(201).json({
id: search.id,
status: search.status,
raceId: search.raceId,
selfieName: search.selfieName
});
const race = mockCatalog[raceId] || req.faceaiSession.race;
const search = await createSearchRecord(redis, {
raceId,
raceName: race?.name || raceId,
userId,
returnUrl: req.faceaiSession.returnUrl,
lang: req.faceaiSession.lang,
selfieName: req.file.originalname,
selfiePath: req.file.path,
uploadPath: req.file.path
}, config.searchTtlSeconds);
const lockAcquired = await acquireActiveSearchLock(redis, userId, search.id, config.searchTtlSeconds);
if (!lockAcquired) {
await fsp.unlink(req.file.path).catch(() => {});
res.status(409).json({
error: 'There is already an operation being processed.',
code: 'ACTIVE_SEARCH_EXISTS'
});
return;
}
const finalUploadDir = path.join(config.uploadRoot, search.id);
await fsp.mkdir(finalUploadDir, { recursive: true });
const finalUploadPath = path.join(finalUploadDir, path.basename(req.file.path));
await fsp.rename(req.file.path, finalUploadPath);
const updatedSearch = await saveSearchRecord(redis, {
...search,
selfiePath: finalUploadPath,
uploadPath: finalUploadPath
}, config.searchTtlSeconds);
await searchQueue.add('run-search', {
searchId: search.id
}, {
removeOnComplete: 100,
removeOnFail: 100
});
res.status(201).json({
id: updatedSearch.id,
status: updatedSearch.status,
raceId: updatedSearch.raceId,
selfieName: updatedSearch.selfieName,
matchCount: updatedSearch.matchCount,
errorCode: updatedSearch.errorCode,
errorMessage: updatedSearch.errorMessage
});
} catch (error) {
res.status(500).json({ error: error.message || 'Unable to create the search.' });
}
});
app.get('/api/searches/:id', requireSession, (req, res) => {
const search = getSearch(req.params.id);
if (!search || search.user.id !== req.faceaiSession.user.id) {
app.get('/api/searches/:id', requireSession, async (req, res) => {
const search = await getSearchRecord(redis, req.params.id);
if (!search || search.userId !== req.faceaiSession.user.id) {
res.status(404).json({ error: 'Search not found' });
return;
}
@ -285,13 +395,15 @@ app.get('/api/searches/:id', requireSession, (req, res) => {
resultId: search.resultId,
createdAt: search.createdAt,
completedAt: search.completedAt,
matchCount: search.matches.length
matchCount: search.matchCount || 0,
errorCode: search.errorCode,
errorMessage: search.errorMessage
});
});
app.get('/api/searches/:id/redirect', requireSession, (req, res) => {
const search = getSearch(req.params.id);
if (!search || search.user.id !== req.faceaiSession.user.id) {
app.get('/api/searches/:id/redirect', requireSession, async (req, res) => {
const search = await getSearchRecord(redis, req.params.id);
if (!search || search.userId !== req.faceaiSession.user.id) {
res.status(404).json({ error: 'Search not found' });
return;
}
@ -301,7 +413,12 @@ app.get('/api/searches/:id/redirect', requireSession, (req, res) => {
return;
}
const result = getResult(search.resultId);
const result = await getResultRecord(redis, search.resultId);
if (!result) {
res.status(404).json({ error: 'Result not found' });
return;
}
const token = issueReturnToken(result);
res.json({
@ -309,7 +426,7 @@ app.get('/api/searches/:id/redirect', requireSession, (req, res) => {
});
});
app.get('/bridge/results/:id', (req, res) => {
app.get('/bridge/results/:id', async (req, res) => {
try {
const token = String(req.query.token || '');
const payload = verifySignedPayload(token, config.sharedSecret);
@ -321,7 +438,7 @@ app.get('/bridge/results/:id', (req, res) => {
throw new Error('Result id mismatch');
}
const result = getResult(req.params.id);
const result = await getResultRecord(redis, req.params.id);
if (!result || result.userId !== payload.userId) {
throw new Error('Result not found');
}
@ -340,6 +457,15 @@ app.get('/bridge/results/:id', (req, res) => {
}
});
app.get('/api/health/queue', async (req, res) => {
try {
await redis.ping();
res.json({ ok: true });
} catch (error) {
res.status(500).json({ ok: false, error: error.message });
}
});
if (fs.existsSync(frontendDist)) {
app.use(express.static(frontendDist));
app.get('*', (req, res, next) => {

View file

@ -34,8 +34,6 @@ export const mockCatalog = {
};
const sessions = new Map();
const searches = new Map();
const results = new Map();
export function createSession(session) {
const sessionId = randomId('sess');
@ -49,62 +47,3 @@ export function createSession(session) {
export function getSession(sessionId) {
return sessions.get(sessionId) || null;
}
export function createSearch({ raceId, user, selfieName, returnUrl, lang }) {
const searchId = randomId('search');
searches.set(searchId, {
id: searchId,
raceId,
user,
selfieName,
returnUrl,
lang,
status: 'processing',
createdAt: Date.now(),
completedAt: null,
resultId: null,
matches: []
});
return searches.get(searchId);
}
export function getSearch(searchId) {
return searches.get(searchId) || null;
}
export function completeSearch(searchId) {
const search = searches.get(searchId);
if (!search) {
return null;
}
const race = mockCatalog[search.raceId];
const matches = (race?.photos || []).slice(0, Math.min(4, race?.photos?.length || 0));
const resultId = randomId('result');
results.set(resultId, {
id: resultId,
raceId: search.raceId,
raceName: race?.name || search.raceId,
userId: search.user.id,
returnUrl: search.returnUrl,
lang: search.lang,
matches,
createdAt: Date.now()
});
const completed = {
...search,
status: 'completed',
completedAt: Date.now(),
resultId,
matches
};
searches.set(searchId, completed);
return completed;
}
export function getResult(resultId) {
return results.get(resultId) || null;
}