feat: add processor service with Redis-backed job queue

- Introduced a new `processor` service in the Docker Compose setup to handle face matching jobs.
- Configured Redis as a job queue and state management system for processing searches.
- Updated the backend to enqueue jobs and manage user locks using Redis.
- Added environment variables for Redis configuration and runtime paths.
- Created technical design documentation for the processor service outlining architecture, queue model, and search lifecycle.
- Updated package.json and package-lock.json to include dependencies for BullMQ and ioredis in the processor workspace.
- Added sample PKL files for local testing in the `test_pkl` directory.
This commit is contained in:
MaddoScientisto 2026-04-11 17:53:22 +02:00
commit 81a1ac85af
20 changed files with 1313 additions and 108 deletions

View file

@ -1,16 +1,49 @@
import express from 'express';
import cors from 'cors';
import cookieParser from 'cookie-parser';
import multer from 'multer';
import fs from 'node:fs';
import fsp from 'node:fs/promises';
import path from 'node:path';
import { fileURLToPath } from 'node:url';
import { config } from './config.js';
import { signPayload, verifySignedPayload } from './auth.js';
import { createSession, createSearch, completeSearch, getResult, getSearch, getSession, mockCatalog } from './store.js';
import { createSession, getSession, mockCatalog } from './store.js';
import {
acquireActiveSearchLock,
createRedisConnection,
createSearchRecord,
getActiveSearchId,
getResultRecord,
getSearchRecord,
incrementRateLimit,
saveSearchRecord
} from './redis-store.js';
import { getSearchQueue } from './queue.js';
import { normalizeMatches } from './matcher-results.js';
const __dirname = path.dirname(fileURLToPath(import.meta.url));
const frontendDist = path.resolve(__dirname, '../../frontend/dist');
const app = express();
const redis = createRedisConnection(config.redisUrl);
const searchQueue = getSearchQueue({ queueName: config.queueName, connection: redis });
await fsp.mkdir(config.uploadRoot, { recursive: true });
const upload = multer({
storage: multer.diskStorage({
destination: (req, file, cb) => {
const pendingRoot = path.join(config.uploadRoot, 'pending');
fsp.mkdir(pendingRoot, { recursive: true })
.then(() => cb(null, pendingRoot))
.catch((error) => cb(error));
},
filename: (req, file, cb) => {
const safeName = file.originalname.replace(/[^a-zA-Z0-9._-]/g, '_');
cb(null, `${Date.now()}_${safeName}`);
}
})
});
app.use(cookieParser());
app.use(express.json());
@ -42,6 +75,25 @@ function requireSession(req, res, next) {
next();
}
async function enforceSearchRateLimit(req, res, next) {
const userId = req.faceaiSession?.user?.id;
if (!userId) {
res.status(401).json({ error: 'Not authenticated with FaceAI' });
return;
}
const count = await incrementRateLimit(redis, userId, config.rateLimitWindowSeconds);
if (count > config.rateLimitMaxRequests) {
res.status(429).json({
error: 'Too many search attempts. Please try again later.',
code: 'RATE_LIMITED'
});
return;
}
next();
}
function issueHandoffToken({ raceId, raceSlug, lang, returnUrl }) {
const race = mockCatalog[raceId] || { id: raceId, slug: raceSlug || `race-${raceId}`, name: raceSlug || `Race ${raceId}` };
@ -185,7 +237,7 @@ app.get('/dev/legacy/launch', (req, res) => {
res.redirect(`${config.frontendUrl}/auth/callback?token=${encodeURIComponent(token)}`);
});
app.get('/dev/legacy/return', (req, res) => {
app.get('/dev/legacy/return', async (req, res) => {
try {
const token = String(req.query.token || '');
const payload = verifySignedPayload(token, config.sharedSecret);
@ -193,12 +245,17 @@ app.get('/dev/legacy/return', (req, res) => {
throw new Error('Wrong token type');
}
const result = getResult(String(req.query.resultId || payload.resultId));
const result = await getResultRecord(redis, String(req.query.resultId || payload.resultId));
if (!result || result.userId !== payload.userId) {
throw new Error('Result not found');
}
res.type('html').send(renderLegacyRacePage({ raceId: result.raceId, lang: result.lang || 'it', result }));
const normalizedResult = {
...result,
matches: normalizeMatches(result)
};
res.type('html').send(renderLegacyRacePage({ raceId: result.raceId, lang: result.lang || 'it', result: normalizedResult }));
} catch (error) {
res.status(400).type('html').send(`<h1>Return handoff failed</h1><p>${escapeHtml(error.message)}</p>`);
}
@ -247,33 +304,86 @@ app.get('/api/session', requireSession, (req, res) => {
res.json(req.faceaiSession);
});
app.post('/api/searches', requireSession, (req, res) => {
const raceId = String(req.body.raceId || req.faceaiSession.race.id);
const selfieName = String(req.body.selfieName || 'selfie.jpg');
app.post('/api/searches', requireSession, enforceSearchRateLimit, upload.single('selfie'), async (req, res) => {
try {
const raceId = String(req.body.raceId || req.faceaiSession.race.id);
const userId = String(req.faceaiSession.user.id);
const activeSearchId = await getActiveSearchId(redis, userId);
const search = createSearch({
raceId,
selfieName,
user: req.faceaiSession.user,
returnUrl: req.faceaiSession.returnUrl,
lang: req.faceaiSession.lang
});
if (activeSearchId) {
res.status(409).json({
error: 'There is already an operation being processed.',
code: 'ACTIVE_SEARCH_EXISTS',
activeSearchId
});
return;
}
setTimeout(() => {
completeSearch(search.id);
}, 3500);
if (!req.file) {
res.status(400).json({
error: 'Choose a selfie before starting the search.',
code: 'MISSING_SELFIE'
});
return;
}
res.status(201).json({
id: search.id,
status: search.status,
raceId: search.raceId,
selfieName: search.selfieName
});
const race = mockCatalog[raceId] || req.faceaiSession.race;
const search = await createSearchRecord(redis, {
raceId,
raceName: race?.name || raceId,
userId,
returnUrl: req.faceaiSession.returnUrl,
lang: req.faceaiSession.lang,
selfieName: req.file.originalname,
selfiePath: req.file.path,
uploadPath: req.file.path
}, config.searchTtlSeconds);
const lockAcquired = await acquireActiveSearchLock(redis, userId, search.id, config.searchTtlSeconds);
if (!lockAcquired) {
await fsp.unlink(req.file.path).catch(() => {});
res.status(409).json({
error: 'There is already an operation being processed.',
code: 'ACTIVE_SEARCH_EXISTS'
});
return;
}
const finalUploadDir = path.join(config.uploadRoot, search.id);
await fsp.mkdir(finalUploadDir, { recursive: true });
const finalUploadPath = path.join(finalUploadDir, path.basename(req.file.path));
await fsp.rename(req.file.path, finalUploadPath);
const updatedSearch = await saveSearchRecord(redis, {
...search,
selfiePath: finalUploadPath,
uploadPath: finalUploadPath
}, config.searchTtlSeconds);
await searchQueue.add('run-search', {
searchId: search.id
}, {
removeOnComplete: 100,
removeOnFail: 100
});
res.status(201).json({
id: updatedSearch.id,
status: updatedSearch.status,
raceId: updatedSearch.raceId,
selfieName: updatedSearch.selfieName,
matchCount: updatedSearch.matchCount,
errorCode: updatedSearch.errorCode,
errorMessage: updatedSearch.errorMessage
});
} catch (error) {
res.status(500).json({ error: error.message || 'Unable to create the search.' });
}
});
app.get('/api/searches/:id', requireSession, (req, res) => {
const search = getSearch(req.params.id);
if (!search || search.user.id !== req.faceaiSession.user.id) {
app.get('/api/searches/:id', requireSession, async (req, res) => {
const search = await getSearchRecord(redis, req.params.id);
if (!search || search.userId !== req.faceaiSession.user.id) {
res.status(404).json({ error: 'Search not found' });
return;
}
@ -285,13 +395,15 @@ app.get('/api/searches/:id', requireSession, (req, res) => {
resultId: search.resultId,
createdAt: search.createdAt,
completedAt: search.completedAt,
matchCount: search.matches.length
matchCount: search.matchCount || 0,
errorCode: search.errorCode,
errorMessage: search.errorMessage
});
});
app.get('/api/searches/:id/redirect', requireSession, (req, res) => {
const search = getSearch(req.params.id);
if (!search || search.user.id !== req.faceaiSession.user.id) {
app.get('/api/searches/:id/redirect', requireSession, async (req, res) => {
const search = await getSearchRecord(redis, req.params.id);
if (!search || search.userId !== req.faceaiSession.user.id) {
res.status(404).json({ error: 'Search not found' });
return;
}
@ -301,7 +413,12 @@ app.get('/api/searches/:id/redirect', requireSession, (req, res) => {
return;
}
const result = getResult(search.resultId);
const result = await getResultRecord(redis, search.resultId);
if (!result) {
res.status(404).json({ error: 'Result not found' });
return;
}
const token = issueReturnToken(result);
res.json({
@ -309,7 +426,7 @@ app.get('/api/searches/:id/redirect', requireSession, (req, res) => {
});
});
app.get('/bridge/results/:id', (req, res) => {
app.get('/bridge/results/:id', async (req, res) => {
try {
const token = String(req.query.token || '');
const payload = verifySignedPayload(token, config.sharedSecret);
@ -321,7 +438,7 @@ app.get('/bridge/results/:id', (req, res) => {
throw new Error('Result id mismatch');
}
const result = getResult(req.params.id);
const result = await getResultRecord(redis, req.params.id);
if (!result || result.userId !== payload.userId) {
throw new Error('Result not found');
}
@ -340,6 +457,15 @@ app.get('/bridge/results/:id', (req, res) => {
}
});
app.get('/api/health/queue', async (req, res) => {
try {
await redis.ping();
res.json({ ok: true });
} catch (error) {
res.status(500).json({ ok: false, error: error.message });
}
});
if (fs.existsSync(frontendDist)) {
app.use(express.static(frontendDist));
app.get('*', (req, res, next) => {