Add processor heartbeat management and improve health check functionality
All checks were successful
Publish FaceAI Container / publish (push) Successful in 3m7s
All checks were successful
Publish FaceAI Container / publish (push) Successful in 3m7s
- Introduced processor heartbeat configuration in environment variables and Docker setup. - Implemented heartbeat publishing in the processor worker. - Enhanced health check endpoint to include processor availability status. - Updated frontend components to handle processor unavailability messages. - Added legacy return functionality in the upload panel.
This commit is contained in:
parent
c0732c142c
commit
87d9238795
14 changed files with 292 additions and 23 deletions
|
|
@ -28,6 +28,7 @@ export const config = {
|
|||
uploadRoot: process.env.FACEAI_UPLOAD_ROOT || path.join(process.env.FACEAI_RUNTIME_ROOT || '/data/runtime', 'uploads'),
|
||||
searchTtlSeconds: Number(process.env.FACEAI_SEARCH_TTL_SECONDS || 24 * 60 * 60),
|
||||
resultTtlSeconds: Number(process.env.FACEAI_RESULT_TTL_SECONDS || 24 * 60 * 60),
|
||||
processorHeartbeatGraceMs: Number(process.env.FACEAI_PROCESSOR_HEARTBEAT_GRACE_MS || 20 * 1000),
|
||||
rateLimitWindowSeconds: Number(process.env.FACEAI_RATE_LIMIT_WINDOW_SECONDS || 10 * 60),
|
||||
rateLimitMaxRequests: Number(process.env.FACEAI_RATE_LIMIT_MAX_REQUESTS || 5)
|
||||
};
|
||||
|
|
|
|||
|
|
@ -24,6 +24,10 @@ function rateLimitKey(userId) {
|
|||
return `faceai:rate-limit:${userId}`;
|
||||
}
|
||||
|
||||
function processorHeartbeatKey() {
|
||||
return 'faceai:processor-heartbeat';
|
||||
}
|
||||
|
||||
export async function incrementRateLimit(redis, userId, windowSeconds) {
|
||||
const key = rateLimitKey(userId);
|
||||
const count = await redis.incr(key);
|
||||
|
|
@ -50,6 +54,21 @@ export async function getActiveSearchId(redis, userId) {
|
|||
return redis.get(activeSearchKey(userId));
|
||||
}
|
||||
|
||||
export async function updateProcessorHeartbeat(redis, ttlSeconds, payload = {}) {
|
||||
const heartbeat = {
|
||||
updatedAt: Date.now(),
|
||||
...payload
|
||||
};
|
||||
|
||||
await redis.set(processorHeartbeatKey(), JSON.stringify(heartbeat), 'EX', ttlSeconds);
|
||||
return heartbeat;
|
||||
}
|
||||
|
||||
export async function getProcessorHeartbeat(redis) {
|
||||
const raw = await redis.get(processorHeartbeatKey());
|
||||
return raw ? JSON.parse(raw) : null;
|
||||
}
|
||||
|
||||
export async function createSearchRecord(redis, payload, ttlSeconds) {
|
||||
const searchId = randomId('search');
|
||||
const record = {
|
||||
|
|
|
|||
|
|
@ -15,9 +15,11 @@ import {
|
|||
createRedisConnection,
|
||||
createSearchRecord,
|
||||
getActiveSearchId,
|
||||
getProcessorHeartbeat,
|
||||
getResultRecord,
|
||||
getSearchRecord,
|
||||
incrementRateLimit,
|
||||
markSearchFailed,
|
||||
saveSearchRecord
|
||||
} from './redis-store.js';
|
||||
import { getSearchQueue } from './queue.js';
|
||||
|
|
@ -28,6 +30,7 @@ const frontendDist = path.resolve(__dirname, '../../frontend/dist');
|
|||
const app = express();
|
||||
const redis = createRedisConnection(config.redisUrl);
|
||||
const searchQueue = getSearchQueue({ queueName: config.queueName, connection: redis });
|
||||
let lastHealthFailureSignature = null;
|
||||
|
||||
await fsp.mkdir(config.uploadRoot, { recursive: true });
|
||||
|
||||
|
|
@ -89,6 +92,98 @@ function logFaceAiAccess(event, req, details = {}) {
|
|||
})}`);
|
||||
}
|
||||
|
||||
async function getProcessorAvailability() {
|
||||
const heartbeat = await getProcessorHeartbeat(redis);
|
||||
const ageMs = heartbeat ? Date.now() - Number(heartbeat.updatedAt || 0) : null;
|
||||
const available = Boolean(heartbeat) && Number.isFinite(ageMs) && ageMs <= config.processorHeartbeatGraceMs;
|
||||
|
||||
return {
|
||||
available,
|
||||
ageMs,
|
||||
heartbeat,
|
||||
message: available
|
||||
? null
|
||||
: 'FaceAI processor is temporarily unavailable. Please try again shortly.'
|
||||
};
|
||||
}
|
||||
|
||||
async function failSearchIfProcessorUnavailable(search) {
|
||||
if (!search || (search.status !== 'queued' && search.status !== 'processing')) {
|
||||
return search;
|
||||
}
|
||||
|
||||
const processor = await getProcessorAvailability();
|
||||
if (processor.available) {
|
||||
return search;
|
||||
}
|
||||
|
||||
return markSearchFailed(
|
||||
redis,
|
||||
search.id,
|
||||
'PROCESSOR_UNAVAILABLE',
|
||||
processor.message,
|
||||
config.searchTtlSeconds
|
||||
);
|
||||
}
|
||||
|
||||
function logHealthFailure(details) {
|
||||
const signature = JSON.stringify(details);
|
||||
if (signature === lastHealthFailureSignature) {
|
||||
return;
|
||||
}
|
||||
|
||||
lastHealthFailureSignature = signature;
|
||||
console.error(`[FaceAI] Health check failed ${signature}`);
|
||||
}
|
||||
|
||||
function clearHealthFailure() {
|
||||
if (!lastHealthFailureSignature) {
|
||||
return;
|
||||
}
|
||||
|
||||
lastHealthFailureSignature = null;
|
||||
console.log('[FaceAI] Health check recovered');
|
||||
}
|
||||
|
||||
async function getHealthStatus() {
|
||||
const status = {
|
||||
ok: true,
|
||||
checks: {
|
||||
redis: { ok: true },
|
||||
processor: { ok: false, optional: true, ageMs: null, heartbeat: null }
|
||||
}
|
||||
};
|
||||
|
||||
try {
|
||||
await redis.ping();
|
||||
} catch (error) {
|
||||
status.ok = false;
|
||||
status.checks.redis = {
|
||||
ok: false,
|
||||
error: error.message
|
||||
};
|
||||
}
|
||||
|
||||
try {
|
||||
const processor = await getProcessorAvailability();
|
||||
status.checks.processor = {
|
||||
ok: processor.available,
|
||||
optional: true,
|
||||
ageMs: processor.ageMs,
|
||||
heartbeat: processor.heartbeat
|
||||
};
|
||||
} catch (error) {
|
||||
status.checks.processor = {
|
||||
ok: false,
|
||||
optional: true,
|
||||
error: error.message,
|
||||
heartbeat: null
|
||||
};
|
||||
}
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
function getFaceAiSession(req) {
|
||||
const sessionId = req.cookies[config.sessionCookieName];
|
||||
return sessionId ? getSession(sessionId) : null;
|
||||
|
|
@ -268,8 +363,17 @@ function renderLegacyRacePage({ raceId, lang = 'it', result = null }) {
|
|||
</html>`;
|
||||
}
|
||||
|
||||
app.get('/health', (req, res) => {
|
||||
res.json({ ok: true });
|
||||
app.get('/health', async (req, res) => {
|
||||
const status = await getHealthStatus();
|
||||
|
||||
if (!status.ok) {
|
||||
logHealthFailure(status);
|
||||
res.status(500).json(status);
|
||||
return;
|
||||
}
|
||||
|
||||
clearHealthFailure();
|
||||
res.json(status);
|
||||
});
|
||||
|
||||
app.get('/dev/legacy/race', (req, res) => {
|
||||
|
|
@ -406,6 +510,24 @@ app.post('/api/searches', requireSession, enforceSearchRateLimit, upload.single(
|
|||
return;
|
||||
}
|
||||
|
||||
const processor = await getProcessorAvailability();
|
||||
if (!processor.available) {
|
||||
logFaceAiAccess('Identification blocked: processor unavailable', req, {
|
||||
user: summarizeUser(req.faceaiSession.user),
|
||||
race: summarizeRace(race),
|
||||
processorAgeMs: processor.ageMs,
|
||||
processorHeartbeat: processor.heartbeat
|
||||
});
|
||||
if (req.file?.path) {
|
||||
await fsp.unlink(req.file.path).catch(() => {});
|
||||
}
|
||||
res.status(503).json({
|
||||
error: processor.message,
|
||||
code: 'PROCESSOR_UNAVAILABLE'
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
const activeSearchId = await getActiveSearchId(redis, userId);
|
||||
|
||||
if (activeSearchId) {
|
||||
|
|
@ -491,7 +613,8 @@ app.post('/api/searches', requireSession, enforceSearchRateLimit, upload.single(
|
|||
});
|
||||
|
||||
app.get('/api/searches/:id', requireSession, async (req, res) => {
|
||||
const search = await getSearchRecord(redis, req.params.id);
|
||||
const rawSearch = await getSearchRecord(redis, req.params.id);
|
||||
const search = await failSearchIfProcessorUnavailable(rawSearch);
|
||||
if (!search || search.userId !== req.faceaiSession.user.id) {
|
||||
res.status(404).json({ error: 'Search not found' });
|
||||
return;
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue