From 0d5220e5056ec188130c957376dff76fa40c8d35 Mon Sep 17 00:00:00 2001 From: Laura Hausmann Date: Tue, 23 Jul 2024 20:38:30 +0200 Subject: [PATCH] [backend] Discard jobs with null/undefined/empty data objects; add no-op handlers for invalid queue jobs This stops corrupted/invalid jobs from clogging up the queue. Ref: https://github.com/OptimalBits/bull/issues/2461 --- packages/backend/src/queue/processors/background/index.ts | 3 +++ packages/backend/src/queue/processors/db/index.ts | 3 +++ packages/backend/src/queue/processors/deliver.ts | 1 + .../backend/src/queue/processors/ended-poll-notification.ts | 4 ++++ packages/backend/src/queue/processors/inbox.ts | 1 + packages/backend/src/queue/processors/noop.ts | 5 +++++ .../backend/src/queue/processors/object-storage/index.ts | 3 +++ packages/backend/src/queue/processors/system/index.ts | 3 +++ packages/backend/src/queue/processors/webhook-deliver.ts | 1 + 9 files changed, 24 insertions(+) create mode 100644 packages/backend/src/queue/processors/noop.ts diff --git a/packages/backend/src/queue/processors/background/index.ts b/packages/backend/src/queue/processors/background/index.ts index 38b58680c..ac3d1d32e 100644 --- a/packages/backend/src/queue/processors/background/index.ts +++ b/packages/backend/src/queue/processors/background/index.ts @@ -1,4 +1,5 @@ import type Bull from "bull"; +import { noop } from "@/queue/processors/noop.js"; const jobs = {} as Record>>; @@ -6,4 +7,6 @@ export default function (q: Bull.Queue) { for (const [k, v] of Object.entries(jobs)) { q.process(k, 16, v); } + + q.process(noop); } diff --git a/packages/backend/src/queue/processors/db/index.ts b/packages/backend/src/queue/processors/db/index.ts index d20fc2c71..9100d458f 100644 --- a/packages/backend/src/queue/processors/db/index.ts +++ b/packages/backend/src/queue/processors/db/index.ts @@ -16,6 +16,7 @@ import { importMastoPost } from "./import-masto-post.js"; import { importCkPost } from "./import-firefish-post.js"; import { importBlocking } from "./import-blocking.js"; import { importCustomEmojis } from "./import-custom-emojis.js"; +import { noop } from "@/queue/processors/noop.js"; const jobs = { deleteDriveFiles, @@ -44,4 +45,6 @@ export default function (dbQueue: Bull.Queue) { for (const [k, v] of Object.entries(jobs)) { dbQueue.process(k, v); } + + dbQueue.process(noop); } diff --git a/packages/backend/src/queue/processors/deliver.ts b/packages/backend/src/queue/processors/deliver.ts index 65471a559..946281c89 100644 --- a/packages/backend/src/queue/processors/deliver.ts +++ b/packages/backend/src/queue/processors/deliver.ts @@ -20,6 +20,7 @@ const logger = new Logger("deliver"); let latest: string | null = null; export default async (job: Bull.Job) => { + if (job.data == null || Object.keys(job.data).length === 0) return "Skip (data was null or empty)"; const { host } = new URL(job.data.to); const puny = toPuny(host); diff --git a/packages/backend/src/queue/processors/ended-poll-notification.ts b/packages/backend/src/queue/processors/ended-poll-notification.ts index c652d6216..cbdc67440 100644 --- a/packages/backend/src/queue/processors/ended-poll-notification.ts +++ b/packages/backend/src/queue/processors/ended-poll-notification.ts @@ -11,6 +11,10 @@ export async function endedPollNotification( job: Bull.Job, done: any, ): Promise { + if (job.data == null || Object.keys(job.data).length === 0) { + done(); + return; + } const note = await Notes.findOneBy({ id: job.data.noteId }); if (note == null || !note.hasPoll) { done(); diff --git a/packages/backend/src/queue/processors/inbox.ts b/packages/backend/src/queue/processors/inbox.ts index 3c87727c4..2fe4bf653 100644 --- a/packages/backend/src/queue/processors/inbox.ts +++ b/packages/backend/src/queue/processors/inbox.ts @@ -28,6 +28,7 @@ const logger = new Logger("inbox"); // Processing when an activity arrives in the user's inbox export default async (job: Bull.Job): Promise => { + if (job.data == null || Object.keys(job.data).length === 0) return "Skip (data was null or empty)"; const signature = job.data.signature; // HTTP-signature let activity = job.data.activity; diff --git a/packages/backend/src/queue/processors/noop.ts b/packages/backend/src/queue/processors/noop.ts new file mode 100644 index 000000000..5be01d1da --- /dev/null +++ b/packages/backend/src/queue/processors/noop.ts @@ -0,0 +1,5 @@ +import Bull from "bull"; + +// Processor to be registered for jobs with __default__ (unnamed) handlers in queues that only have named handlers +// Prevents sporadic bogus jobs from clogging up the queues +export async function noop(_: Bull.Job): Promise { } diff --git a/packages/backend/src/queue/processors/object-storage/index.ts b/packages/backend/src/queue/processors/object-storage/index.ts index 5f90d4cd0..1c24f5244 100644 --- a/packages/backend/src/queue/processors/object-storage/index.ts +++ b/packages/backend/src/queue/processors/object-storage/index.ts @@ -2,6 +2,7 @@ import type Bull from "bull"; import type { ObjectStorageJobData } from "@/queue/types.js"; import deleteFile from "./delete-file.js"; import cleanRemoteFiles from "./clean-remote-files.js"; +import { noop } from "@/queue/processors/noop.js"; const jobs = { deleteFile, @@ -16,4 +17,6 @@ export default function (q: Bull.Queue) { for (const [k, v] of Object.entries(jobs)) { q.process(k, 16, v); } + + q.process(noop); } diff --git a/packages/backend/src/queue/processors/system/index.ts b/packages/backend/src/queue/processors/system/index.ts index 697d24d06..2ff2c636e 100644 --- a/packages/backend/src/queue/processors/system/index.ts +++ b/packages/backend/src/queue/processors/system/index.ts @@ -6,6 +6,7 @@ import { checkExpiredMutings } from "./check-expired-mutings.js"; import { clean } from "./clean.js"; import { setLocalEmojiSizes } from "./local-emoji-size.js"; import { verifyLinks } from "./verify-links.js"; +import { noop } from "@/queue/processors/noop.js"; const jobs = { tickCharts, @@ -25,4 +26,6 @@ export default function (dbQueue: Bull.Queue>) { for (const [k, v] of Object.entries(jobs)) { dbQueue.process(k, v); } + + dbQueue.process(noop); } diff --git a/packages/backend/src/queue/processors/webhook-deliver.ts b/packages/backend/src/queue/processors/webhook-deliver.ts index 904291da2..84397033e 100644 --- a/packages/backend/src/queue/processors/webhook-deliver.ts +++ b/packages/backend/src/queue/processors/webhook-deliver.ts @@ -9,6 +9,7 @@ import config from "@/config/index.js"; const logger = new Logger("webhook"); export default async (job: Bull.Job) => { + if (job.data == null || Object.keys(job.data).length === 0) return "Skip (data was null or empty)"; try { logger.debug(`delivering ${job.data.webhookId}`);