initial push
This commit is contained in:
392
src/lib/licitations/sync.ts
Normal file
392
src/lib/licitations/sync.ts
Normal file
@@ -0,0 +1,392 @@
|
||||
import "server-only";
|
||||
|
||||
import { LicitationSource, Prisma, SyncRunStatus } from "@prisma/client";
|
||||
import { licitationsConfig } from "@/lib/licitations/config";
|
||||
import { fetchMunicipalBackupLicitations, fetchMunicipalOpenLicitations } from "@/lib/licitations/connectors/municipal";
|
||||
import { fetchPntLicitations } from "@/lib/licitations/connectors/pnt";
|
||||
import { buildStableRecordId, sanitizeDocuments } from "@/lib/licitations/normalize";
|
||||
import { prisma } from "@/lib/prisma";
|
||||
import type { ConnectorResult, MunicipalityConnectorInput, SyncStats } from "@/lib/licitations/types";
|
||||
import { delay } from "@/lib/licitations/utils";
|
||||
|
||||
const MUNICIPALITY_SELECT = {
|
||||
id: true,
|
||||
stateCode: true,
|
||||
stateName: true,
|
||||
municipalityCode: true,
|
||||
municipalityName: true,
|
||||
openPortalUrl: true,
|
||||
openPortalType: true,
|
||||
openSyncIntervalDays: true,
|
||||
lastOpenSyncAt: true,
|
||||
pntSubjectId: true,
|
||||
pntEntityId: true,
|
||||
pntSectorId: true,
|
||||
pntEntryUrl: true,
|
||||
backupUrl: true,
|
||||
scrapingEnabled: true,
|
||||
isActive: true,
|
||||
} satisfies Prisma.MunicipalitySelect;
|
||||
|
||||
function createEmptyStats(): SyncStats {
|
||||
return {
|
||||
totalFetched: 0,
|
||||
inserted: 0,
|
||||
updated: 0,
|
||||
skipped: 0,
|
||||
errors: 0,
|
||||
latestPublishDate: null,
|
||||
};
|
||||
}
|
||||
|
||||
function toNullableJson(value: Record<string, unknown> | null | undefined) {
|
||||
return value ? (value as Prisma.InputJsonValue) : Prisma.DbNull;
|
||||
}
|
||||
|
||||
function getLatestPublishDate(items: { publishDate?: Date | null }[]) {
|
||||
let latest: Date | null = null;
|
||||
|
||||
for (const item of items) {
|
||||
if (!item.publishDate) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!latest || item.publishDate > latest) {
|
||||
latest = item.publishDate;
|
||||
}
|
||||
}
|
||||
|
||||
return latest;
|
||||
}
|
||||
|
||||
function inferOpenFlag(source: LicitationSource, item: { isOpen?: boolean; closingDate?: Date | null }) {
|
||||
if (typeof item.isOpen === "boolean") {
|
||||
return item.isOpen;
|
||||
}
|
||||
|
||||
if (source !== LicitationSource.MUNICIPAL_OPEN_PORTAL) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!item.closingDate) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return item.closingDate >= new Date();
|
||||
}
|
||||
|
||||
async function upsertConnectorItems(municipality: MunicipalityConnectorInput, connectorResult: ConnectorResult): Promise<SyncStats> {
|
||||
const stats = createEmptyStats();
|
||||
stats.totalFetched = connectorResult.items.length;
|
||||
|
||||
const latestDate = getLatestPublishDate(connectorResult.items);
|
||||
stats.latestPublishDate = latestDate ? latestDate.toISOString() : null;
|
||||
|
||||
for (const item of connectorResult.items) {
|
||||
try {
|
||||
const title = item.title?.trim();
|
||||
|
||||
if (!title) {
|
||||
stats.skipped += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
const sourceRecordId = (item.sourceRecordId?.trim() || buildStableRecordId(municipality, item)).slice(0, 255);
|
||||
const existing = await prisma.licitation.findUnique({
|
||||
where: {
|
||||
municipalityId_source_sourceRecordId: {
|
||||
municipalityId: municipality.id,
|
||||
source: connectorResult.source,
|
||||
sourceRecordId,
|
||||
},
|
||||
},
|
||||
select: { id: true },
|
||||
});
|
||||
|
||||
await prisma.licitation.upsert({
|
||||
where: {
|
||||
municipalityId_source_sourceRecordId: {
|
||||
municipalityId: municipality.id,
|
||||
source: connectorResult.source,
|
||||
sourceRecordId,
|
||||
},
|
||||
},
|
||||
update: {
|
||||
tenderCode: item.tenderCode ?? null,
|
||||
procedureType: item.procedureType,
|
||||
title,
|
||||
description: item.description ?? null,
|
||||
category: item.category ?? undefined,
|
||||
isOpen: inferOpenFlag(connectorResult.source, item),
|
||||
openingDate: item.openingDate ?? null,
|
||||
closingDate: item.closingDate ?? null,
|
||||
publishDate: item.publishDate ?? null,
|
||||
eventDates: toNullableJson(item.eventDates ?? null),
|
||||
amount: item.amount ?? null,
|
||||
currency: item.currency ?? null,
|
||||
status: item.status ?? null,
|
||||
supplierAwarded: item.supplierAwarded ?? null,
|
||||
documents: sanitizeDocuments(item.documents) as Prisma.InputJsonValue,
|
||||
rawSourceUrl: item.rawSourceUrl ?? connectorResult.rawSourceUrl ?? null,
|
||||
rawPayload: item.rawPayload as Prisma.InputJsonValue,
|
||||
lastSeenAt: new Date(),
|
||||
},
|
||||
create: {
|
||||
municipalityId: municipality.id,
|
||||
source: connectorResult.source,
|
||||
sourceRecordId,
|
||||
tenderCode: item.tenderCode ?? null,
|
||||
procedureType: item.procedureType,
|
||||
title,
|
||||
description: item.description ?? null,
|
||||
category: item.category ?? undefined,
|
||||
isOpen: inferOpenFlag(connectorResult.source, item),
|
||||
openingDate: item.openingDate ?? null,
|
||||
closingDate: item.closingDate ?? null,
|
||||
publishDate: item.publishDate ?? null,
|
||||
eventDates: toNullableJson(item.eventDates ?? null),
|
||||
amount: item.amount ?? null,
|
||||
currency: item.currency ?? null,
|
||||
status: item.status ?? null,
|
||||
supplierAwarded: item.supplierAwarded ?? null,
|
||||
documents: sanitizeDocuments(item.documents) as Prisma.InputJsonValue,
|
||||
rawSourceUrl: item.rawSourceUrl ?? connectorResult.rawSourceUrl ?? null,
|
||||
rawPayload: item.rawPayload as Prisma.InputJsonValue,
|
||||
lastSeenAt: new Date(),
|
||||
},
|
||||
});
|
||||
|
||||
if (existing) {
|
||||
stats.updated += 1;
|
||||
} else {
|
||||
stats.inserted += 1;
|
||||
}
|
||||
} catch {
|
||||
stats.errors += 1;
|
||||
}
|
||||
}
|
||||
|
||||
return stats;
|
||||
}
|
||||
|
||||
function resolveSyncStatus(stats: SyncStats): SyncRunStatus {
|
||||
if (stats.errors === 0) {
|
||||
return SyncRunStatus.SUCCESS;
|
||||
}
|
||||
|
||||
if (stats.inserted > 0 || stats.updated > 0) {
|
||||
return SyncRunStatus.PARTIAL;
|
||||
}
|
||||
|
||||
return SyncRunStatus.FAILED;
|
||||
}
|
||||
|
||||
async function runConnectorSync(municipality: MunicipalityConnectorInput, source: LicitationSource, connector: () => Promise<ConnectorResult>) {
|
||||
const syncRun = await prisma.syncRun.create({
|
||||
data: {
|
||||
municipalityId: municipality.id,
|
||||
source,
|
||||
status: SyncRunStatus.SUCCESS,
|
||||
},
|
||||
});
|
||||
|
||||
try {
|
||||
const connectorResult = await connector();
|
||||
const stats = await upsertConnectorItems(municipality, connectorResult);
|
||||
const status = resolveSyncStatus(stats);
|
||||
const error = status === SyncRunStatus.FAILED ? "No se pudieron persistir registros de licitaciones." : null;
|
||||
|
||||
await prisma.syncRun.update({
|
||||
where: { id: syncRun.id },
|
||||
data: {
|
||||
finishedAt: new Date(),
|
||||
status,
|
||||
stats: {
|
||||
...stats,
|
||||
warnings: connectorResult.warnings,
|
||||
},
|
||||
error,
|
||||
},
|
||||
});
|
||||
|
||||
if (source === LicitationSource.MUNICIPAL_OPEN_PORTAL && status !== SyncRunStatus.FAILED) {
|
||||
await prisma.municipality.update({
|
||||
where: { id: municipality.id },
|
||||
data: {
|
||||
lastOpenSyncAt: new Date(),
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
return {
|
||||
source,
|
||||
status,
|
||||
stats,
|
||||
warnings: connectorResult.warnings,
|
||||
};
|
||||
} catch (error) {
|
||||
const message = error instanceof Error ? error.message : "Error desconocido en sync.";
|
||||
|
||||
await prisma.syncRun.update({
|
||||
where: { id: syncRun.id },
|
||||
data: {
|
||||
finishedAt: new Date(),
|
||||
status: SyncRunStatus.FAILED,
|
||||
stats: createEmptyStats(),
|
||||
error: message,
|
||||
},
|
||||
});
|
||||
|
||||
return {
|
||||
source,
|
||||
status: SyncRunStatus.FAILED,
|
||||
stats: createEmptyStats(),
|
||||
warnings: [message],
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
async function shouldRunOpenSourceThisCycle(municipality: MunicipalityConnectorInput, force = false) {
|
||||
if (force) {
|
||||
return true;
|
||||
}
|
||||
|
||||
const intervalDays = municipality.openSyncIntervalDays > 0 ? municipality.openSyncIntervalDays : 7;
|
||||
const now = new Date();
|
||||
|
||||
if (municipality.lastOpenSyncAt) {
|
||||
const nextRunAt = new Date(municipality.lastOpenSyncAt);
|
||||
nextRunAt.setUTCDate(nextRunAt.getUTCDate() + intervalDays);
|
||||
|
||||
if (nextRunAt > now) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
const latestRun = await prisma.syncRun.findFirst({
|
||||
where: {
|
||||
municipalityId: municipality.id,
|
||||
source: LicitationSource.MUNICIPAL_OPEN_PORTAL,
|
||||
status: {
|
||||
in: [SyncRunStatus.SUCCESS, SyncRunStatus.PARTIAL],
|
||||
},
|
||||
finishedAt: {
|
||||
not: null,
|
||||
},
|
||||
},
|
||||
orderBy: {
|
||||
finishedAt: "desc",
|
||||
},
|
||||
select: {
|
||||
finishedAt: true,
|
||||
},
|
||||
});
|
||||
|
||||
if (!latestRun?.finishedAt) {
|
||||
return true;
|
||||
}
|
||||
|
||||
const nextRunAt = new Date(latestRun.finishedAt);
|
||||
nextRunAt.setUTCDate(nextRunAt.getUTCDate() + intervalDays);
|
||||
|
||||
return nextRunAt <= now;
|
||||
}
|
||||
|
||||
export async function runMunicipalityLicitationsSync(
|
||||
municipality: MunicipalityConnectorInput,
|
||||
options?: {
|
||||
targetYear?: number;
|
||||
includePnt?: boolean;
|
||||
force?: boolean;
|
||||
},
|
||||
) {
|
||||
const sourceResults: Array<{ source: LicitationSource; status: SyncRunStatus; stats: SyncStats; warnings: string[] }> = [];
|
||||
|
||||
if (!licitationSyncEnabledForMunicipality(municipality)) {
|
||||
return {
|
||||
municipality,
|
||||
skipped: true,
|
||||
reason: "Scraping deshabilitado para municipio o entorno.",
|
||||
sourceResults,
|
||||
};
|
||||
}
|
||||
|
||||
let openItemsFetched = 0;
|
||||
|
||||
if (municipality.openPortalUrl && (await shouldRunOpenSourceThisCycle(municipality, options?.force))) {
|
||||
const openResult = await runConnectorSync(municipality, LicitationSource.MUNICIPAL_OPEN_PORTAL, () =>
|
||||
fetchMunicipalOpenLicitations(municipality, {
|
||||
targetYear: options?.targetYear,
|
||||
}),
|
||||
);
|
||||
|
||||
sourceResults.push(openResult);
|
||||
openItemsFetched = openResult.stats.totalFetched;
|
||||
await delay(licitationsConfig.requestDelayMs);
|
||||
}
|
||||
|
||||
if (openItemsFetched === 0 && municipality.backupUrl) {
|
||||
const backup = await runConnectorSync(municipality, LicitationSource.MUNICIPAL_BACKUP, () => fetchMunicipalBackupLicitations(municipality));
|
||||
sourceResults.push(backup);
|
||||
await delay(licitationsConfig.requestDelayMs);
|
||||
}
|
||||
|
||||
if (options?.includePnt && (municipality.pntSubjectId || municipality.pntEntryUrl)) {
|
||||
const pnt = await runConnectorSync(municipality, LicitationSource.PNT, () =>
|
||||
fetchPntLicitations(municipality, {
|
||||
targetYear: options?.targetYear,
|
||||
}),
|
||||
);
|
||||
sourceResults.push(pnt);
|
||||
}
|
||||
|
||||
return {
|
||||
municipality,
|
||||
skipped: false,
|
||||
reason: null,
|
||||
sourceResults,
|
||||
};
|
||||
}
|
||||
|
||||
function licitationSyncEnabledForMunicipality(municipality: MunicipalityConnectorInput) {
|
||||
return licitationsConfig.scrapingEnabledByDefault && municipality.scrapingEnabled;
|
||||
}
|
||||
|
||||
export async function runDailyLicitationsSync(options?: {
|
||||
municipalityId?: string;
|
||||
limit?: number;
|
||||
skip?: number;
|
||||
targetYear?: number;
|
||||
includePnt?: boolean;
|
||||
force?: boolean;
|
||||
}) {
|
||||
const municipalities = await prisma.municipality.findMany({
|
||||
where: {
|
||||
isActive: true,
|
||||
...(options?.municipalityId ? { id: options.municipalityId } : {}),
|
||||
},
|
||||
orderBy: [{ stateCode: "asc" }, { municipalityName: "asc" }],
|
||||
take: options?.limit ?? licitationsConfig.maxMunicipalitiesPerRun,
|
||||
skip: options?.skip ?? 0,
|
||||
select: MUNICIPALITY_SELECT,
|
||||
});
|
||||
|
||||
const results = [] as Awaited<ReturnType<typeof runMunicipalityLicitationsSync>>[];
|
||||
|
||||
for (const municipality of municipalities) {
|
||||
const result = await runMunicipalityLicitationsSync(municipality, {
|
||||
targetYear: options?.targetYear,
|
||||
includePnt: options?.includePnt,
|
||||
force: options?.force,
|
||||
});
|
||||
|
||||
results.push(result);
|
||||
await delay(licitationsConfig.requestDelayMs);
|
||||
}
|
||||
|
||||
return {
|
||||
processedMunicipalities: municipalities.length,
|
||||
skippedMunicipalities: options?.skip ?? 0,
|
||||
results,
|
||||
};
|
||||
}
|
||||
Reference in New Issue
Block a user