Initial commit, 90% there
This commit is contained in:
@@ -0,0 +1,151 @@
|
||||
// ============================================================
|
||||
// EVENT LOGGER
|
||||
// Deduplicates and logs anomaly events to database
|
||||
// ============================================================
|
||||
|
||||
const anomalies = msg.payload || [];
|
||||
|
||||
if (!Array.isArray(anomalies) || anomalies.length === 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// Get or initialize active anomalies list
|
||||
let activeAnomalies = global.get("activeAnomalies") || [];
|
||||
const now = Date.now();
|
||||
const DEDUP_WINDOW = 5 * 60 * 1000; // 5 minutes
|
||||
|
||||
const dbInserts = [];
|
||||
const uiUpdates = [];
|
||||
|
||||
anomalies.forEach(anomaly => {
|
||||
// ============================================================
|
||||
// DEDUPLICATION LOGIC
|
||||
// Don't create new event if same type exists in last 5 minutes
|
||||
// ============================================================
|
||||
const existingIndex = activeAnomalies.findIndex(existing =>
|
||||
existing.anomaly_type === anomaly.anomaly_type &&
|
||||
existing.work_order_id === anomaly.work_order_id &&
|
||||
existing.status === 'active' &&
|
||||
(now - existing.timestamp) < DEDUP_WINDOW
|
||||
);
|
||||
|
||||
if (existingIndex !== -1) {
|
||||
// Update existing event
|
||||
const existing = activeAnomalies[existingIndex];
|
||||
existing.occurrence_count = (existing.occurrence_count || 1) + 1;
|
||||
existing.last_occurrence = now;
|
||||
|
||||
// Update in database
|
||||
const updateQuery = `UPDATE anomaly_events
|
||||
SET occurrence_count = ?, last_occurrence = ?
|
||||
WHERE event_id = ?`;
|
||||
|
||||
dbInserts.push({
|
||||
topic: updateQuery,
|
||||
payload: [existing.occurrence_count, existing.last_occurrence, existing.event_id]
|
||||
});
|
||||
|
||||
node.warn(`[EVENT LOGGER] Updated existing ${anomaly.anomaly_type} event (occurrence #${existing.occurrence_count})`);
|
||||
|
||||
} else if (anomaly.status === 'resolved') {
|
||||
// ============================================================
|
||||
// RESOLVE EVENT
|
||||
// ============================================================
|
||||
const resolveIndex = activeAnomalies.findIndex(existing =>
|
||||
existing.anomaly_type === anomaly.anomaly_type &&
|
||||
existing.work_order_id === anomaly.work_order_id &&
|
||||
existing.status === 'active'
|
||||
);
|
||||
|
||||
if (resolveIndex !== -1) {
|
||||
const existing = activeAnomalies[resolveIndex];
|
||||
existing.status = 'resolved';
|
||||
existing.resolved_at = anomaly.resolved_at || now;
|
||||
existing.auto_resolved = anomaly.auto_resolved || false;
|
||||
|
||||
// Update in database
|
||||
const resolveQuery = `UPDATE anomaly_events
|
||||
SET status = 'resolved', resolved_at = ?, auto_resolved = ?
|
||||
WHERE event_id = ?`;
|
||||
|
||||
dbInserts.push({
|
||||
topic: resolveQuery,
|
||||
payload: [existing.resolved_at, existing.auto_resolved, existing.event_id]
|
||||
});
|
||||
|
||||
// Remove from active list
|
||||
activeAnomalies.splice(resolveIndex, 1);
|
||||
|
||||
node.warn(`[EVENT LOGGER] Resolved ${anomaly.anomaly_type} event (auto: ${existing.auto_resolved})`);
|
||||
|
||||
uiUpdates.push({
|
||||
event_id: existing.event_id,
|
||||
status: 'resolved'
|
||||
});
|
||||
}
|
||||
|
||||
} else {
|
||||
// ============================================================
|
||||
// NEW EVENT
|
||||
// ============================================================
|
||||
const insertQuery = `INSERT INTO anomaly_events
|
||||
(timestamp, work_order_id, anomaly_type, severity, title, description,
|
||||
data_json, kpi_snapshot_json, status, cycle_count, occurrence_count, last_occurrence)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`;
|
||||
|
||||
const dataJson = JSON.stringify(anomaly.data || {});
|
||||
const kpiJson = JSON.stringify(anomaly.kpi_snapshot || {});
|
||||
|
||||
dbInserts.push({
|
||||
topic: insertQuery,
|
||||
payload: [
|
||||
anomaly.timestamp,
|
||||
anomaly.work_order_id,
|
||||
anomaly.anomaly_type,
|
||||
anomaly.severity,
|
||||
anomaly.title,
|
||||
anomaly.description,
|
||||
dataJson,
|
||||
kpiJson,
|
||||
'active',
|
||||
anomaly.cycle_count,
|
||||
1, // occurrence_count
|
||||
anomaly.timestamp // last_occurrence
|
||||
],
|
||||
_storeEventId: true, // Flag to get generated event_id
|
||||
_anomaly: anomaly // Keep reference for later
|
||||
});
|
||||
|
||||
node.warn(`[EVENT LOGGER] New ${anomaly.anomaly_type} event: ${anomaly.title}`);
|
||||
}
|
||||
});
|
||||
|
||||
// Save active anomalies to global context
|
||||
global.set("activeAnomalies", activeAnomalies);
|
||||
|
||||
// ============================================================
|
||||
// OUTPUT
|
||||
// ============================================================
|
||||
// Output 1: Database inserts (to mysql node)
|
||||
// Output 2: UI updates (to Home/Alerts tabs)
|
||||
|
||||
if (dbInserts.length > 0) {
|
||||
// Send each insert as a separate message
|
||||
return [dbInserts, {
|
||||
topic: "anomaly-ui-update",
|
||||
payload: {
|
||||
activeCount: activeAnomalies.length,
|
||||
activeAnomalies: activeAnomalies,
|
||||
updates: uiUpdates
|
||||
}
|
||||
}];
|
||||
} else {
|
||||
return [null, {
|
||||
topic: "anomaly-ui-update",
|
||||
payload: {
|
||||
activeCount: activeAnomalies.length,
|
||||
activeAnomalies: activeAnomalies,
|
||||
updates: uiUpdates
|
||||
}
|
||||
}];
|
||||
}
|
||||
Reference in New Issue
Block a user