Basic new MVP with control tower fully functional

This commit is contained in:
Marcelo Dares
2025-12-31 01:17:13 +00:00
parent a369a69978
commit 363c9fbf4f
11 changed files with 284 additions and 577 deletions

View File

@@ -1,11 +1,37 @@
import { NextResponse } from "next/server";
import { prisma } from "@/lib/prisma";
function unwrapEnvelope(raw: any) {
if (!raw || typeof raw !== "object") return raw;
const payload = raw.payload;
if (!payload || typeof payload !== "object") return raw;
const hasMeta =
raw.schemaVersion !== undefined ||
raw.machineId !== undefined ||
raw.tsMs !== undefined ||
raw.tsDevice !== undefined ||
raw.seq !== undefined ||
raw.type !== undefined;
if (!hasMeta) return raw;
return {
...payload,
machineId: raw.machineId ?? payload.machineId,
tsMs: raw.tsMs ?? payload.tsMs,
tsDevice: raw.tsDevice ?? payload.tsDevice,
schemaVersion: raw.schemaVersion ?? payload.schemaVersion,
seq: raw.seq ?? payload.seq,
};
}
export async function POST(req: Request) {
const apiKey = req.headers.get("x-api-key");
if (!apiKey) return NextResponse.json({ ok: false, error: "Missing api key" }, { status: 401 });
const body = await req.json().catch(() => null);
let body = await req.json().catch(() => null);
body = unwrapEnvelope(body);
if (!body?.machineId || !body?.cycle) {
return NextResponse.json({ ok: false, error: "Invalid payload" }, { status: 400 });
}
@@ -22,6 +48,8 @@ export async function POST(req: Request) {
(typeof c.timestamp === "number" && c.timestamp) ||
(typeof c.ts === "number" && c.ts) ||
(typeof c.event_timestamp === "number" && c.event_timestamp) ||
(typeof body.tsMs === "number" && body.tsMs) ||
(typeof body.tsDevice === "number" && body.tsDevice) ||
undefined;
const ts = tsMs ? new Date(tsMs) : new Date();
@@ -30,7 +58,7 @@ export async function POST(req: Request) {
data: {
orgId: machine.orgId,
machineId: machine.id,
ts,
ts,
cycleCount: typeof c.cycle_count === "number" ? c.cycle_count : null,
actualCycleTime: Number(c.actual_cycle_time),
theoreticalCycleTime: c.theoretical_cycle_time != null ? Number(c.theoretical_cycle_time) : null,

View File

@@ -1,22 +1,5 @@
import { NextResponse } from "next/server";
import { prisma } from "@/lib/prisma";
import { normalizeEventV1 } from "@/lib/contracts/v1";
function getClientIp(req: Request) {
const xf = req.headers.get("x-forwarded-for");
if (xf) return xf.split(",")[0]?.trim() || null;
return req.headers.get("x-real-ip") || null;
}
function parseSeqToBigInt(seq: unknown): bigint | null {
if (seq === null || seq === undefined) return null;
if (typeof seq === "number") {
if (!Number.isInteger(seq) || seq < 0) return null;
return BigInt(seq);
}
if (typeof seq === "string" && /^\d+$/.test(seq)) return BigInt(seq);
return null;
}
const normalizeType = (t: any) =>
String(t ?? "")
@@ -55,124 +38,73 @@ const MICROSTOP_SEC = 60;
const MACROSTOP_SEC = 300;
export async function POST(req: Request) {
const endpoint = "/api/ingest/event";
const ip = getClientIp(req);
const userAgent = req.headers.get("user-agent");
const apiKey = req.headers.get("x-api-key");
if (!apiKey) return NextResponse.json({ ok: false, error: "Missing api key" }, { status: 401 });
let rawBody: any = null;
let orgId: string | null = null;
let machineId: string | null = null;
let schemaVersion: string | null = null;
let seq: bigint | null = null;
let tsDeviceDate: Date | null = null;
let body: any = await req.json().catch(() => null);
try {
// 1) Auth header exists
const apiKey = req.headers.get("x-api-key");
if (!apiKey) {
await prisma.ingestLog.create({
data: {
endpoint,
ok: false,
status: 401,
errorCode: "MISSING_API_KEY",
errorMsg: "Missing api key",
ip,
userAgent,
},
});
return NextResponse.json({ ok: false, error: "Missing api key" }, { status: 401 });
// ✅ if Node-RED sent an array as the whole body, unwrap it
if (Array.isArray(body)) body = body[0];
// ✅ accept multiple common keys
const machineId = body?.machineId ?? body?.machine_id ?? body?.machine?.id;
let rawEvent =
body?.event ??
body?.events ??
body?.anomalies ??
body?.payload?.event ??
body?.payload?.events ??
body?.payload?.anomalies ??
body?.payload ??
body?.data; // sometimes "data"
if (rawEvent?.event && typeof rawEvent.event === "object") rawEvent = rawEvent.event;
if (Array.isArray(rawEvent?.events)) rawEvent = rawEvent.events;
if (!machineId || !rawEvent) {
return NextResponse.json(
{ ok: false, error: "Invalid payload", got: { hasMachineId: !!machineId, keys: Object.keys(body ?? {}) } },
{ status: 400 }
);
}
const machine = await prisma.machine.findFirst({
where: { id: String(machineId), apiKey },
select: { id: true, orgId: true },
});
if (!machine) return NextResponse.json({ ok: false, error: "Unauthorized" }, { status: 401 });
// ✅ normalize to array no matter what
const events = Array.isArray(rawEvent) ? rawEvent : [rawEvent];
const created: { id: string; ts: Date; eventType: string }[] = [];
const skipped: any[] = [];
for (const ev of events) {
if (!ev || typeof ev !== "object") {
skipped.push({ reason: "invalid_event_object" });
continue;
}
// 2) Parse JSON
rawBody = await req.json().catch(() => null);
// 3) Reject arrays at the contract boundary (Phase 0 rule)
// Edge MUST split arrays into one event per POST.
if (rawBody?.event && Array.isArray(rawBody.event)) {
await prisma.ingestLog.create({
data: {
endpoint,
ok: false,
status: 400,
errorCode: "EVENT_ARRAY_NOT_ALLOWED",
errorMsg: "Edge must split arrays; send one event per request.",
body: rawBody,
machineId: rawBody?.machineId ? String(rawBody.machineId) : null,
ip,
userAgent,
},
});
return NextResponse.json(
{ ok: false, error: "Invalid payload", detail: "event array not allowed; split on edge" },
{ status: 400 }
);
}
// 4) Normalize to v1 (legacy tolerated)
const normalized = normalizeEventV1(rawBody);
if (!normalized.ok) {
await prisma.ingestLog.create({
data: {
endpoint,
ok: false,
status: 400,
errorCode: "INVALID_PAYLOAD",
errorMsg: normalized.error,
body: rawBody,
ip,
userAgent,
},
});
return NextResponse.json({ ok: false, error: "Invalid payload", detail: normalized.error }, { status: 400 });
}
const body = normalized.value;
schemaVersion = body.schemaVersion;
machineId = body.machineId;
seq = parseSeqToBigInt(body.seq);
tsDeviceDate = new Date(body.tsDevice);
// 5) Authorize machineId + apiKey
const machine = await prisma.machine.findFirst({
where: { id: machineId, apiKey },
select: { id: true, orgId: true },
});
if (!machine) {
await prisma.ingestLog.create({
data: {
endpoint,
ok: false,
status: 401,
errorCode: "UNAUTHORIZED",
errorMsg: "Unauthorized (machineId/apiKey mismatch)",
body: rawBody,
machineId,
schemaVersion,
seq,
tsDevice: tsDeviceDate,
ip,
userAgent,
},
});
return NextResponse.json({ ok: false, error: "Unauthorized" }, { status: 401 });
}
orgId = machine.orgId;
// 6) Canonicalize + classify type (keep for now; later move to edge in A1)
const ev = body.event;
const rawType =
(ev as any).eventType ?? (ev as any).anomaly_type ?? (ev as any).topic ?? (body as any).topic ?? "";
const rawType = (ev as any).eventType ?? (ev as any).anomaly_type ?? (ev as any).topic ?? body.topic ?? "";
const typ0 = normalizeType(rawType);
const typ = CANON_TYPE[typ0] ?? typ0;
let finalType = typ;
// Determine timestamp
const tsMs =
(typeof (ev as any)?.timestamp === "number" && (ev as any).timestamp) ||
(typeof (ev as any)?.data?.timestamp === "number" && (ev as any).data.timestamp) ||
(typeof (ev as any)?.data?.event_timestamp === "number" && (ev as any).data.event_timestamp) ||
null;
const ts = tsMs ? new Date(tsMs) : new Date();
// Severity defaulting (do not skip on severity — store for audit)
let sev = String((ev as any).severity ?? "").trim().toLowerCase();
if (!sev) sev = "warning";
// Stop classification -> microstop/macrostop
let finalType = typ;
if (typ === "stop") {
const stopSec =
(typeof (ev as any)?.data?.stoppage_duration_seconds === "number" && (ev as any).data.stoppage_duration_seconds) ||
@@ -182,85 +114,36 @@ export async function POST(req: Request) {
if (stopSec != null) {
finalType = stopSec >= MACROSTOP_SEC ? "macrostop" : "microstop";
} else {
// missing duration -> conservative
finalType = "microstop";
}
}
if (!ALLOWED_TYPES.has(finalType)) {
await prisma.ingestLog.create({
data: {
orgId,
machineId: machine.id,
endpoint,
ok: false,
status: 400,
errorCode: "TYPE_NOT_ALLOWED",
errorMsg: `Event type not allowed: ${finalType}`,
schemaVersion,
seq,
tsDevice: tsDeviceDate,
body: rawBody,
ip,
userAgent,
},
});
return NextResponse.json(
{ ok: false, error: "Invalid event type", detail: finalType },
{ status: 400 }
);
skipped.push({ reason: "type_not_allowed", typ: finalType, sev });
continue;
}
// Determine severity
let sev = String((ev as any).severity ?? "").trim().toLowerCase();
if (!sev) sev = "warning";
const title =
String((ev as any).title ?? "").trim() ||
(finalType === "slow-cycle"
? "Slow Cycle Detected"
: finalType === "macrostop"
? "Macrostop Detected"
: finalType === "microstop"
? "Microstop Detected"
: "Event");
(finalType === "slow-cycle" ? "Slow Cycle Detected" :
finalType === "macrostop" ? "Macrostop Detected" :
finalType === "microstop" ? "Microstop Detected" :
"Event");
const description = (ev as any).description ? String((ev as any).description) : null;
// store full blob
// store full blob, ensure object
const rawData = (ev as any).data ?? ev;
const dataObj =
typeof rawData === "string"
? (() => {
try {
return JSON.parse(rawData);
} catch {
return { raw: rawData };
}
})()
: rawData;
const dataObj = typeof rawData === "string" ? (() => {
try { return JSON.parse(rawData); } catch { return { raw: rawData }; }
})() : rawData;
// Prefer work_order_id always
const workOrderId =
(ev as any)?.work_order_id ? String((ev as any).work_order_id)
: (ev as any)?.data?.work_order_id ? String((ev as any).data.work_order_id)
: null;
const sku =
(ev as any)?.sku ? String((ev as any).sku)
: (ev as any)?.data?.sku ? String((ev as any).data.sku)
: null;
// 7) Store event with Phase 0 meta
const row = await prisma.machineEvent.create({
data: {
orgId,
orgId: machine.orgId,
machineId: machine.id,
// Phase 0 meta
schemaVersion,
seq,
ts: tsDeviceDate,
ts,
topic: String((ev as any).topic ?? finalType),
eventType: finalType,
severity: sev,
@@ -268,69 +151,19 @@ export async function POST(req: Request) {
title,
description,
data: dataObj,
workOrderId,
sku,
workOrderId:
(ev as any)?.work_order_id ? String((ev as any).work_order_id)
: (ev as any)?.data?.work_order_id ? String((ev as any).data.work_order_id)
: null,
sku:
(ev as any)?.sku ? String((ev as any).sku)
: (ev as any)?.data?.sku ? String((ev as any).data.sku)
: null,
},
});
// Optional: update machine last seen
await prisma.machine.update({
where: { id: machine.id },
data: {
schemaVersion,
seq,
tsDevice: tsDeviceDate,
tsServer: new Date(),
},
});
// 8) Ingest log success
await prisma.ingestLog.create({
data: {
orgId,
machineId: machine.id,
endpoint,
ok: true,
status: 200,
schemaVersion,
seq,
tsDevice: tsDeviceDate,
body: rawBody,
ip,
userAgent,
},
});
return NextResponse.json({
ok: true,
createdCount: 1,
created: [{ id: row.id, ts: row.ts, eventType: row.eventType }],
skippedCount: 0,
skipped: [],
});
} catch (err: any) {
const msg = err?.message ? String(err.message) : "Unknown error";
try {
await prisma.ingestLog.create({
data: {
orgId,
machineId,
endpoint,
ok: false,
status: 500,
errorCode: "SERVER_ERROR",
errorMsg: msg,
schemaVersion,
seq,
tsDevice: tsDeviceDate ?? undefined,
body: rawBody,
ip,
userAgent,
},
});
} catch {}
return NextResponse.json({ ok: false, error: "Server error", detail: msg }, { status: 500 });
created.push({ id: row.id, ts: row.ts, eventType: row.eventType });
}
return NextResponse.json({ ok: true, createdCount: created.length, created, skippedCount: skipped.length, skipped });
}

View File

@@ -103,6 +103,8 @@ export async function POST(req: Request) {
orgId = machine.orgId;
const wo = body.activeWorkOrder ?? {};
const good = typeof wo.good === "number" ? wo.good : (typeof wo.goodParts === "number" ? wo.goodParts : null);
const scrap = typeof wo.scrap === "number" ? wo.scrap : (typeof wo.scrapParts === "number" ? wo.scrapParts : null)
const k = body.kpis ?? {};
const safeCycleTime =
typeof body.cycleTime === "number" && body.cycleTime > 0
@@ -128,8 +130,8 @@ export async function POST(req: Request) {
workOrderId: wo.id ? String(wo.id) : null,
sku: wo.sku ? String(wo.sku) : null,
target: typeof wo.target === "number" ? Math.trunc(wo.target) : null,
good: typeof wo.good === "number" ? Math.trunc(wo.good) : null,
scrap: typeof wo.scrap === "number" ? Math.trunc(wo.scrap) : null,
good: good != null ? Math.trunc(good) : null,
scrap: scrap != null ? Math.trunc(scrap) : null,
// Counters
cycleCount: typeof body.cycle_count === "number" ? body.cycle_count : null,

View File

@@ -238,11 +238,7 @@ const ALLOWED_TYPES = new Set([
const events = normalized
.filter((e) => ALLOWED_TYPES.has(e.eventType))
// keep slow-cycle even if severity is info, otherwise require warning/critical/error
.filter((e) =>
["slow-cycle", "microstop", "macrostop"].includes(e.eventType) ||
["warning", "critical", "error"].includes(e.severity)
)
// drop severity gating so recent info events appear
.slice(0, 30);