From 1fe0b4dbf9cfb71797628a62d9f4b6f53c3cc0e8 Mon Sep 17 00:00:00 2001 From: Marcelo Date: Mon, 29 Dec 2025 18:43:39 +0000 Subject: [PATCH] V1.1 --- app/api/ingest/event/route.ts | 315 ++++++++++++++---- app/api/ingest/heartbeat/route.ts | 186 +++++++++-- app/api/ingest/kpi/route.ts | 253 +++++++++++--- data_validation_edge_contract.md | 74 ++++ lib/contracts/v1.ts | 210 ++++++++++++ package-lock.json | 4 +- package.json | 3 +- .../20251222235834_ingest_log/migration.sql | 55 +++ prisma/schema.prisma | 40 +++ 9 files changed, 1003 insertions(+), 137 deletions(-) create mode 100644 data_validation_edge_contract.md create mode 100644 lib/contracts/v1.ts create mode 100644 prisma/migrations/20251222235834_ingest_log/migration.sql diff --git a/app/api/ingest/event/route.ts b/app/api/ingest/event/route.ts index 8d6f0f6..0f8f06e 100644 --- a/app/api/ingest/event/route.ts +++ b/app/api/ingest/event/route.ts @@ -1,5 +1,22 @@ import { NextResponse } from "next/server"; import { prisma } from "@/lib/prisma"; +import { normalizeEventV1 } from "@/lib/contracts/v1"; + +function getClientIp(req: Request) { + const xf = req.headers.get("x-forwarded-for"); + if (xf) return xf.split(",")[0]?.trim() || null; + return req.headers.get("x-real-ip") || null; +} + +function parseSeqToBigInt(seq: unknown): bigint | null { + if (seq === null || seq === undefined) return null; + if (typeof seq === "number") { + if (!Number.isInteger(seq) || seq < 0) return null; + return BigInt(seq); + } + if (typeof seq === "string" && /^\d+$/.test(seq)) return BigInt(seq); + return null; +} const normalizeType = (t: any) => String(t ?? "") @@ -38,57 +55,124 @@ const MICROSTOP_SEC = 60; const MACROSTOP_SEC = 300; export async function POST(req: Request) { - const apiKey = req.headers.get("x-api-key"); - if (!apiKey) { - return NextResponse.json({ ok: false, error: "Missing api key" }, { status: 401 }); - } + const endpoint = "/api/ingest/event"; + const ip = getClientIp(req); + const userAgent = req.headers.get("user-agent"); - const body = await req.json().catch(() => null); - if (!body?.machineId || !body?.event) { - return NextResponse.json({ ok: false, error: "Invalid payload" }, { status: 400 }); - } - + let rawBody: any = null; + let orgId: string | null = null; + let machineId: string | null = null; + let schemaVersion: string | null = null; + let seq: bigint | null = null; + let tsDeviceDate: Date | null = null; - const machine = await prisma.machine.findFirst({ - where: { id: String(body.machineId), apiKey }, - select: { id: true, orgId: true }, - }); - if (!machine) { - return NextResponse.json({ ok: false, error: "Unauthorized" }, { status: 401 }); - } - - // Normalize to array (Node-RED sends array of anomalies) - const rawEvent = body.event; - const events = Array.isArray(rawEvent) ? rawEvent : [rawEvent]; - - const created: { id: string; ts: Date; eventType: string }[] = []; - const skipped: any[] = []; - - for (const ev of events) { - if (!ev || typeof ev !== "object") { - skipped.push({ reason: "invalid_event_object" }); - continue; + try { + // 1) Auth header exists + const apiKey = req.headers.get("x-api-key"); + if (!apiKey) { + await prisma.ingestLog.create({ + data: { + endpoint, + ok: false, + status: 401, + errorCode: "MISSING_API_KEY", + errorMsg: "Missing api key", + ip, + userAgent, + }, + }); + return NextResponse.json({ ok: false, error: "Missing api key" }, { status: 401 }); } - const rawType = (ev as any).eventType ?? (ev as any).anomaly_type ?? (ev as any).topic ?? body.topic ?? ""; + // 2) Parse JSON + rawBody = await req.json().catch(() => null); + + // 3) Reject arrays at the contract boundary (Phase 0 rule) + // Edge MUST split arrays into one event per POST. + if (rawBody?.event && Array.isArray(rawBody.event)) { + await prisma.ingestLog.create({ + data: { + endpoint, + ok: false, + status: 400, + errorCode: "EVENT_ARRAY_NOT_ALLOWED", + errorMsg: "Edge must split arrays; send one event per request.", + body: rawBody, + machineId: rawBody?.machineId ? String(rawBody.machineId) : null, + ip, + userAgent, + }, + }); + return NextResponse.json( + { ok: false, error: "Invalid payload", detail: "event array not allowed; split on edge" }, + { status: 400 } + ); + } + + // 4) Normalize to v1 (legacy tolerated) + const normalized = normalizeEventV1(rawBody); + if (!normalized.ok) { + await prisma.ingestLog.create({ + data: { + endpoint, + ok: false, + status: 400, + errorCode: "INVALID_PAYLOAD", + errorMsg: normalized.error, + body: rawBody, + ip, + userAgent, + }, + }); + return NextResponse.json({ ok: false, error: "Invalid payload", detail: normalized.error }, { status: 400 }); + } + + const body = normalized.value; + + schemaVersion = body.schemaVersion; + machineId = body.machineId; + seq = parseSeqToBigInt(body.seq); + tsDeviceDate = new Date(body.tsDevice); + + // 5) Authorize machineId + apiKey + const machine = await prisma.machine.findFirst({ + where: { id: machineId, apiKey }, + select: { id: true, orgId: true }, + }); + + if (!machine) { + await prisma.ingestLog.create({ + data: { + endpoint, + ok: false, + status: 401, + errorCode: "UNAUTHORIZED", + errorMsg: "Unauthorized (machineId/apiKey mismatch)", + body: rawBody, + machineId, + schemaVersion, + seq, + tsDevice: tsDeviceDate, + ip, + userAgent, + }, + }); + return NextResponse.json({ ok: false, error: "Unauthorized" }, { status: 401 }); + } + + orgId = machine.orgId; + + // 6) Canonicalize + classify type (keep for now; later move to edge in A1) + const ev = body.event; + + const rawType = + (ev as any).eventType ?? (ev as any).anomaly_type ?? (ev as any).topic ?? (body as any).topic ?? ""; const typ0 = normalizeType(rawType); const typ = CANON_TYPE[typ0] ?? typ0; - // Determine timestamp - const tsMs = - (typeof (ev as any)?.timestamp === "number" && (ev as any).timestamp) || - (typeof (ev as any)?.data?.timestamp === "number" && (ev as any).data.timestamp) || - (typeof (ev as any)?.data?.event_timestamp === "number" && (ev as any).data.event_timestamp) || - null; - - const ts = tsMs ? new Date(tsMs) : new Date(); - - // Severity defaulting (do not skip on severity — store for audit) - let sev = String((ev as any).severity ?? "").trim().toLowerCase(); - if (!sev) sev = "warning"; + let finalType = typ; // Stop classification -> microstop/macrostop - let finalType = typ; if (typ === "stop") { const stopSec = (typeof (ev as any)?.data?.stoppage_duration_seconds === "number" && (ev as any).data.stoppage_duration_seconds) || @@ -98,36 +182,85 @@ export async function POST(req: Request) { if (stopSec != null) { finalType = stopSec >= MACROSTOP_SEC ? "macrostop" : "microstop"; } else { - // missing duration -> conservative finalType = "microstop"; } } if (!ALLOWED_TYPES.has(finalType)) { - skipped.push({ reason: "type_not_allowed", typ: finalType, sev }); - continue; + await prisma.ingestLog.create({ + data: { + orgId, + machineId: machine.id, + endpoint, + ok: false, + status: 400, + errorCode: "TYPE_NOT_ALLOWED", + errorMsg: `Event type not allowed: ${finalType}`, + schemaVersion, + seq, + tsDevice: tsDeviceDate, + body: rawBody, + ip, + userAgent, + }, + }); + return NextResponse.json( + { ok: false, error: "Invalid event type", detail: finalType }, + { status: 400 } + ); } + // Determine severity + let sev = String((ev as any).severity ?? "").trim().toLowerCase(); + if (!sev) sev = "warning"; + const title = String((ev as any).title ?? "").trim() || - (finalType === "slow-cycle" ? "Slow Cycle Detected" : - finalType === "macrostop" ? "Macrostop Detected" : - finalType === "microstop" ? "Microstop Detected" : - "Event"); + (finalType === "slow-cycle" + ? "Slow Cycle Detected" + : finalType === "macrostop" + ? "Macrostop Detected" + : finalType === "microstop" + ? "Microstop Detected" + : "Event"); const description = (ev as any).description ? String((ev as any).description) : null; - // store full blob, ensure object + // store full blob const rawData = (ev as any).data ?? ev; - const dataObj = typeof rawData === "string" ? (() => { - try { return JSON.parse(rawData); } catch { return { raw: rawData }; } - })() : rawData; + const dataObj = + typeof rawData === "string" + ? (() => { + try { + return JSON.parse(rawData); + } catch { + return { raw: rawData }; + } + })() + : rawData; + // Prefer work_order_id always + const workOrderId = + (ev as any)?.work_order_id ? String((ev as any).work_order_id) + : (ev as any)?.data?.work_order_id ? String((ev as any).data.work_order_id) + : null; + + const sku = + (ev as any)?.sku ? String((ev as any).sku) + : (ev as any)?.data?.sku ? String((ev as any).data.sku) + : null; + + // 7) Store event with Phase 0 meta const row = await prisma.machineEvent.create({ data: { - orgId: machine.orgId, + orgId, machineId: machine.id, - ts, + + // Phase 0 meta + schemaVersion, + seq, + ts: tsDeviceDate, + topic: String((ev as any).topic ?? finalType), eventType: finalType, severity: sev, @@ -135,19 +268,69 @@ export async function POST(req: Request) { title, description, data: dataObj, - workOrderId: - (ev as any)?.work_order_id ? String((ev as any).work_order_id) - : (ev as any)?.data?.work_order_id ? String((ev as any).data.work_order_id) - : null, - sku: - (ev as any)?.sku ? String((ev as any).sku) - : (ev as any)?.data?.sku ? String((ev as any).data.sku) - : null, + workOrderId, + sku, }, }); - created.push({ id: row.id, ts: row.ts, eventType: row.eventType }); - } + // Optional: update machine last seen + await prisma.machine.update({ + where: { id: machine.id }, + data: { + schemaVersion, + seq, + tsDevice: tsDeviceDate, + tsServer: new Date(), + }, + }); - return NextResponse.json({ ok: true, createdCount: created.length, created, skippedCount: skipped.length, skipped }); + // 8) Ingest log success + await prisma.ingestLog.create({ + data: { + orgId, + machineId: machine.id, + endpoint, + ok: true, + status: 200, + schemaVersion, + seq, + tsDevice: tsDeviceDate, + body: rawBody, + ip, + userAgent, + }, + }); + + return NextResponse.json({ + ok: true, + createdCount: 1, + created: [{ id: row.id, ts: row.ts, eventType: row.eventType }], + skippedCount: 0, + skipped: [], + }); + } catch (err: any) { + const msg = err?.message ? String(err.message) : "Unknown error"; + + try { + await prisma.ingestLog.create({ + data: { + orgId, + machineId, + endpoint, + ok: false, + status: 500, + errorCode: "SERVER_ERROR", + errorMsg: msg, + schemaVersion, + seq, + tsDevice: tsDeviceDate ?? undefined, + body: rawBody, + ip, + userAgent, + }, + }); + } catch {} + + return NextResponse.json({ ok: false, error: "Server error", detail: msg }, { status: 500 }); + } } diff --git a/app/api/ingest/heartbeat/route.ts b/app/api/ingest/heartbeat/route.ts index 73b3e00..24d97ca 100644 --- a/app/api/ingest/heartbeat/route.ts +++ b/app/api/ingest/heartbeat/route.ts @@ -1,32 +1,168 @@ import { NextResponse } from "next/server"; import { prisma } from "@/lib/prisma"; +import { normalizeHeartbeatV1 } from "@/lib/contracts/v1"; + +function getClientIp(req: Request) { + const xf = req.headers.get("x-forwarded-for"); + if (xf) return xf.split(",")[0]?.trim() || null; + return req.headers.get("x-real-ip") || null; +} + +function parseSeqToBigInt(seq: unknown): bigint | null { + if (seq === null || seq === undefined) return null; + if (typeof seq === "number") { + if (!Number.isInteger(seq) || seq < 0) return null; + return BigInt(seq); + } + if (typeof seq === "string" && /^\d+$/.test(seq)) return BigInt(seq); + return null; +} export async function POST(req: Request) { - const apiKey = req.headers.get("x-api-key"); - if (!apiKey) return NextResponse.json({ ok: false, error: "Missing api key" }, { status: 401 }); + const endpoint = "/api/ingest/heartbeat"; + const ip = getClientIp(req); + const userAgent = req.headers.get("user-agent"); - const body = await req.json().catch(() => null); - if (!body?.machineId || !body?.status) { - return NextResponse.json({ ok: false, error: "Invalid payload" }, { status: 400 }); + let rawBody: any = null; + let orgId: string | null = null; + let machineId: string | null = null; + let seq: bigint | null = null; + let schemaVersion: string | null = null; + let tsDeviceDate: Date | null = null; + + try { + // 1) Auth header exists + const apiKey = req.headers.get("x-api-key"); + if (!apiKey) { + await prisma.ingestLog.create({ + data: { endpoint, ok: false, status: 401, errorCode: "MISSING_API_KEY", errorMsg: "Missing api key", ip, userAgent }, + }); + return NextResponse.json({ ok: false, error: "Missing api key" }, { status: 401 }); + } + + // 2) Parse JSON + rawBody = await req.json().catch(() => null); + + // 3) Normalize to v1 (legacy tolerated) + const normalized = normalizeHeartbeatV1(rawBody); + if (!normalized.ok) { + await prisma.ingestLog.create({ + data: { endpoint, ok: false, status: 400, errorCode: "INVALID_PAYLOAD", errorMsg: normalized.error, body: rawBody, ip, userAgent }, + }); + return NextResponse.json({ ok: false, error: "Invalid payload", detail: normalized.error }, { status: 400 }); + } + + const body = normalized.value; + schemaVersion = body.schemaVersion; + machineId = body.machineId; + seq = parseSeqToBigInt(body.seq); + tsDeviceDate = new Date(body.tsDevice); + + // 4) Authorize machineId + apiKey + const machine = await prisma.machine.findFirst({ + where: { id: machineId, apiKey }, + select: { id: true, orgId: true }, + }); + + if (!machine) { + await prisma.ingestLog.create({ + data: { + endpoint, + ok: false, + status: 401, + errorCode: "UNAUTHORIZED", + errorMsg: "Unauthorized (machineId/apiKey mismatch)", + body: rawBody, + machineId, + schemaVersion, + seq, + tsDevice: tsDeviceDate, + ip, + userAgent, + }, + }); + return NextResponse.json({ ok: false, error: "Unauthorized" }, { status: 401 }); + } + + orgId = machine.orgId; + + // 5) Store heartbeat + // Keep your legacy fields, but store meta fields too. + const hb = await prisma.machineHeartbeat.create({ + data: { + orgId, + machineId: machine.id, + + // Phase 0 meta + schemaVersion, + seq, + ts: tsDeviceDate, + + // Legacy payload compatibility + status: body.status ? String(body.status) : (body.online ? "RUN" : "STOP"), + message: body.message ? String(body.message) : null, + ip: body.ip ? String(body.ip) : null, + fwVersion: body.fwVersion ? String(body.fwVersion) : null, + }, + }); + + // Optional: update machine last seen (same as KPI) + await prisma.machine.update({ + where: { id: machine.id }, + data: { + schemaVersion, + seq, + tsDevice: tsDeviceDate, + tsServer: new Date(), + }, + }); + + // 6) Ingest log success + await prisma.ingestLog.create({ + data: { + orgId, + machineId: machine.id, + endpoint, + ok: true, + status: 200, + schemaVersion, + seq, + tsDevice: tsDeviceDate, + body: rawBody, + ip, + userAgent, + }, + }); + + return NextResponse.json({ + ok: true, + id: hb.id, + tsDevice: hb.ts, + tsServer: hb.tsServer, + }); + } catch (err: any) { + const msg = err?.message ? String(err.message) : "Unknown error"; + + try { + await prisma.ingestLog.create({ + data: { + orgId, + machineId, + endpoint, + ok: false, + status: 500, + errorCode: "SERVER_ERROR", + errorMsg: msg, + schemaVersion, + seq, + tsDevice: tsDeviceDate ?? undefined, + body: rawBody, + ip, + userAgent, + }, + }); + } catch {} + + return NextResponse.json({ ok: false, error: "Server error", detail: msg }, { status: 500 }); } - - const machine = await prisma.machine.findFirst({ - where: { id: String(body.machineId), apiKey }, - select: { id: true, orgId: true }, - }); - - if (!machine) return NextResponse.json({ ok: false, error: "Unauthorized" }, { status: 401 }); - - const hb = await prisma.machineHeartbeat.create({ - data: { - orgId: machine.orgId, - machineId: machine.id, - status: String(body.status), - message: body.message ? String(body.message) : null, - ip: body.ip ? String(body.ip) : null, - fwVersion: body.fwVersion ? String(body.fwVersion) : null, - }, - }); - - return NextResponse.json({ ok: true, id: hb.id, ts: hb.ts }); } diff --git a/app/api/ingest/kpi/route.ts b/app/api/ingest/kpi/route.ts index db66645..0245eed 100644 --- a/app/api/ingest/kpi/route.ts +++ b/app/api/ingest/kpi/route.ts @@ -1,50 +1,217 @@ +// mis-control-tower/app/api/ingest/kpi/route.ts import { NextResponse } from "next/server"; import { prisma } from "@/lib/prisma"; +import { normalizeSnapshotV1 } from "@/lib/contracts/v1"; + +function getClientIp(req: Request) { + const xf = req.headers.get("x-forwarded-for"); + if (xf) return xf.split(",")[0]?.trim() || null; + return req.headers.get("x-real-ip") || null; +} + +function parseSeqToBigInt(seq: unknown): bigint | null { + if (seq === null || seq === undefined) return null; + if (typeof seq === "number") { + if (!Number.isInteger(seq) || seq < 0) return null; + return BigInt(seq); + } + if (typeof seq === "string" && /^\d+$/.test(seq)) return BigInt(seq); + return null; +} export async function POST(req: Request) { - const apiKey = req.headers.get("x-api-key"); - if (!apiKey) return NextResponse.json({ ok: false, error: "Missing api key" }, { status: 401 }); + const endpoint = "/api/ingest/kpi"; + const startedAt = Date.now(); + const ip = getClientIp(req); + const userAgent = req.headers.get("user-agent"); - const body = await req.json().catch(() => null); - if (!body?.machineId || !body?.kpis) { - return NextResponse.json({ ok: false, error: "Invalid payload" }, { status: 400 }); + let rawBody: any = null; + let orgId: string | null = null; + let machineId: string | null = null; + let seq: bigint | null = null; + let schemaVersion: string | null = null; + let tsDeviceDate: Date | null = null; + + try { + const apiKey = req.headers.get("x-api-key"); + if (!apiKey) { + await prisma.ingestLog.create({ + data: { + endpoint, + ok: false, + status: 401, + errorCode: "MISSING_API_KEY", + errorMsg: "Missing api key", + ip, + userAgent, + }, + }); + return NextResponse.json({ ok: false, error: "Missing api key" }, { status: 401 }); + } + + rawBody = await req.json().catch(() => null); + const normalized = normalizeSnapshotV1(rawBody); + if (!normalized.ok) { + await prisma.ingestLog.create({ + data: { + endpoint, + ok: false, + status: 400, + errorCode: "INVALID_PAYLOAD", + errorMsg: normalized.error, + body: rawBody, + ip, + userAgent, + }, + }); + return NextResponse.json({ ok: false, error: "Invalid payload", detail: normalized.error }, { status: 400 }); + } + + const body = normalized.value; + + schemaVersion = body.schemaVersion; + machineId = body.machineId; + seq = parseSeqToBigInt(body.seq); + tsDeviceDate = new Date(body.tsDevice); + + // Auth: machineId + apiKey must match + const machine = await prisma.machine.findFirst({ + where: { id: machineId, apiKey }, + select: { id: true, orgId: true }, + }); + + if (!machine) { + await prisma.ingestLog.create({ + data: { + endpoint, + ok: false, + status: 401, + errorCode: "UNAUTHORIZED", + errorMsg: "Unauthorized (machineId/apiKey mismatch)", + body: rawBody, + machineId, + schemaVersion, + seq, + tsDevice: tsDeviceDate, + ip, + userAgent, + }, + }); + return NextResponse.json({ ok: false, error: "Unauthorized" }, { status: 401 }); + } + + orgId = machine.orgId; + + const wo = body.activeWorkOrder ?? {}; + const k = body.kpis ?? {}; + const safeCycleTime = + typeof body.cycleTime === "number" && body.cycleTime > 0 + ? body.cycleTime + : (typeof (wo as any).cycleTime === "number" && (wo as any).cycleTime > 0 ? (wo as any).cycleTime : null); + + const safeCavities = + typeof body.cavities === "number" && body.cavities > 0 + ? body.cavities + : (typeof (wo as any).cavities === "number" && (wo as any).cavities > 0 ? (wo as any).cavities : null); + // Write snapshot (ts = tsDevice; tsServer auto) + const row = await prisma.machineKpiSnapshot.create({ + data: { + orgId, + machineId: machine.id, + + // Phase 0 meta + schemaVersion, + seq, + ts: tsDeviceDate, // store device-time in ts; server-time goes to ts_server + + // Work order fields + workOrderId: wo.id ? String(wo.id) : null, + sku: wo.sku ? String(wo.sku) : null, + target: typeof wo.target === "number" ? Math.trunc(wo.target) : null, + good: typeof wo.good === "number" ? Math.trunc(wo.good) : null, + scrap: typeof wo.scrap === "number" ? Math.trunc(wo.scrap) : null, + + // Counters + cycleCount: typeof body.cycle_count === "number" ? body.cycle_count : null, + goodParts: typeof body.good_parts === "number" ? body.good_parts : null, + scrapParts: typeof body.scrap_parts === "number" ? body.scrap_parts : null, + cavities: safeCavities, + + // Cycle times + cycleTime: safeCycleTime, + actualCycle: typeof body.actualCycleTime === "number" ? body.actualCycleTime : null, + + // KPIs (0..100) + availability: typeof k.availability === "number" ? k.availability : null, + performance: typeof k.performance === "number" ? k.performance : null, + quality: typeof k.quality === "number" ? k.quality : null, + oee: typeof k.oee === "number" ? k.oee : null, + + trackingEnabled: typeof body.trackingEnabled === "boolean" ? body.trackingEnabled : null, + productionStarted: typeof body.productionStarted === "boolean" ? body.productionStarted : null, + }, + }); + + // Optional but useful: update machine "last seen" meta fields + await prisma.machine.update({ + where: { id: machine.id }, + data: { + schemaVersion, + seq, + tsDevice: tsDeviceDate, + tsServer: new Date(), + }, + }); + + await prisma.ingestLog.create({ + data: { + orgId, + machineId: machine.id, + endpoint, + ok: true, + status: 200, + schemaVersion, + seq, + tsDevice: tsDeviceDate, + body: rawBody, + ip, + userAgent, + }, + }); + + return NextResponse.json({ + ok: true, + id: row.id, + tsDevice: row.ts, + tsServer: row.tsServer, + }); + } catch (err: any) { + const msg = err?.message ? String(err.message) : "Unknown error"; + + // Never fail the request because logging failed + try { + await prisma.ingestLog.create({ + data: { + orgId, + machineId, + endpoint, + ok: false, + status: 500, + errorCode: "SERVER_ERROR", + errorMsg: msg, + schemaVersion, + seq, + tsDevice: tsDeviceDate ?? undefined, + body: rawBody, + ip, + userAgent, + }, + }); + } catch {} + + return NextResponse.json({ ok: false, error: "Server error", detail: msg }, { status: 500 }); + } finally { + // (If later you add latency_ms to IngestLog, you can store Date.now() - startedAt here.) + void startedAt; } - - const machine = await prisma.machine.findFirst({ - where: { id: String(body.machineId), apiKey }, - select: { id: true, orgId: true }, - }); - if (!machine) return NextResponse.json({ ok: false, error: "Unauthorized" }, { status: 401 }); - - const wo = body.activeWorkOrder ?? {}; - const k = body.kpis ?? {}; - - const row = await prisma.machineKpiSnapshot.create({ - data: { - orgId: machine.orgId, - machineId: machine.id, - - workOrderId: wo.id ? String(wo.id) : null, - sku: wo.sku ? String(wo.sku) : null, - - target: typeof wo.target === "number" ? wo.target : null, - good: typeof wo.good === "number" ? wo.good : null, - scrap: typeof wo.scrap === "number" ? wo.scrap : null, - - cycleCount: typeof body.cycle_count === "number" ? body.cycle_count : null, - goodParts: typeof body.good_parts === "number" ? body.good_parts : null, - - cycleTime: typeof body.cycleTime === "number" ? body.cycleTime : null, - - availability: typeof k.availability === "number" ? k.availability : null, - performance: typeof k.performance === "number" ? k.performance : null, - quality: typeof k.quality === "number" ? k.quality : null, - oee: typeof k.oee === "number" ? k.oee : null, - - trackingEnabled: typeof body.trackingEnabled === "boolean" ? body.trackingEnabled : null, - productionStarted: typeof body.productionStarted === "boolean" ? body.productionStarted : null, - }, - }); - - return NextResponse.json({ ok: true, id: row.id, ts: row.ts }); } diff --git a/data_validation_edge_contract.md b/data_validation_edge_contract.md new file mode 100644 index 0000000..67c17b3 --- /dev/null +++ b/data_validation_edge_contract.md @@ -0,0 +1,74 @@ +# MIS Edge → Cloud Contract (v1.0) + +All ingest payloads MUST include these top-level meta fields: + +- schemaVersion: "1.0" +- machineId: UUID +- tsDevice: epoch milliseconds (number) +- seq: monotonic integer per machine (persisted across reboots) + +## POST /api/ingest/heartbeat +{ + "schemaVersion": "1.0", + "machineId": "uuid", + "tsDevice": 1766427568335, + "seq": 123, + "online": true, + "message": "NR heartbeat", + "ip": "192.168.18.33", + "fwVersion": "raspi-nodered-1.0" +} + +## POST /api/ingest/kpi (snapshot) +{ + "schemaVersion": "1.0", + "machineId": "uuid", + "tsDevice": 1766427568335, + "seq": 124, + "activeWorkOrder": { "id": "OT-10001", "sku": "YoguFrut", "target": 600000, "good": 312640, "scrap": 0 }, + "cycle_count": 31264, + "good_parts": 312640, + "trackingEnabled": true, + "productionStarted": true, + "cycleTime": 14, + "kpis": { "oee": 100, "availability": 100, "performance": 100, "quality": 100 } +} + +## POST /api/ingest/cycle +{ + "schemaVersion": "1.0", + "machineId": "uuid", + "tsDevice": 1766427568335, + "seq": 125, + "cycle": { + "timestamp": 1766427568335, + "cycle_count": 31264, + "actual_cycle_time": 10.141, + "theoretical_cycle_time": 14, + "work_order_id": "OT-10001", + "sku": "YoguFrut", + "cavities": 10, + "good_delta": 10, + "scrap_total": 0 + } +} + +## POST /api/ingest/event +Edge MUST split arrays; cloud expects one event per request. +{ + "schemaVersion": "1.0", + "machineId": "uuid", + "tsDevice": 1766427568335, + "seq": 126, + "event": { + "anomaly_type": "slow-cycle", + "severity": "warning", + "title": "Slow Cycle Detected", + "description": "Cycle took 23.6s", + "timestamp": 1766427568335, + "work_order_id": "OT-10001", + "cycle_count": 31265, + "data": {}, + "kpi_snapshot": {} + } +} diff --git a/lib/contracts/v1.ts b/lib/contracts/v1.ts new file mode 100644 index 0000000..16d2960 --- /dev/null +++ b/lib/contracts/v1.ts @@ -0,0 +1,210 @@ +// /home/mdares/mis-control-tower/lib/contracts/v1.ts +import { z } from "zod"; + +/** + * Phase 0: freeze schema version string now and never change it without bumping. + * If you later create v2, make a new file or new constant. + */ +export const SCHEMA_VERSION = "1.0"; + +// KPI scale is frozen as 0..100 (you confirmed) +const KPI_0_100 = z.number().min(0).max(100); + +export const SnapshotV1 = z + .object({ + schemaVersion: z.literal(SCHEMA_VERSION), + machineId: z.string().uuid(), + tsDevice: z.number().int().nonnegative(), // epoch ms + // IMPORTANT: seq should be sent as string if it can ever exceed JS safe int + seq: z.union([z.number().int().nonnegative(), z.string().regex(/^\d+$/)]), + + // current shape (keep it flat so Node-RED changes are minimal) + activeWorkOrder: z + .object({ + id: z.string(), + sku: z.string().optional(), + target: z.number().optional(), + good: z.number().optional(), + scrap: z.number().optional(), + }) + .partial() + .optional(), + + cycle_count: z.number().int().nonnegative().optional(), + good_parts: z.number().int().nonnegative().optional(), + scrap_parts: z.number().int().nonnegative().optional(), + cavities: z.number().int().positive().optional(), + + cycleTime: z.number().nonnegative().optional(), // theoretical/target cycle time + actualCycleTime: z.number().nonnegative().optional(), // optional + + trackingEnabled: z.boolean().optional(), + productionStarted: z.boolean().optional(), + + kpis: z.object({ + oee: KPI_0_100, + availability: KPI_0_100, + performance: KPI_0_100, + quality: KPI_0_100, + }), + }) + .passthrough(); + +/** + * TEMPORARY: Accept your current legacy payload while Node-RED is not sending + * schemaVersion/tsDevice/seq yet. Remove this once edge is upgraded. + */ +const SnapshotLegacy = z + .object({ + machineId: z.any(), + kpis: z.any(), + }) + .passthrough(); + +export type SnapshotV1Type = z.infer; + +export function normalizeSnapshotV1(raw: unknown): { ok: true; value: SnapshotV1Type } | { ok: false; error: string } { + const strict = SnapshotV1.safeParse(raw); + if (strict.success) return { ok: true, value: strict.data }; + + // Legacy fallback (temporary) + const legacy = SnapshotLegacy.safeParse(raw); + if (!legacy.success) { + return { ok: false, error: strict.error.message }; + } + + const b: any = legacy.data; + + // Build a "best effort" SnapshotV1 so ingest works during transition. + // seq is intentionally set to "0" if missing (so you can still store); + // once Node-RED emits real seq, dedupe and ordering become reliable. + const migrated: any = { + schemaVersion: SCHEMA_VERSION, + machineId: String(b.machineId), + tsDevice: typeof b.tsDevice === "number" ? b.tsDevice : Date.now(), + seq: typeof b.seq === "number" || typeof b.seq === "string" ? b.seq : "0", + ...b, + }; + + const recheck = SnapshotV1.safeParse(migrated); + if (!recheck.success) return { ok: false, error: recheck.error.message }; + return { ok: true, value: recheck.data }; +} + +const HeartbeatV1 = z.object({ + schemaVersion: z.literal(SCHEMA_VERSION), + machineId: z.string().uuid(), + tsDevice: z.number().int().nonnegative(), + seq: z.union([z.number().int().nonnegative(), z.string().regex(/^\d+$/)]), + + // legacy shape you currently send: status/message/ip/fwVersion + status: z.string().optional(), + message: z.string().optional(), + ip: z.string().optional(), + fwVersion: z.string().optional(), + + // new canonical boolean + online: z.boolean().optional(), +}).passthrough(); + +export function normalizeHeartbeatV1(raw: unknown) { + const strict = HeartbeatV1.safeParse(raw); + if (strict.success) return { ok: true as const, value: strict.data }; + + // legacy fallback: allow missing meta + const legacy = z.object({ machineId: z.any() }).passthrough().safeParse(raw); + if (!legacy.success) return { ok: false as const, error: strict.error.message }; + + const b: any = legacy.data; + const migrated: any = { + schemaVersion: SCHEMA_VERSION, + machineId: String(b.machineId), + tsDevice: typeof b.tsDevice === "number" ? b.tsDevice : Date.now(), + seq: typeof b.seq === "number" || typeof b.seq === "string" ? b.seq : "0", + ...b, + }; + + const recheck = HeartbeatV1.safeParse(migrated); + if (!recheck.success) return { ok: false as const, error: recheck.error.message }; + return { ok: true as const, value: recheck.data }; +} + +const CycleV1 = z.object({ + schemaVersion: z.literal(SCHEMA_VERSION), + machineId: z.string().uuid(), + tsDevice: z.number().int().nonnegative(), + seq: z.union([z.number().int().nonnegative(), z.string().regex(/^\d+$/)]), + + cycle: z.object({ + timestamp: z.number().int().positive(), + cycle_count: z.number().int().nonnegative(), + actual_cycle_time: z.number(), + theoretical_cycle_time: z.number().optional(), + work_order_id: z.string(), + sku: z.string().optional(), + cavities: z.number().optional(), + good_delta: z.number().optional(), + scrap_total: z.number().optional(), + }).passthrough(), +}).passthrough(); + +export function normalizeCycleV1(raw: unknown) { + const strict = CycleV1.safeParse(raw); + if (strict.success) return { ok: true as const, value: strict.data }; + + // legacy fallback: { machineId, cycle } + const legacy = z.object({ machineId: z.any(), cycle: z.any() }).passthrough().safeParse(raw); + if (!legacy.success) return { ok: false as const, error: strict.error.message }; + + const b: any = legacy.data; + const tsDevice = typeof b.tsDevice === "number" ? b.tsDevice : (b.cycle?.timestamp ?? Date.now()); + const seq = typeof b.seq === "number" || typeof b.seq === "string" ? b.seq : (b.cycle?.cycle_count ?? "0"); + + const migrated: any = { schemaVersion: SCHEMA_VERSION, machineId: String(b.machineId), tsDevice, seq, ...b }; + const recheck = CycleV1.safeParse(migrated); + if (!recheck.success) return { ok: false as const, error: recheck.error.message }; + return { ok: true as const, value: recheck.data }; +} + +const EventV1 = z.object({ + schemaVersion: z.literal(SCHEMA_VERSION), + machineId: z.string().uuid(), + tsDevice: z.number().int().nonnegative(), + seq: z.union([z.number().int().nonnegative(), z.string().regex(/^\d+$/)]), + + // IMPORTANT: event must be an object, not an array + event: z.object({ + anomaly_type: z.string(), + severity: z.string(), + title: z.string(), + description: z.string().optional(), + timestamp: z.number().int().positive(), + work_order_id: z.string(), + cycle_count: z.number().optional(), + data: z.any().optional(), + kpi_snapshot: z.any().optional(), + }).passthrough(), +}).passthrough(); + +export function normalizeEventV1(raw: unknown) { + const strict = EventV1.safeParse(raw); + if (strict.success) return { ok: true as const, value: strict.data }; + + // legacy fallback: allow missing meta, but STILL reject arrays later + const legacy = z.object({ machineId: z.any(), event: z.any() }).passthrough().safeParse(raw); + if (!legacy.success) return { ok: false as const, error: strict.error.message }; + + const b: any = legacy.data; + const tsDevice = typeof b.tsDevice === "number" ? b.tsDevice : (b.event?.timestamp ?? Date.now()); + const migrated: any = { + schemaVersion: SCHEMA_VERSION, + machineId: String(b.machineId), + tsDevice, + seq: typeof b.seq === "number" || typeof b.seq === "string" ? b.seq : "0", + ...b, + }; + + const recheck = EventV1.safeParse(migrated); + if (!recheck.success) return { ok: false as const, error: recheck.error.message }; + return { ok: true as const, value: recheck.data }; +} diff --git a/package-lock.json b/package-lock.json index 0e21818..a469c39 100644 --- a/package-lock.json +++ b/package-lock.json @@ -14,7 +14,8 @@ "next": "16.0.10", "react": "19.2.1", "react-dom": "19.2.1", - "recharts": "^3.6.0" + "recharts": "^3.6.0", + "zod": "^4.2.1" }, "devDependencies": { "@tailwindcss/postcss": "^4", @@ -7373,7 +7374,6 @@ "version": "4.2.1", "resolved": "https://registry.npmjs.org/zod/-/zod-4.2.1.tgz", "integrity": "sha512-0wZ1IRqGGhMP76gLqz8EyfBXKk0J2qo2+H3fi4mcUP/KtTocoX08nmIAHl1Z2kJIZbZee8KOpBCSNPRgauucjw==", - "dev": true, "license": "MIT", "funding": { "url": "https://github.com/sponsors/colinhacks" diff --git a/package.json b/package.json index 0d0f9db..2f12721 100644 --- a/package.json +++ b/package.json @@ -15,7 +15,8 @@ "next": "16.0.10", "react": "19.2.1", "react-dom": "19.2.1", - "recharts": "^3.6.0" + "recharts": "^3.6.0", + "zod": "^4.2.1" }, "devDependencies": { "@tailwindcss/postcss": "^4", diff --git a/prisma/migrations/20251222235834_ingest_log/migration.sql b/prisma/migrations/20251222235834_ingest_log/migration.sql new file mode 100644 index 0000000..148c28a --- /dev/null +++ b/prisma/migrations/20251222235834_ingest_log/migration.sql @@ -0,0 +1,55 @@ +-- AlterTable +ALTER TABLE "Machine" ADD COLUMN "schema_version" TEXT, +ADD COLUMN "seq" BIGINT, +ADD COLUMN "ts" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, +ADD COLUMN "ts_server" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP; + +-- AlterTable +ALTER TABLE "MachineCycle" ADD COLUMN "schema_version" TEXT, +ADD COLUMN "seq" BIGINT, +ADD COLUMN "ts_server" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP; + +-- AlterTable +ALTER TABLE "MachineEvent" ADD COLUMN "schema_version" TEXT, +ADD COLUMN "seq" BIGINT, +ADD COLUMN "ts_server" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP; + +-- AlterTable +ALTER TABLE "MachineHeartbeat" ADD COLUMN "schema_version" TEXT, +ADD COLUMN "seq" BIGINT, +ADD COLUMN "ts_server" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP; + +-- AlterTable +ALTER TABLE "MachineKpiSnapshot" ADD COLUMN "schema_version" TEXT, +ADD COLUMN "seq" BIGINT, +ADD COLUMN "ts_server" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP; + +-- CreateTable +CREATE TABLE "IngestLog" ( + "id" TEXT NOT NULL, + "orgId" TEXT, + "machineId" TEXT, + "endpoint" TEXT NOT NULL, + "schemaVersion" TEXT, + "seq" BIGINT, + "tsDevice" TIMESTAMP(3), + "tsServer" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "ok" BOOLEAN NOT NULL, + "status" INTEGER NOT NULL, + "errorCode" TEXT, + "errorMsg" TEXT, + "body" JSONB, + "ip" TEXT, + "userAgent" TEXT, + + CONSTRAINT "IngestLog_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE INDEX "IngestLog_endpoint_tsServer_idx" ON "IngestLog"("endpoint", "tsServer"); + +-- CreateIndex +CREATE INDEX "IngestLog_machineId_tsServer_idx" ON "IngestLog"("machineId", "tsServer"); + +-- CreateIndex +CREATE INDEX "IngestLog_machineId_seq_idx" ON "IngestLog"("machineId", "seq"); diff --git a/prisma/schema.prisma b/prisma/schema.prisma index 84d4369..9789665 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -76,6 +76,10 @@ model Machine { location String? createdAt DateTime @default(now()) updatedAt DateTime @updatedAt + tsDevice DateTime @default(now()) @map("ts") + tsServer DateTime @default(now()) @map("ts_server") + schemaVersion String? @map("schema_version") + seq BigInt? @map("seq") org Org @relation(fields: [orgId], references: [id], onDelete: Cascade) heartbeats MachineHeartbeat[] @@ -93,6 +97,9 @@ model MachineHeartbeat { orgId String machineId String ts DateTime @default(now()) + tsServer DateTime @default(now()) @map("ts_server") + schemaVersion String? @map("schema_version") + seq BigInt? @map("seq") status String message String? @@ -131,6 +138,9 @@ model MachineKpiSnapshot { trackingEnabled Boolean? productionStarted Boolean? + tsServer DateTime @default(now()) @map("ts_server") + schemaVersion String? @map("schema_version") + seq BigInt? @map("seq") org Org @relation(fields: [orgId], references: [id], onDelete: Cascade) machine Machine @relation(fields: [machineId], references: [id], onDelete: Cascade) @@ -150,6 +160,9 @@ model MachineEvent { requiresAck Boolean @default(false) title String description String? + tsServer DateTime @default(now()) @map("ts_server") + schemaVersion String? @map("schema_version") + seq BigInt? @map("seq") // store the raw data blob so we don't lose fields data Json? @@ -179,6 +192,9 @@ model MachineCycle { cavities Int? goodDelta Int? scrapDelta Int? + tsServer DateTime @default(now()) @map("ts_server") + schemaVersion String? @map("schema_version") + seq BigInt? @map("seq") createdAt DateTime @default(now()) @@ -187,3 +203,27 @@ model MachineCycle { @@index([orgId, machineId, cycleCount]) } +model IngestLog { + id String @id @default(uuid()) + orgId String? + machineId String? + endpoint String + schemaVersion String? + seq BigInt? + tsDevice DateTime? + tsServer DateTime @default(now()) + + ok Boolean + status Int + errorCode String? + errorMsg String? + body Json? + ip String? + userAgent String? + + @@index([endpoint, tsServer]) + @@index([machineId, tsServer]) + @@index([machineId, seq]) +} + +