Macrostop and timeline segmentation
This commit is contained in:
124
lib/mqtt.ts
Normal file
124
lib/mqtt.ts
Normal file
@@ -0,0 +1,124 @@
|
||||
import "server-only";
|
||||
|
||||
import mqtt, { MqttClient } from "mqtt";
|
||||
|
||||
type SettingsUpdate = {
|
||||
orgId: string;
|
||||
version: number;
|
||||
source?: string;
|
||||
updatedAt?: string;
|
||||
machineId?: string;
|
||||
overridesUpdatedAt?: string;
|
||||
};
|
||||
|
||||
type WorkOrdersUpdate = {
|
||||
orgId: string;
|
||||
machineId: string;
|
||||
count?: number;
|
||||
source?: string;
|
||||
updatedAt?: string;
|
||||
};
|
||||
|
||||
const MQTT_URL = process.env.MQTT_BROKER_URL || "";
|
||||
const MQTT_USERNAME = process.env.MQTT_USERNAME;
|
||||
const MQTT_PASSWORD = process.env.MQTT_PASSWORD;
|
||||
const MQTT_CLIENT_ID = process.env.MQTT_CLIENT_ID;
|
||||
const MQTT_TOPIC_PREFIX = (process.env.MQTT_TOPIC_PREFIX || "mis").replace(/\/+$/, "");
|
||||
const MQTT_QOS_RAW = Number(process.env.MQTT_QOS ?? "2");
|
||||
const MQTT_QOS = MQTT_QOS_RAW === 0 || MQTT_QOS_RAW === 1 || MQTT_QOS_RAW === 2 ? MQTT_QOS_RAW : 2;
|
||||
|
||||
let client: MqttClient | null = null;
|
||||
let connecting: Promise<MqttClient> | null = null;
|
||||
|
||||
function buildSettingsTopic(orgId: string, machineId?: string) {
|
||||
const base = `${MQTT_TOPIC_PREFIX}/org/${orgId}`;
|
||||
if (machineId) return `${base}/machines/${machineId}/settings/updated`;
|
||||
return `${base}/settings/updated`;
|
||||
}
|
||||
|
||||
function buildWorkOrdersTopic(orgId: string, machineId: string) {
|
||||
const base = `${MQTT_TOPIC_PREFIX}/org/${orgId}`;
|
||||
return `${base}/machines/${machineId}/work_orders/updated`;
|
||||
}
|
||||
|
||||
async function getClient() {
|
||||
if (!MQTT_URL) return null;
|
||||
if (client?.connected) return client;
|
||||
if (connecting) return connecting;
|
||||
|
||||
connecting = new Promise((resolve, reject) => {
|
||||
const next = mqtt.connect(MQTT_URL, {
|
||||
clientId: MQTT_CLIENT_ID,
|
||||
username: MQTT_USERNAME,
|
||||
password: MQTT_PASSWORD,
|
||||
clean: true,
|
||||
reconnectPeriod: 5000,
|
||||
});
|
||||
|
||||
next.once("connect", () => {
|
||||
client = next;
|
||||
connecting = null;
|
||||
resolve(next);
|
||||
});
|
||||
|
||||
next.once("error", (err) => {
|
||||
next.end(true);
|
||||
client = null;
|
||||
connecting = null;
|
||||
reject(err);
|
||||
});
|
||||
|
||||
next.once("close", () => {
|
||||
client = null;
|
||||
});
|
||||
});
|
||||
|
||||
return connecting;
|
||||
}
|
||||
|
||||
export async function publishSettingsUpdate(update: SettingsUpdate) {
|
||||
if (!MQTT_URL) return { ok: false, reason: "MQTT_NOT_CONFIGURED" as const };
|
||||
const mqttClient = await getClient();
|
||||
if (!mqttClient) return { ok: false, reason: "MQTT_NOT_CONFIGURED" as const };
|
||||
|
||||
const topic = buildSettingsTopic(update.orgId, update.machineId);
|
||||
const payload = JSON.stringify({
|
||||
type: update.machineId ? "machine_settings_updated" : "org_settings_updated",
|
||||
orgId: update.orgId,
|
||||
machineId: update.machineId,
|
||||
version: update.version,
|
||||
source: update.source || "control_tower",
|
||||
updatedAt: update.updatedAt,
|
||||
overridesUpdatedAt: update.overridesUpdatedAt,
|
||||
});
|
||||
|
||||
return new Promise<{ ok: true }>((resolve, reject) => {
|
||||
mqttClient.publish(topic, payload, { qos: MQTT_QOS }, (err) => {
|
||||
if (err) return reject(err);
|
||||
resolve({ ok: true });
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
export async function publishWorkOrdersUpdate(update: WorkOrdersUpdate) {
|
||||
if (!MQTT_URL) return { ok: false, reason: "MQTT_NOT_CONFIGURED" as const };
|
||||
const mqttClient = await getClient();
|
||||
if (!mqttClient) return { ok: false, reason: "MQTT_NOT_CONFIGURED" as const };
|
||||
|
||||
const topic = buildWorkOrdersTopic(update.orgId, update.machineId);
|
||||
const payload = JSON.stringify({
|
||||
type: "work_orders_updated",
|
||||
orgId: update.orgId,
|
||||
machineId: update.machineId,
|
||||
count: update.count ?? null,
|
||||
source: update.source || "control_tower",
|
||||
updatedAt: update.updatedAt,
|
||||
});
|
||||
|
||||
return new Promise<{ ok: true }>((resolve, reject) => {
|
||||
mqttClient.publish(topic, payload, { qos: MQTT_QOS }, (err) => {
|
||||
if (err) return reject(err);
|
||||
resolve({ ok: true });
|
||||
});
|
||||
});
|
||||
}
|
||||
Reference in New Issue
Block a user