Compare commits

..

4 Commits

Author SHA1 Message Date
Marcelo
a369a69978 Sandbox: ingest fixes + env separation 2025-12-29 22:20:57 +00:00
Marcelo
1fe0b4dbf9 V1.1 2025-12-29 18:43:39 +00:00
Marcelo
945ff2dc09 Issues with data flow & consistency 2025-12-22 14:36:40 +00:00
Marcelo
ffc39a5c90 MVP 2025-12-18 20:17:20 +00:00
10 changed files with 1086 additions and 146 deletions

View File

@@ -1,5 +1,22 @@
import { NextResponse } from "next/server";
import { prisma } from "@/lib/prisma";
import { normalizeEventV1 } from "@/lib/contracts/v1";
function getClientIp(req: Request) {
const xf = req.headers.get("x-forwarded-for");
if (xf) return xf.split(",")[0]?.trim() || null;
return req.headers.get("x-real-ip") || null;
}
function parseSeqToBigInt(seq: unknown): bigint | null {
if (seq === null || seq === undefined) return null;
if (typeof seq === "number") {
if (!Number.isInteger(seq) || seq < 0) return null;
return BigInt(seq);
}
if (typeof seq === "string" && /^\d+$/.test(seq)) return BigInt(seq);
return null;
}
const normalizeType = (t: any) =>
String(t ?? "")
@@ -38,57 +55,124 @@ const MICROSTOP_SEC = 60;
const MACROSTOP_SEC = 300;
export async function POST(req: Request) {
const endpoint = "/api/ingest/event";
const ip = getClientIp(req);
const userAgent = req.headers.get("user-agent");
let rawBody: any = null;
let orgId: string | null = null;
let machineId: string | null = null;
let schemaVersion: string | null = null;
let seq: bigint | null = null;
let tsDeviceDate: Date | null = null;
try {
// 1) Auth header exists
const apiKey = req.headers.get("x-api-key");
if (!apiKey) {
await prisma.ingestLog.create({
data: {
endpoint,
ok: false,
status: 401,
errorCode: "MISSING_API_KEY",
errorMsg: "Missing api key",
ip,
userAgent,
},
});
return NextResponse.json({ ok: false, error: "Missing api key" }, { status: 401 });
}
const body = await req.json().catch(() => null);
if (!body?.machineId || !body?.event) {
return NextResponse.json({ ok: false, error: "Invalid payload" }, { status: 400 });
// 2) Parse JSON
rawBody = await req.json().catch(() => null);
// 3) Reject arrays at the contract boundary (Phase 0 rule)
// Edge MUST split arrays into one event per POST.
if (rawBody?.event && Array.isArray(rawBody.event)) {
await prisma.ingestLog.create({
data: {
endpoint,
ok: false,
status: 400,
errorCode: "EVENT_ARRAY_NOT_ALLOWED",
errorMsg: "Edge must split arrays; send one event per request.",
body: rawBody,
machineId: rawBody?.machineId ? String(rawBody.machineId) : null,
ip,
userAgent,
},
});
return NextResponse.json(
{ ok: false, error: "Invalid payload", detail: "event array not allowed; split on edge" },
{ status: 400 }
);
}
// 4) Normalize to v1 (legacy tolerated)
const normalized = normalizeEventV1(rawBody);
if (!normalized.ok) {
await prisma.ingestLog.create({
data: {
endpoint,
ok: false,
status: 400,
errorCode: "INVALID_PAYLOAD",
errorMsg: normalized.error,
body: rawBody,
ip,
userAgent,
},
});
return NextResponse.json({ ok: false, error: "Invalid payload", detail: normalized.error }, { status: 400 });
}
const body = normalized.value;
schemaVersion = body.schemaVersion;
machineId = body.machineId;
seq = parseSeqToBigInt(body.seq);
tsDeviceDate = new Date(body.tsDevice);
// 5) Authorize machineId + apiKey
const machine = await prisma.machine.findFirst({
where: { id: String(body.machineId), apiKey },
where: { id: machineId, apiKey },
select: { id: true, orgId: true },
});
if (!machine) {
await prisma.ingestLog.create({
data: {
endpoint,
ok: false,
status: 401,
errorCode: "UNAUTHORIZED",
errorMsg: "Unauthorized (machineId/apiKey mismatch)",
body: rawBody,
machineId,
schemaVersion,
seq,
tsDevice: tsDeviceDate,
ip,
userAgent,
},
});
return NextResponse.json({ ok: false, error: "Unauthorized" }, { status: 401 });
}
// Normalize to array (Node-RED sends array of anomalies)
const rawEvent = body.event;
const events = Array.isArray(rawEvent) ? rawEvent : [rawEvent];
orgId = machine.orgId;
const created: { id: string; ts: Date; eventType: string }[] = [];
const skipped: any[] = [];
// 6) Canonicalize + classify type (keep for now; later move to edge in A1)
const ev = body.event;
for (const ev of events) {
if (!ev || typeof ev !== "object") {
skipped.push({ reason: "invalid_event_object" });
continue;
}
const rawType = (ev as any).eventType ?? (ev as any).anomaly_type ?? (ev as any).topic ?? body.topic ?? "";
const rawType =
(ev as any).eventType ?? (ev as any).anomaly_type ?? (ev as any).topic ?? (body as any).topic ?? "";
const typ0 = normalizeType(rawType);
const typ = CANON_TYPE[typ0] ?? typ0;
// Determine timestamp
const tsMs =
(typeof (ev as any)?.timestamp === "number" && (ev as any).timestamp) ||
(typeof (ev as any)?.data?.timestamp === "number" && (ev as any).data.timestamp) ||
(typeof (ev as any)?.data?.event_timestamp === "number" && (ev as any).data.event_timestamp) ||
null;
const ts = tsMs ? new Date(tsMs) : new Date();
// Severity defaulting (do not skip on severity — store for audit)
let sev = String((ev as any).severity ?? "").trim().toLowerCase();
if (!sev) sev = "warning";
let finalType = typ;
// Stop classification -> microstop/macrostop
let finalType = typ;
if (typ === "stop") {
const stopSec =
(typeof (ev as any)?.data?.stoppage_duration_seconds === "number" && (ev as any).data.stoppage_duration_seconds) ||
@@ -98,36 +182,85 @@ export async function POST(req: Request) {
if (stopSec != null) {
finalType = stopSec >= MACROSTOP_SEC ? "macrostop" : "microstop";
} else {
// missing duration -> conservative
finalType = "microstop";
}
}
if (!ALLOWED_TYPES.has(finalType)) {
skipped.push({ reason: "type_not_allowed", typ: finalType, sev });
continue;
await prisma.ingestLog.create({
data: {
orgId,
machineId: machine.id,
endpoint,
ok: false,
status: 400,
errorCode: "TYPE_NOT_ALLOWED",
errorMsg: `Event type not allowed: ${finalType}`,
schemaVersion,
seq,
tsDevice: tsDeviceDate,
body: rawBody,
ip,
userAgent,
},
});
return NextResponse.json(
{ ok: false, error: "Invalid event type", detail: finalType },
{ status: 400 }
);
}
// Determine severity
let sev = String((ev as any).severity ?? "").trim().toLowerCase();
if (!sev) sev = "warning";
const title =
String((ev as any).title ?? "").trim() ||
(finalType === "slow-cycle" ? "Slow Cycle Detected" :
finalType === "macrostop" ? "Macrostop Detected" :
finalType === "microstop" ? "Microstop Detected" :
"Event");
(finalType === "slow-cycle"
? "Slow Cycle Detected"
: finalType === "macrostop"
? "Macrostop Detected"
: finalType === "microstop"
? "Microstop Detected"
: "Event");
const description = (ev as any).description ? String((ev as any).description) : null;
// store full blob, ensure object
// store full blob
const rawData = (ev as any).data ?? ev;
const dataObj = typeof rawData === "string" ? (() => {
try { return JSON.parse(rawData); } catch { return { raw: rawData }; }
})() : rawData;
const dataObj =
typeof rawData === "string"
? (() => {
try {
return JSON.parse(rawData);
} catch {
return { raw: rawData };
}
})()
: rawData;
// Prefer work_order_id always
const workOrderId =
(ev as any)?.work_order_id ? String((ev as any).work_order_id)
: (ev as any)?.data?.work_order_id ? String((ev as any).data.work_order_id)
: null;
const sku =
(ev as any)?.sku ? String((ev as any).sku)
: (ev as any)?.data?.sku ? String((ev as any).data.sku)
: null;
// 7) Store event with Phase 0 meta
const row = await prisma.machineEvent.create({
data: {
orgId: machine.orgId,
orgId,
machineId: machine.id,
ts,
// Phase 0 meta
schemaVersion,
seq,
ts: tsDeviceDate,
topic: String((ev as any).topic ?? finalType),
eventType: finalType,
severity: sev,
@@ -135,19 +268,69 @@ export async function POST(req: Request) {
title,
description,
data: dataObj,
workOrderId:
(ev as any)?.work_order_id ? String((ev as any).work_order_id)
: (ev as any)?.data?.work_order_id ? String((ev as any).data.work_order_id)
: null,
sku:
(ev as any)?.sku ? String((ev as any).sku)
: (ev as any)?.data?.sku ? String((ev as any).data.sku)
: null,
workOrderId,
sku,
},
});
created.push({ id: row.id, ts: row.ts, eventType: row.eventType });
}
// Optional: update machine last seen
await prisma.machine.update({
where: { id: machine.id },
data: {
schemaVersion,
seq,
tsDevice: tsDeviceDate,
tsServer: new Date(),
},
});
return NextResponse.json({ ok: true, createdCount: created.length, created, skippedCount: skipped.length, skipped });
// 8) Ingest log success
await prisma.ingestLog.create({
data: {
orgId,
machineId: machine.id,
endpoint,
ok: true,
status: 200,
schemaVersion,
seq,
tsDevice: tsDeviceDate,
body: rawBody,
ip,
userAgent,
},
});
return NextResponse.json({
ok: true,
createdCount: 1,
created: [{ id: row.id, ts: row.ts, eventType: row.eventType }],
skippedCount: 0,
skipped: [],
});
} catch (err: any) {
const msg = err?.message ? String(err.message) : "Unknown error";
try {
await prisma.ingestLog.create({
data: {
orgId,
machineId,
endpoint,
ok: false,
status: 500,
errorCode: "SERVER_ERROR",
errorMsg: msg,
schemaVersion,
seq,
tsDevice: tsDeviceDate ?? undefined,
body: rawBody,
ip,
userAgent,
},
});
} catch {}
return NextResponse.json({ ok: false, error: "Server error", detail: msg }, { status: 500 });
}
}

View File

@@ -1,32 +1,168 @@
import { NextResponse } from "next/server";
import { prisma } from "@/lib/prisma";
import { normalizeHeartbeatV1 } from "@/lib/contracts/v1";
function getClientIp(req: Request) {
const xf = req.headers.get("x-forwarded-for");
if (xf) return xf.split(",")[0]?.trim() || null;
return req.headers.get("x-real-ip") || null;
}
function parseSeqToBigInt(seq: unknown): bigint | null {
if (seq === null || seq === undefined) return null;
if (typeof seq === "number") {
if (!Number.isInteger(seq) || seq < 0) return null;
return BigInt(seq);
}
if (typeof seq === "string" && /^\d+$/.test(seq)) return BigInt(seq);
return null;
}
export async function POST(req: Request) {
const apiKey = req.headers.get("x-api-key");
if (!apiKey) return NextResponse.json({ ok: false, error: "Missing api key" }, { status: 401 });
const endpoint = "/api/ingest/heartbeat";
const ip = getClientIp(req);
const userAgent = req.headers.get("user-agent");
const body = await req.json().catch(() => null);
if (!body?.machineId || !body?.status) {
return NextResponse.json({ ok: false, error: "Invalid payload" }, { status: 400 });
let rawBody: any = null;
let orgId: string | null = null;
let machineId: string | null = null;
let seq: bigint | null = null;
let schemaVersion: string | null = null;
let tsDeviceDate: Date | null = null;
try {
// 1) Auth header exists
const apiKey = req.headers.get("x-api-key");
if (!apiKey) {
await prisma.ingestLog.create({
data: { endpoint, ok: false, status: 401, errorCode: "MISSING_API_KEY", errorMsg: "Missing api key", ip, userAgent },
});
return NextResponse.json({ ok: false, error: "Missing api key" }, { status: 401 });
}
// 2) Parse JSON
rawBody = await req.json().catch(() => null);
// 3) Normalize to v1 (legacy tolerated)
const normalized = normalizeHeartbeatV1(rawBody);
if (!normalized.ok) {
await prisma.ingestLog.create({
data: { endpoint, ok: false, status: 400, errorCode: "INVALID_PAYLOAD", errorMsg: normalized.error, body: rawBody, ip, userAgent },
});
return NextResponse.json({ ok: false, error: "Invalid payload", detail: normalized.error }, { status: 400 });
}
const body = normalized.value;
schemaVersion = body.schemaVersion;
machineId = body.machineId;
seq = parseSeqToBigInt(body.seq);
tsDeviceDate = new Date(body.tsDevice);
// 4) Authorize machineId + apiKey
const machine = await prisma.machine.findFirst({
where: { id: String(body.machineId), apiKey },
where: { id: machineId, apiKey },
select: { id: true, orgId: true },
});
if (!machine) return NextResponse.json({ ok: false, error: "Unauthorized" }, { status: 401 });
if (!machine) {
await prisma.ingestLog.create({
data: {
endpoint,
ok: false,
status: 401,
errorCode: "UNAUTHORIZED",
errorMsg: "Unauthorized (machineId/apiKey mismatch)",
body: rawBody,
machineId,
schemaVersion,
seq,
tsDevice: tsDeviceDate,
ip,
userAgent,
},
});
return NextResponse.json({ ok: false, error: "Unauthorized" }, { status: 401 });
}
orgId = machine.orgId;
// 5) Store heartbeat
// Keep your legacy fields, but store meta fields too.
const hb = await prisma.machineHeartbeat.create({
data: {
orgId: machine.orgId,
orgId,
machineId: machine.id,
status: String(body.status),
// Phase 0 meta
schemaVersion,
seq,
ts: tsDeviceDate,
// Legacy payload compatibility
status: body.status ? String(body.status) : (body.online ? "RUN" : "STOP"),
message: body.message ? String(body.message) : null,
ip: body.ip ? String(body.ip) : null,
fwVersion: body.fwVersion ? String(body.fwVersion) : null,
},
});
return NextResponse.json({ ok: true, id: hb.id, ts: hb.ts });
// Optional: update machine last seen (same as KPI)
await prisma.machine.update({
where: { id: machine.id },
data: {
schemaVersion,
seq,
tsDevice: tsDeviceDate,
tsServer: new Date(),
},
});
// 6) Ingest log success
await prisma.ingestLog.create({
data: {
orgId,
machineId: machine.id,
endpoint,
ok: true,
status: 200,
schemaVersion,
seq,
tsDevice: tsDeviceDate,
body: rawBody,
ip,
userAgent,
},
});
return NextResponse.json({
ok: true,
id: hb.id,
tsDevice: hb.ts,
tsServer: hb.tsServer,
});
} catch (err: any) {
const msg = err?.message ? String(err.message) : "Unknown error";
try {
await prisma.ingestLog.create({
data: {
orgId,
machineId,
endpoint,
ok: false,
status: 500,
errorCode: "SERVER_ERROR",
errorMsg: msg,
schemaVersion,
seq,
tsDevice: tsDeviceDate ?? undefined,
body: rawBody,
ip,
userAgent,
},
});
} catch {}
return NextResponse.json({ ok: false, error: "Server error", detail: msg }, { status: 500 });
}
}

View File

@@ -1,41 +1,147 @@
// mis-control-tower/app/api/ingest/kpi/route.ts
import { NextResponse } from "next/server";
import { prisma } from "@/lib/prisma";
import { normalizeSnapshotV1 } from "@/lib/contracts/v1";
function getClientIp(req: Request) {
const xf = req.headers.get("x-forwarded-for");
if (xf) return xf.split(",")[0]?.trim() || null;
return req.headers.get("x-real-ip") || null;
}
function parseSeqToBigInt(seq: unknown): bigint | null {
if (seq === null || seq === undefined) return null;
if (typeof seq === "number") {
if (!Number.isInteger(seq) || seq < 0) return null;
return BigInt(seq);
}
if (typeof seq === "string" && /^\d+$/.test(seq)) return BigInt(seq);
return null;
}
export async function POST(req: Request) {
const apiKey = req.headers.get("x-api-key");
if (!apiKey) return NextResponse.json({ ok: false, error: "Missing api key" }, { status: 401 });
const endpoint = "/api/ingest/kpi";
const startedAt = Date.now();
const ip = getClientIp(req);
const userAgent = req.headers.get("user-agent");
const body = await req.json().catch(() => null);
if (!body?.machineId || !body?.kpis) {
return NextResponse.json({ ok: false, error: "Invalid payload" }, { status: 400 });
let rawBody: any = null;
let orgId: string | null = null;
let machineId: string | null = null;
let seq: bigint | null = null;
let schemaVersion: string | null = null;
let tsDeviceDate: Date | null = null;
try {
const apiKey = req.headers.get("x-api-key");
if (!apiKey) {
await prisma.ingestLog.create({
data: {
endpoint,
ok: false,
status: 401,
errorCode: "MISSING_API_KEY",
errorMsg: "Missing api key",
ip,
userAgent,
},
});
return NextResponse.json({ ok: false, error: "Missing api key" }, { status: 401 });
}
rawBody = await req.json().catch(() => null);
const normalized = normalizeSnapshotV1(rawBody);
if (!normalized.ok) {
await prisma.ingestLog.create({
data: {
endpoint,
ok: false,
status: 400,
errorCode: "INVALID_PAYLOAD",
errorMsg: normalized.error,
body: rawBody,
ip,
userAgent,
},
});
return NextResponse.json({ ok: false, error: "Invalid payload", detail: normalized.error }, { status: 400 });
}
const body = normalized.value;
schemaVersion = body.schemaVersion;
machineId = body.machineId;
seq = parseSeqToBigInt(body.seq);
tsDeviceDate = new Date(body.tsDevice);
// Auth: machineId + apiKey must match
const machine = await prisma.machine.findFirst({
where: { id: String(body.machineId), apiKey },
where: { id: machineId, apiKey },
select: { id: true, orgId: true },
});
if (!machine) return NextResponse.json({ ok: false, error: "Unauthorized" }, { status: 401 });
if (!machine) {
await prisma.ingestLog.create({
data: {
endpoint,
ok: false,
status: 401,
errorCode: "UNAUTHORIZED",
errorMsg: "Unauthorized (machineId/apiKey mismatch)",
body: rawBody,
machineId,
schemaVersion,
seq,
tsDevice: tsDeviceDate,
ip,
userAgent,
},
});
return NextResponse.json({ ok: false, error: "Unauthorized" }, { status: 401 });
}
orgId = machine.orgId;
const wo = body.activeWorkOrder ?? {};
const k = body.kpis ?? {};
const safeCycleTime =
typeof body.cycleTime === "number" && body.cycleTime > 0
? body.cycleTime
: (typeof (wo as any).cycleTime === "number" && (wo as any).cycleTime > 0 ? (wo as any).cycleTime : null);
const safeCavities =
typeof body.cavities === "number" && body.cavities > 0
? body.cavities
: (typeof (wo as any).cavities === "number" && (wo as any).cavities > 0 ? (wo as any).cavities : null);
// Write snapshot (ts = tsDevice; tsServer auto)
const row = await prisma.machineKpiSnapshot.create({
data: {
orgId: machine.orgId,
orgId,
machineId: machine.id,
// Phase 0 meta
schemaVersion,
seq,
ts: tsDeviceDate, // store device-time in ts; server-time goes to ts_server
// Work order fields
workOrderId: wo.id ? String(wo.id) : null,
sku: wo.sku ? String(wo.sku) : null,
target: typeof wo.target === "number" ? Math.trunc(wo.target) : null,
good: typeof wo.good === "number" ? Math.trunc(wo.good) : null,
scrap: typeof wo.scrap === "number" ? Math.trunc(wo.scrap) : null,
target: typeof wo.target === "number" ? wo.target : null,
good: typeof wo.good === "number" ? wo.good : null,
scrap: typeof wo.scrap === "number" ? wo.scrap : null,
// Counters
cycleCount: typeof body.cycle_count === "number" ? body.cycle_count : null,
goodParts: typeof body.good_parts === "number" ? body.good_parts : null,
scrapParts: typeof body.scrap_parts === "number" ? body.scrap_parts : null,
cavities: safeCavities,
cycleTime: typeof body.cycleTime === "number" ? body.cycleTime : null,
// Cycle times
cycleTime: safeCycleTime,
actualCycle: typeof body.actualCycleTime === "number" ? body.actualCycleTime : null,
// KPIs (0..100)
availability: typeof k.availability === "number" ? k.availability : null,
performance: typeof k.performance === "number" ? k.performance : null,
quality: typeof k.quality === "number" ? k.quality : null,
@@ -46,5 +152,66 @@ export async function POST(req: Request) {
},
});
return NextResponse.json({ ok: true, id: row.id, ts: row.ts });
// Optional but useful: update machine "last seen" meta fields
await prisma.machine.update({
where: { id: machine.id },
data: {
schemaVersion,
seq,
tsDevice: tsDeviceDate,
tsServer: new Date(),
},
});
await prisma.ingestLog.create({
data: {
orgId,
machineId: machine.id,
endpoint,
ok: true,
status: 200,
schemaVersion,
seq,
tsDevice: tsDeviceDate,
body: rawBody,
ip,
userAgent,
},
});
return NextResponse.json({
ok: true,
id: row.id,
tsDevice: row.ts,
tsServer: row.tsServer,
});
} catch (err: any) {
const msg = err?.message ? String(err.message) : "Unknown error";
// Never fail the request because logging failed
try {
await prisma.ingestLog.create({
data: {
orgId,
machineId,
endpoint,
ok: false,
status: 500,
errorCode: "SERVER_ERROR",
errorMsg: msg,
schemaVersion,
seq,
tsDevice: tsDeviceDate ?? undefined,
body: rawBody,
ip,
userAgent,
},
});
} catch {}
return NextResponse.json({ ok: false, error: "Server error", detail: msg }, { status: 500 });
} finally {
// (If later you add latency_ms to IngestLog, you can store Date.now() - startedAt here.)
void startedAt;
}
}

View File

@@ -0,0 +1,74 @@
# MIS Edge → Cloud Contract (v1.0)
All ingest payloads MUST include these top-level meta fields:
- schemaVersion: "1.0"
- machineId: UUID
- tsDevice: epoch milliseconds (number)
- seq: monotonic integer per machine (persisted across reboots)
## POST /api/ingest/heartbeat
{
"schemaVersion": "1.0",
"machineId": "uuid",
"tsDevice": 1766427568335,
"seq": 123,
"online": true,
"message": "NR heartbeat",
"ip": "192.168.18.33",
"fwVersion": "raspi-nodered-1.0"
}
## POST /api/ingest/kpi (snapshot)
{
"schemaVersion": "1.0",
"machineId": "uuid",
"tsDevice": 1766427568335,
"seq": 124,
"activeWorkOrder": { "id": "OT-10001", "sku": "YoguFrut", "target": 600000, "good": 312640, "scrap": 0 },
"cycle_count": 31264,
"good_parts": 312640,
"trackingEnabled": true,
"productionStarted": true,
"cycleTime": 14,
"kpis": { "oee": 100, "availability": 100, "performance": 100, "quality": 100 }
}
## POST /api/ingest/cycle
{
"schemaVersion": "1.0",
"machineId": "uuid",
"tsDevice": 1766427568335,
"seq": 125,
"cycle": {
"timestamp": 1766427568335,
"cycle_count": 31264,
"actual_cycle_time": 10.141,
"theoretical_cycle_time": 14,
"work_order_id": "OT-10001",
"sku": "YoguFrut",
"cavities": 10,
"good_delta": 10,
"scrap_total": 0
}
}
## POST /api/ingest/event
Edge MUST split arrays; cloud expects one event per request.
{
"schemaVersion": "1.0",
"machineId": "uuid",
"tsDevice": 1766427568335,
"seq": 126,
"event": {
"anomaly_type": "slow-cycle",
"severity": "warning",
"title": "Slow Cycle Detected",
"description": "Cycle took 23.6s",
"timestamp": 1766427568335,
"work_order_id": "OT-10001",
"cycle_count": 31265,
"data": {},
"kpi_snapshot": {}
}
}

210
lib/contracts/v1.ts Normal file
View File

@@ -0,0 +1,210 @@
// /home/mdares/mis-control-tower/lib/contracts/v1.ts
import { z } from "zod";
/**
* Phase 0: freeze schema version string now and never change it without bumping.
* If you later create v2, make a new file or new constant.
*/
export const SCHEMA_VERSION = "1.0";
// KPI scale is frozen as 0..100 (you confirmed)
const KPI_0_100 = z.number().min(0).max(100);
export const SnapshotV1 = z
.object({
schemaVersion: z.literal(SCHEMA_VERSION),
machineId: z.string().uuid(),
tsDevice: z.number().int().nonnegative(), // epoch ms
// IMPORTANT: seq should be sent as string if it can ever exceed JS safe int
seq: z.union([z.number().int().nonnegative(), z.string().regex(/^\d+$/)]),
// current shape (keep it flat so Node-RED changes are minimal)
activeWorkOrder: z
.object({
id: z.string(),
sku: z.string().optional(),
target: z.number().optional(),
good: z.number().optional(),
scrap: z.number().optional(),
})
.partial()
.optional(),
cycle_count: z.number().int().nonnegative().optional(),
good_parts: z.number().int().nonnegative().optional(),
scrap_parts: z.number().int().nonnegative().optional(),
cavities: z.number().int().positive().optional(),
cycleTime: z.number().nonnegative().optional(), // theoretical/target cycle time
actualCycleTime: z.number().nonnegative().optional(), // optional
trackingEnabled: z.boolean().optional(),
productionStarted: z.boolean().optional(),
kpis: z.object({
oee: KPI_0_100,
availability: KPI_0_100,
performance: KPI_0_100,
quality: KPI_0_100,
}),
})
.passthrough();
/**
* TEMPORARY: Accept your current legacy payload while Node-RED is not sending
* schemaVersion/tsDevice/seq yet. Remove this once edge is upgraded.
*/
const SnapshotLegacy = z
.object({
machineId: z.any(),
kpis: z.any(),
})
.passthrough();
export type SnapshotV1Type = z.infer<typeof SnapshotV1>;
export function normalizeSnapshotV1(raw: unknown): { ok: true; value: SnapshotV1Type } | { ok: false; error: string } {
const strict = SnapshotV1.safeParse(raw);
if (strict.success) return { ok: true, value: strict.data };
// Legacy fallback (temporary)
const legacy = SnapshotLegacy.safeParse(raw);
if (!legacy.success) {
return { ok: false, error: strict.error.message };
}
const b: any = legacy.data;
// Build a "best effort" SnapshotV1 so ingest works during transition.
// seq is intentionally set to "0" if missing (so you can still store);
// once Node-RED emits real seq, dedupe and ordering become reliable.
const migrated: any = {
schemaVersion: SCHEMA_VERSION,
machineId: String(b.machineId),
tsDevice: typeof b.tsDevice === "number" ? b.tsDevice : Date.now(),
seq: typeof b.seq === "number" || typeof b.seq === "string" ? b.seq : "0",
...b,
};
const recheck = SnapshotV1.safeParse(migrated);
if (!recheck.success) return { ok: false, error: recheck.error.message };
return { ok: true, value: recheck.data };
}
const HeartbeatV1 = z.object({
schemaVersion: z.literal(SCHEMA_VERSION),
machineId: z.string().uuid(),
tsDevice: z.number().int().nonnegative(),
seq: z.union([z.number().int().nonnegative(), z.string().regex(/^\d+$/)]),
// legacy shape you currently send: status/message/ip/fwVersion
status: z.string().optional(),
message: z.string().optional(),
ip: z.string().optional(),
fwVersion: z.string().optional(),
// new canonical boolean
online: z.boolean().optional(),
}).passthrough();
export function normalizeHeartbeatV1(raw: unknown) {
const strict = HeartbeatV1.safeParse(raw);
if (strict.success) return { ok: true as const, value: strict.data };
// legacy fallback: allow missing meta
const legacy = z.object({ machineId: z.any() }).passthrough().safeParse(raw);
if (!legacy.success) return { ok: false as const, error: strict.error.message };
const b: any = legacy.data;
const migrated: any = {
schemaVersion: SCHEMA_VERSION,
machineId: String(b.machineId),
tsDevice: typeof b.tsDevice === "number" ? b.tsDevice : Date.now(),
seq: typeof b.seq === "number" || typeof b.seq === "string" ? b.seq : "0",
...b,
};
const recheck = HeartbeatV1.safeParse(migrated);
if (!recheck.success) return { ok: false as const, error: recheck.error.message };
return { ok: true as const, value: recheck.data };
}
const CycleV1 = z.object({
schemaVersion: z.literal(SCHEMA_VERSION),
machineId: z.string().uuid(),
tsDevice: z.number().int().nonnegative(),
seq: z.union([z.number().int().nonnegative(), z.string().regex(/^\d+$/)]),
cycle: z.object({
timestamp: z.number().int().positive(),
cycle_count: z.number().int().nonnegative(),
actual_cycle_time: z.number(),
theoretical_cycle_time: z.number().optional(),
work_order_id: z.string(),
sku: z.string().optional(),
cavities: z.number().optional(),
good_delta: z.number().optional(),
scrap_total: z.number().optional(),
}).passthrough(),
}).passthrough();
export function normalizeCycleV1(raw: unknown) {
const strict = CycleV1.safeParse(raw);
if (strict.success) return { ok: true as const, value: strict.data };
// legacy fallback: { machineId, cycle }
const legacy = z.object({ machineId: z.any(), cycle: z.any() }).passthrough().safeParse(raw);
if (!legacy.success) return { ok: false as const, error: strict.error.message };
const b: any = legacy.data;
const tsDevice = typeof b.tsDevice === "number" ? b.tsDevice : (b.cycle?.timestamp ?? Date.now());
const seq = typeof b.seq === "number" || typeof b.seq === "string" ? b.seq : (b.cycle?.cycle_count ?? "0");
const migrated: any = { schemaVersion: SCHEMA_VERSION, machineId: String(b.machineId), tsDevice, seq, ...b };
const recheck = CycleV1.safeParse(migrated);
if (!recheck.success) return { ok: false as const, error: recheck.error.message };
return { ok: true as const, value: recheck.data };
}
const EventV1 = z.object({
schemaVersion: z.literal(SCHEMA_VERSION),
machineId: z.string().uuid(),
tsDevice: z.number().int().nonnegative(),
seq: z.union([z.number().int().nonnegative(), z.string().regex(/^\d+$/)]),
// IMPORTANT: event must be an object, not an array
event: z.object({
anomaly_type: z.string(),
severity: z.string(),
title: z.string(),
description: z.string().optional(),
timestamp: z.number().int().positive(),
work_order_id: z.string(),
cycle_count: z.number().optional(),
data: z.any().optional(),
kpi_snapshot: z.any().optional(),
}).passthrough(),
}).passthrough();
export function normalizeEventV1(raw: unknown) {
const strict = EventV1.safeParse(raw);
if (strict.success) return { ok: true as const, value: strict.data };
// legacy fallback: allow missing meta, but STILL reject arrays later
const legacy = z.object({ machineId: z.any(), event: z.any() }).passthrough().safeParse(raw);
if (!legacy.success) return { ok: false as const, error: strict.error.message };
const b: any = legacy.data;
const tsDevice = typeof b.tsDevice === "number" ? b.tsDevice : (b.event?.timestamp ?? Date.now());
const migrated: any = {
schemaVersion: SCHEMA_VERSION,
machineId: String(b.machineId),
tsDevice,
seq: typeof b.seq === "number" || typeof b.seq === "string" ? b.seq : "0",
...b,
};
const recheck = EventV1.safeParse(migrated);
if (!recheck.success) return { ok: false as const, error: recheck.error.message };
return { ok: true as const, value: recheck.data };
}

68
package-lock.json generated
View File

@@ -14,7 +14,8 @@
"next": "16.0.10",
"react": "19.2.1",
"react-dom": "19.2.1",
"recharts": "^3.6.0"
"recharts": "^3.6.0",
"zod": "^4.2.1"
},
"devDependencies": {
"@tailwindcss/postcss": "^4",
@@ -22,6 +23,7 @@
"@types/node": "^20",
"@types/react": "^19",
"@types/react-dom": "^19",
"dotenv-cli": "^11.0.0",
"eslint": "^9",
"eslint-config-next": "16.0.10",
"prisma": "^6.19.1",
@@ -2625,9 +2627,9 @@
"license": "MIT"
},
"node_modules/baseline-browser-mapping": {
"version": "2.9.7",
"resolved": "https://registry.npmjs.org/baseline-browser-mapping/-/baseline-browser-mapping-2.9.7.tgz",
"integrity": "sha512-k9xFKplee6KIio3IDbwj+uaCLpqzOwakOgmqzPezM0sFJlFKcg30vk2wOiAJtkTSfx0SSQDSe8q+mWA/fSH5Zg==",
"version": "2.9.9",
"resolved": "https://registry.npmjs.org/baseline-browser-mapping/-/baseline-browser-mapping-2.9.9.tgz",
"integrity": "sha512-V8fbOCSeOFvlDj7LLChUcqbZrdKD9RU/VR260piF1790vT0mfLSwGc/Qzxv3IqiTukOpNtItePa0HBpMAj7MDg==",
"dev": true,
"license": "Apache-2.0",
"bin": {
@@ -3255,6 +3257,51 @@
"url": "https://dotenvx.com"
}
},
"node_modules/dotenv-cli": {
"version": "11.0.0",
"resolved": "https://registry.npmjs.org/dotenv-cli/-/dotenv-cli-11.0.0.tgz",
"integrity": "sha512-r5pA8idbk7GFWuHEU7trSTflWcdBpQEK+Aw17UrSHjS6CReuhrrPcyC3zcQBPQvhArRHnBo/h6eLH1fkCvNlww==",
"dev": true,
"license": "MIT",
"dependencies": {
"cross-spawn": "^7.0.6",
"dotenv": "^17.1.0",
"dotenv-expand": "^12.0.0",
"minimist": "^1.2.6"
},
"bin": {
"dotenv": "cli.js"
}
},
"node_modules/dotenv-cli/node_modules/dotenv": {
"version": "17.2.3",
"resolved": "https://registry.npmjs.org/dotenv/-/dotenv-17.2.3.tgz",
"integrity": "sha512-JVUnt+DUIzu87TABbhPmNfVdBDt18BLOWjMUFJMSi/Qqg7NTYtabbvSNJGOJ7afbRuv9D/lngizHtP7QyLQ+9w==",
"dev": true,
"license": "BSD-2-Clause",
"engines": {
"node": ">=12"
},
"funding": {
"url": "https://dotenvx.com"
}
},
"node_modules/dotenv-expand": {
"version": "12.0.3",
"resolved": "https://registry.npmjs.org/dotenv-expand/-/dotenv-expand-12.0.3.tgz",
"integrity": "sha512-uc47g4b+4k/M/SeaW1y4OApx+mtLWl92l5LMPP0GNXctZqELk+YGgOPIIC5elYmUH4OuoK3JLhuRUYegeySiFA==",
"dev": true,
"license": "BSD-2-Clause",
"dependencies": {
"dotenv": "^16.4.5"
},
"engines": {
"node": ">=12"
},
"funding": {
"url": "https://dotenvx.com"
}
},
"node_modules/dunder-proto": {
"version": "1.0.1",
"resolved": "https://registry.npmjs.org/dunder-proto/-/dunder-proto-1.0.1.tgz",
@@ -7163,9 +7210,9 @@
}
},
"node_modules/update-browserslist-db": {
"version": "1.2.2",
"resolved": "https://registry.npmjs.org/update-browserslist-db/-/update-browserslist-db-1.2.2.tgz",
"integrity": "sha512-E85pfNzMQ9jpKkA7+TJAi4TJN+tBCuWh5rUcS/sv6cFi+1q9LYDwDI5dpUL0u/73EElyQ8d3TEaeW4sPedBqYA==",
"version": "1.2.3",
"resolved": "https://registry.npmjs.org/update-browserslist-db/-/update-browserslist-db-1.2.3.tgz",
"integrity": "sha512-Js0m9cx+qOgDxo0eMiFGEueWztz+d4+M3rGlmKPT+T4IS/jP4ylw3Nwpu6cpTTP8R1MAC1kF4VbdLt3ARf209w==",
"dev": true,
"funding": [
{
@@ -7370,10 +7417,9 @@
}
},
"node_modules/zod": {
"version": "4.2.0",
"resolved": "https://registry.npmjs.org/zod/-/zod-4.2.0.tgz",
"integrity": "sha512-Bd5fw9wlIhtqCCxotZgdTOMwGm1a0u75wARVEY9HMs1X17trvA/lMi4+MGK5EUfYkXVTbX8UDiDKW4OgzHVUZw==",
"dev": true,
"version": "4.2.1",
"resolved": "https://registry.npmjs.org/zod/-/zod-4.2.1.tgz",
"integrity": "sha512-0wZ1IRqGGhMP76gLqz8EyfBXKk0J2qo2+H3fi4mcUP/KtTocoX08nmIAHl1Z2kJIZbZee8KOpBCSNPRgauucjw==",
"license": "MIT",
"funding": {
"url": "https://github.com/sponsors/colinhacks"

View File

@@ -15,7 +15,8 @@
"next": "16.0.10",
"react": "19.2.1",
"react-dom": "19.2.1",
"recharts": "^3.6.0"
"recharts": "^3.6.0",
"zod": "^4.2.1"
},
"devDependencies": {
"@tailwindcss/postcss": "^4",
@@ -23,6 +24,7 @@
"@types/node": "^20",
"@types/react": "^19",
"@types/react-dom": "^19",
"dotenv-cli": "^11.0.0",
"eslint": "^9",
"eslint-config-next": "16.0.10",
"prisma": "^6.19.1",

View File

@@ -0,0 +1,27 @@
-- CreateTable
CREATE TABLE "MachineCycle" (
"id" TEXT NOT NULL,
"orgId" TEXT NOT NULL,
"machineId" TEXT NOT NULL,
"ts" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"cycleCount" INTEGER,
"actualCycleTime" DOUBLE PRECISION NOT NULL,
"theoreticalCycleTime" DOUBLE PRECISION,
"workOrderId" TEXT,
"sku" TEXT,
"cavities" INTEGER,
"goodDelta" INTEGER,
"scrapDelta" INTEGER,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "MachineCycle_pkey" PRIMARY KEY ("id")
);
-- CreateIndex
CREATE INDEX "MachineCycle_orgId_machineId_ts_idx" ON "MachineCycle"("orgId", "machineId", "ts");
-- CreateIndex
CREATE INDEX "MachineCycle_orgId_machineId_cycleCount_idx" ON "MachineCycle"("orgId", "machineId", "cycleCount");
-- AddForeignKey
ALTER TABLE "MachineCycle" ADD CONSTRAINT "MachineCycle_machineId_fkey" FOREIGN KEY ("machineId") REFERENCES "Machine"("id") ON DELETE RESTRICT ON UPDATE CASCADE;

View File

@@ -0,0 +1,55 @@
-- AlterTable
ALTER TABLE "Machine" ADD COLUMN "schema_version" TEXT,
ADD COLUMN "seq" BIGINT,
ADD COLUMN "ts" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
ADD COLUMN "ts_server" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP;
-- AlterTable
ALTER TABLE "MachineCycle" ADD COLUMN "schema_version" TEXT,
ADD COLUMN "seq" BIGINT,
ADD COLUMN "ts_server" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP;
-- AlterTable
ALTER TABLE "MachineEvent" ADD COLUMN "schema_version" TEXT,
ADD COLUMN "seq" BIGINT,
ADD COLUMN "ts_server" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP;
-- AlterTable
ALTER TABLE "MachineHeartbeat" ADD COLUMN "schema_version" TEXT,
ADD COLUMN "seq" BIGINT,
ADD COLUMN "ts_server" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP;
-- AlterTable
ALTER TABLE "MachineKpiSnapshot" ADD COLUMN "schema_version" TEXT,
ADD COLUMN "seq" BIGINT,
ADD COLUMN "ts_server" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP;
-- CreateTable
CREATE TABLE "IngestLog" (
"id" TEXT NOT NULL,
"orgId" TEXT,
"machineId" TEXT,
"endpoint" TEXT NOT NULL,
"schemaVersion" TEXT,
"seq" BIGINT,
"tsDevice" TIMESTAMP(3),
"tsServer" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"ok" BOOLEAN NOT NULL,
"status" INTEGER NOT NULL,
"errorCode" TEXT,
"errorMsg" TEXT,
"body" JSONB,
"ip" TEXT,
"userAgent" TEXT,
CONSTRAINT "IngestLog_pkey" PRIMARY KEY ("id")
);
-- CreateIndex
CREATE INDEX "IngestLog_endpoint_tsServer_idx" ON "IngestLog"("endpoint", "tsServer");
-- CreateIndex
CREATE INDEX "IngestLog_machineId_tsServer_idx" ON "IngestLog"("machineId", "tsServer");
-- CreateIndex
CREATE INDEX "IngestLog_machineId_seq_idx" ON "IngestLog"("machineId", "seq");

View File

@@ -76,6 +76,10 @@ model Machine {
location String?
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
tsDevice DateTime @default(now()) @map("ts")
tsServer DateTime @default(now()) @map("ts_server")
schemaVersion String? @map("schema_version")
seq BigInt? @map("seq")
org Org @relation(fields: [orgId], references: [id], onDelete: Cascade)
heartbeats MachineHeartbeat[]
@@ -93,6 +97,9 @@ model MachineHeartbeat {
orgId String
machineId String
ts DateTime @default(now())
tsServer DateTime @default(now()) @map("ts_server")
schemaVersion String? @map("schema_version")
seq BigInt? @map("seq")
status String
message String?
@@ -131,6 +138,9 @@ model MachineKpiSnapshot {
trackingEnabled Boolean?
productionStarted Boolean?
tsServer DateTime @default(now()) @map("ts_server")
schemaVersion String? @map("schema_version")
seq BigInt? @map("seq")
org Org @relation(fields: [orgId], references: [id], onDelete: Cascade)
machine Machine @relation(fields: [machineId], references: [id], onDelete: Cascade)
@@ -150,6 +160,9 @@ model MachineEvent {
requiresAck Boolean @default(false)
title String
description String?
tsServer DateTime @default(now()) @map("ts_server")
schemaVersion String? @map("schema_version")
seq BigInt? @map("seq")
// store the raw data blob so we don't lose fields
data Json?
@@ -179,6 +192,9 @@ model MachineCycle {
cavities Int?
goodDelta Int?
scrapDelta Int?
tsServer DateTime @default(now()) @map("ts_server")
schemaVersion String? @map("schema_version")
seq BigInt? @map("seq")
createdAt DateTime @default(now())
@@ -187,3 +203,27 @@ model MachineCycle {
@@index([orgId, machineId, cycleCount])
}
model IngestLog {
id String @id @default(uuid())
orgId String?
machineId String?
endpoint String
schemaVersion String?
seq BigInt?
tsDevice DateTime?
tsServer DateTime @default(now())
ok Boolean
status Int
errorCode String?
errorMsg String?
body Json?
ip String?
userAgent String?
@@index([endpoint, tsServer])
@@index([machineId, tsServer])
@@index([machineId, seq])
}