diff --git a/packages/backend/package.json b/packages/backend/package.json index 7ff83b54d..7d93ac6eb 100644 --- a/packages/backend/package.json +++ b/packages/backend/package.json @@ -103,6 +103,7 @@ "pg": "8.11.1", "private-ip": "2.3.4", "probe-image-size": "7.2.3", + "prom-client": "^15.1.0", "promise-limit": "2.7.0", "punycode": "2.3.0", "pureimage": "0.3.15", diff --git a/packages/backend/src/config/load.ts b/packages/backend/src/config/load.ts index 34b667226..050079211 100644 --- a/packages/backend/src/config/load.ts +++ b/packages/backend/src/config/load.ts @@ -72,6 +72,8 @@ export default function load() { if (config.wordMuteCache.ttlSeconds == null) throw new Error('Failed to parse config.wordMuteCache.ttl'); config.searchEngine = config.searchEngine ?? 'https://duckduckgo.com/?q='; + config.enableMetrics = config.enableMetrics ?? false; + config.enableObliterate = config.enableObliterate ?? false; mixin.version = meta.version; mixin.host = url.host; diff --git a/packages/backend/src/config/types.ts b/packages/backend/src/config/types.ts index bdfadc7f4..dbfeddc56 100644 --- a/packages/backend/src/config/types.ts +++ b/packages/backend/src/config/types.ts @@ -92,10 +92,15 @@ export type Source = { deliverJobConcurrency?: number; inboxJobConcurrency?: number; + obliterateJobConcurrency?: number; deliverJobPerSec?: number; inboxJobPerSec?: number; + obliterateJobPerSec?: number; deliverJobMaxAttempts?: number; inboxJobMaxAttempts?: number; + obliterateJobMaxAttempts?: number; + + enableObliterate: boolean; syslog: { host: string; @@ -152,6 +157,7 @@ export type Source = { s3ForcePathStyle?: boolean; }; summalyProxyUrl?: string; + enableMetrics?: boolean; }; /** diff --git a/packages/backend/src/metrics.ts b/packages/backend/src/metrics.ts new file mode 100644 index 000000000..bd9925c9d --- /dev/null +++ b/packages/backend/src/metrics.ts @@ -0,0 +1,135 @@ +import Router from "@koa/router"; +import { + collectDefaultMetrics, + register, + Gauge, + Counter, + CounterConfiguration, +} from "prom-client"; +import config from "./config/index.js"; +import { queues } from "./queue/queues.js"; +import cluster from "node:cluster"; +import Xev from "xev"; + +const xev = new Xev(); +const metricsMaster = cluster.worker?.id === 1; + +if (config.enableMetrics) { + if (metricsMaster) { + collectDefaultMetrics(); + + new Gauge({ + name: "iceshrimp_queue_jobs", + help: "Amount of jobs in the bull queues", + labelNames: ["queue", "status"] as const, + async collect() { + for (const queue of queues) { + const counts = await queue.getJobCounts(); + this.set({ queue: queue.name, status: "completed" }, counts.completed); + this.set({ queue: queue.name, status: "waiting" }, counts.waiting); + this.set({ queue: queue.name, status: "active" }, counts.active); + this.set({ queue: queue.name, status: "delayed" }, counts.delayed); + this.set({ queue: queue.name, status: "failed" }, counts.failed); + } + }, + }); + } +} + +if (metricsMaster) { + xev.on("registry-request", async () => { + try { + const metrics = await register.metrics(); + xev.emit("registry-response", { + contentType: register.contentType, + body: metrics + }); + } catch (error) { + xev.emit("registry-response", { error }); + } + }); +} + +export const handleMetrics: Router.Middleware = async (ctx) => { + try { + if (metricsMaster) { + ctx.set("content-type", register.contentType); + ctx.body = await register.metrics(); + } else { + const wait = new Promise((resolve, reject) => { + const timeout = setTimeout( + () => reject("Timeout while waiting for cluster master"), + 1000 * 60 + ); + xev.once("registry-response", (response) => { + clearTimeout(timeout); + if (response.error) reject(response.error); + ctx.set("content-type", response.contentType); + ctx.body = response.body; + resolve(); + }); + }); + xev.emit("registry-request"); + await wait; + } + } catch (err) { + ctx.res.statusCode = 500; + ctx.body = err; + } +}; + +const counter = (configuration: CounterConfiguration) => { + if (config.enableMetrics) { + if (metricsMaster) { + const counter = new Counter(configuration); + counter.reset(); // initialize internal hashmap + xev.on(`metrics-counter-${configuration.name}`, () => counter.inc()); + return () => counter.inc(); + } else { + return () => xev.emit(`metrics-counter-${configuration.name}`); + } + } else { + return () => { }; + } +}; + +export const tickOutbox = counter({ + name: "iceshrimp_outbox_total", + help: "Total AP outbox calls", +}); + +export const tickInbox = counter({ + name: "iceshrimp_inbox_total", + help: "Total AP inbox calls", +}); + +export const tickFetch = counter({ + name: "iceshrimp_fetch_total", + help: "Total AP fetch calls", +}); + +export const tickResolve = counter({ + name: "iceshrimp_resolve_total", + help: "Total AP resolve calls", +}); + +export const tickObliterate = counter({ + name: "iceshrimp_obliterate_total", + help: "Total obliterate jobs processed", + }); + + +export const tickBiteIncoming = counter({ + name: "iceshrimp_bite_remote_incoming_total", + help: "Total bites received from remote", +}); + +export const tickBiteOutgoing = counter({ + name: "iceshrimp_bite_remote_outgoing_total", + help: "Total bites sent to remote", +}); + +export const tickBiteLocal = counter ({ + name: "iceshrimp_bite_local_total", + help: "Total local bites" +}); diff --git a/packages/backend/src/migration/1705383368782-index-sharedInbox.ts b/packages/backend/src/migration/1705383368782-index-sharedInbox.ts new file mode 100644 index 000000000..bd66e1261 --- /dev/null +++ b/packages/backend/src/migration/1705383368782-index-sharedInbox.ts @@ -0,0 +1,13 @@ +import { MigrationInterface, QueryRunner } from "typeorm"; + +export class IndexSharedInbox1705383368782 implements MigrationInterface { + name = 'IndexSharedInbox1705383368782' + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query(`CREATE INDEX "IDX_92d3de2fe0efd289de109b6627" ON "user" ("sharedInbox") `); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`DROP INDEX "public"."IDX_92d3de2fe0efd289de109b6627"`); + } +} diff --git a/packages/backend/src/models/entities/user.ts b/packages/backend/src/models/entities/user.ts index 92de37e3f..8234f0124 100644 --- a/packages/backend/src/models/entities/user.ts +++ b/packages/backend/src/models/entities/user.ts @@ -11,6 +11,7 @@ import { DriveFile } from "./drive-file.js"; @Entity() @Index(["usernameLower", "host"], { unique: true }) +@Index(["sharedInbox"]) export class User { @PrimaryColumn(id()) public id: string; diff --git a/packages/backend/src/queue/index.ts b/packages/backend/src/queue/index.ts index 787fa848f..0e998ed81 100644 --- a/packages/backend/src/queue/index.ts +++ b/packages/backend/src/queue/index.ts @@ -14,6 +14,7 @@ import processObjectStorage from "./processors/object-storage/index.js"; import processSystemQueue from "./processors/system/index.js"; import processWebhookDeliver from "./processors/webhook-deliver.js"; import processBackground from "./processors/background/index.js"; +import processObliterate from "./processors/obliterate.js"; import { endedPollNotification } from "./processors/ended-poll-notification.js"; import { queueLogger } from "./logger.js"; import { getJobInfo } from "./get-job-info.js"; @@ -26,8 +27,10 @@ import { endedPollNotificationQueue, webhookDeliverQueue, backgroundQueue, + obliterateQueue, } from "./queues.js"; import type { ThinUser } from "./types.js"; +import { updateInboxCache } from "@/services/note/delete.js"; function renderError(e: Error): any { return { @@ -43,6 +46,7 @@ const webhookLogger = queueLogger.createSubLogger("webhook"); const inboxLogger = queueLogger.createSubLogger("inbox"); const dbLogger = queueLogger.createSubLogger("db"); const objectStorageLogger = queueLogger.createSubLogger("objectStorage"); +const obliterateLogger = queueLogger.createSubLogger("obliterate"); systemQueue .on("waiting", (jobId) => systemLogger.debug(`waiting id=${jobId}`)) @@ -157,6 +161,30 @@ webhookDeliverQueue webhookLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`), ); +obliterateQueue + .on("waiting", (jobId) => obliterateLogger.debug(`waiting id=${jobId}`)) + .on("active", (job) => + obliterateLogger.debug( + `active ${getJobInfo(job, true)} to=${job.data.to}`, + ), + ) + .on("completed", (job, result) => + obliterateLogger.debug( + `completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`, + ), + ) + .on("failed", (job, err) => + obliterateLogger.warn( + `failed(${err}) ${getJobInfo(job)} to=${job.data.to}`, + ), + ) + .on("error", (job: any, err: Error) => + obliterateLogger.error(`error ${err}`, { job, e: renderError(err) }), + ) + .on("stalled", (job) => + obliterateLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`), + + export function deliver(user: ThinUser, content: unknown, to: string | null) { if (content == null) return null; if (to == null) return null; @@ -514,6 +542,30 @@ export function webhookDeliver( }); } +export function obliterate( + user: ThinUser, + content: unknown, + to: string, + ) { + const data = { + user: { + id: user.id, + }, + content, + to, + }; + + return obliterateQueue.add(data, { + attempts: config.obliterateJobMaxAttempts || 4, + timeout: 1 * 60 * 1000, // 1min + backoff: { + type: "apBackoff", + }, + removeOnComplete: true, + removeOnFail: true, + }); + } + export default function () { if (envOption.onlyServer) return; @@ -602,6 +654,11 @@ export default function () { ); processSystemQueue(systemQueue); + updateInboxCache(); + obliterateQueue.process( + config.obliterateJobConcurrency || 16, + processObliterate, + ); } export function destroy() { @@ -614,4 +671,9 @@ export function destroy() { inboxLogger.succ(`Cleaned ${jobs.length} ${status} jobs`); }); inboxQueue.clean(0, "delayed"); + + obliterateQueue.once("cleaned", (jobs, status) => { + obliterateLogger.succ(`Cleaned ${jobs.length} ${status} jobs`); + }); + obliterateQueue.clean(0, "delayed"); } diff --git a/packages/backend/src/queue/processors/deliver.ts b/packages/backend/src/queue/processors/deliver.ts index 74dcc3abf..f5d511ced 100644 --- a/packages/backend/src/queue/processors/deliver.ts +++ b/packages/backend/src/queue/processors/deliver.ts @@ -14,6 +14,7 @@ import { StatusError } from "@/misc/fetch.js"; import { shouldSkipInstance } from "@/misc/skipped-instances.js"; import type { DeliverJobData } from "@/queue/types.js"; import type Bull from "bull"; +import { tickOutbox } from "@/metrics.js"; import { patchText, shouldPatchText } from "@/remote/activitypub/renderer/note.js"; const logger = new Logger("deliver"); @@ -72,6 +73,8 @@ export default async (job: Bull.Job) => { federationChart.deliverd(i.host, true); })(); + tickOutbox(); + return "Success"; } catch (res) { // Update stats diff --git a/packages/backend/src/queue/processors/inbox.ts b/packages/backend/src/queue/processors/inbox.ts index 3c87727c4..e310c4760 100644 --- a/packages/backend/src/queue/processors/inbox.ts +++ b/packages/backend/src/queue/processors/inbox.ts @@ -23,6 +23,7 @@ import type { CacheableRemoteUser } from "@/models/entities/user.js"; import type { UserPublickey } from "@/models/entities/user-publickey.js"; import { shouldBlockInstance } from "@/misc/should-block-instance.js"; import { verifySignature } from "@/remote/activitypub/check-fetch.js"; +import { tickInbox } from "@/metrics.js"; const logger = new Logger("inbox"); @@ -217,6 +218,8 @@ export default async (job: Bull.Job): Promise => { } } + tickInbox(); + // アクティビティを処理 await perform(authUser.user, activity); return "ok"; diff --git a/packages/backend/src/queue/processors/obliterate.ts b/packages/backend/src/queue/processors/obliterate.ts new file mode 100644 index 000000000..fdcb83a81 --- /dev/null +++ b/packages/backend/src/queue/processors/obliterate.ts @@ -0,0 +1,60 @@ +import Bull from "bull"; +import { DeliverJobData } from "../types.js"; +import { tickObliterate } from "@/metrics.js"; +import { StatusError } from "@/misc/fetch.js"; +import request from "@/remote/activitypub/request.js"; +import { registerOrFetchInstanceDoc } from "@/services/register-or-fetch-instance-doc.js"; +import { DAY, HOUR } from "@/const.js"; +import { Instances } from "@/models/index.js"; + +export default async (job: Bull.Job) => { + const { host } = new URL(job.data.to); + try { + const notResponding = await Instances.createQueryBuilder("i") + .where("i.host = :host", { host }) + .andWhere("i.isNotResponding") + .andWhere("i.lastCommunicatedAt < :cutoff", { + cutoff: new Date(Date.now() - DAY * 14), + }) + .cache(HOUR) + .getCount(); + // what are these string returns anyways. what do they mean. why did bull implement it like + // this. do string returns mean anything? is the actual string itself important? would i + // achieve the same by returning your home address? many such questions + if (notResponding && (Math.random() > 0.01)) throw "skip"; + + await request(job.data.user, job.data.to, job.data.content, 3000); + + tickObliterate(); + + registerOrFetchInstanceDoc(host).then((i) => { + Instances.update(i.id, { + latestRequestSentAt: new Date(), + latestStatus: 200, + lastCommunicatedAt: new Date(), + isNotResponding: false, + }); + }); + + return "Success"; + } catch (res) { + registerOrFetchInstanceDoc(host).then((i) => { + Instances.update(i.id, { + latestRequestSentAt: new Date(), + latestStatus: + res instanceof StatusError ? res.statusCode : null, + isNotResponding: true, + }); + }); + + if (res instanceof StatusError) { + if (res.isClientError) { + return `${res.statusCode} ${res.statusMessage}`; + } + + throw new Error(`${res.statusCode} ${res.statusMessage}`); + } else { + throw res; + } + } +}; diff --git a/packages/backend/src/queue/queues.ts b/packages/backend/src/queue/queues.ts index 6b0eb2de4..a117da9ac 100644 --- a/packages/backend/src/queue/queues.ts +++ b/packages/backend/src/queue/queues.ts @@ -28,6 +28,10 @@ export const webhookDeliverQueue = initializeQueue( 64, ); export const backgroundQueue = initializeQueue>("bg"); +export const obliterateQueue = initializeQueue( + "obliterate", + config.obliterateJobPerSec || 16, + ); export const queues = [ systemQueue, @@ -38,4 +42,5 @@ export const queues = [ objectStorageQueue, webhookDeliverQueue, backgroundQueue, + obliterateQueue, ]; diff --git a/packages/backend/src/remote/activitypub/check-fetch.ts b/packages/backend/src/remote/activitypub/check-fetch.ts index f299a1624..cb8ec0b6f 100644 --- a/packages/backend/src/remote/activitypub/check-fetch.ts +++ b/packages/backend/src/remote/activitypub/check-fetch.ts @@ -12,6 +12,7 @@ import type { UserPublickey } from "@/models/entities/user-publickey.js"; import { verify } from "node:crypto"; import { toSingle } from "@/prelude/array.js"; import { createHash } from "node:crypto"; +import { tickFetch } from "@/metrics.js"; export async function hasSignature(req: IncomingMessage): Promise { const meta = await fetchMeta(); @@ -120,7 +121,12 @@ export async function checkFetch(req: IncomingMessage): Promise<{ status: number return { status: 403 }; } - return verifySignature(signature, authUser.key) ? 200 : 401; + if (!verifySignature(signature, authUser.key)) { + return 401; + } + + tickFetch(); + return 200; } return { status: 200 }; } diff --git a/packages/backend/src/remote/activitypub/deliver-manager.ts b/packages/backend/src/remote/activitypub/deliver-manager.ts index 7f0215e3b..5625cd76a 100644 --- a/packages/backend/src/remote/activitypub/deliver-manager.ts +++ b/packages/backend/src/remote/activitypub/deliver-manager.ts @@ -76,7 +76,7 @@ export default class DeliverManager { /** * Execute delivers */ - public async execute() { + public async execute(): Promise> { if (!Users.isLocalUser(this.actor)) return; const inboxes = new Set(); @@ -155,6 +155,8 @@ export default class DeliverManager { deliver(this.actor, this.activity, inbox); } + + return inboxes; } } @@ -167,10 +169,10 @@ export default class DeliverManager { export async function deliverToFollowers( actor: { id: ILocalUser["id"]; host: null }, activity: any, -) { +): Promise> { const manager = new DeliverManager(actor, activity); manager.addFollowersRecipe(); - await manager.execute(); + return await manager.execute(); } /** @@ -182,9 +184,9 @@ export async function deliverToUser( actor: { id: ILocalUser["id"]; host: null }, activity: any, to: IRemoteUser, -) { +): Promise> { const manager = new DeliverManager(actor, activity); manager.addDirectRecipe(to); - await manager.execute(); + return await manager.execute(); } //#endregion diff --git a/packages/backend/src/remote/activitypub/kernel/bite.ts b/packages/backend/src/remote/activitypub/kernel/bite.ts index ecce5cc7e..157d588ae 100644 --- a/packages/backend/src/remote/activitypub/kernel/bite.ts +++ b/packages/backend/src/remote/activitypub/kernel/bite.ts @@ -6,6 +6,7 @@ import config from "@/config/index.js"; import { genId } from "@/misc/gen-id.js"; import { createBite } from "@/services/create-bite.js"; import { Bite } from "@/models/entities/bite.js"; +import { tickBiteIncoming } from "@/metrics.js"; export default async ( actor: CacheableRemoteUser, @@ -75,5 +76,7 @@ export default async ( bite.published ? new Date(bite.published) : null, ); + tickBiteIncoming(); + return "ok"; }; diff --git a/packages/backend/src/remote/activitypub/request.ts b/packages/backend/src/remote/activitypub/request.ts index e4aa190d3..83d23405b 100644 --- a/packages/backend/src/remote/activitypub/request.ts +++ b/packages/backend/src/remote/activitypub/request.ts @@ -5,7 +5,7 @@ import { getResponse } from "../../misc/fetch.js"; import { createSignedPost, createSignedGet } from "./ap-request.js"; import { apLogger } from "@/remote/activitypub/logger.js"; -export default async (user: { id: User["id"] }, url: string, object: any) => { +export default async (user: { id: User["id"] }, url: string, object: any, timeout = undefined) => { const body = JSON.stringify(object); const keypair = await getUserKeypair(user.id); @@ -27,6 +27,7 @@ export default async (user: { id: User["id"] }, url: string, object: any) => { method: req.request.method, headers: req.request.headers, body, + timeout, }); }; diff --git a/packages/backend/src/remote/activitypub/resolver.ts b/packages/backend/src/remote/activitypub/resolver.ts index 8baa2958a..a0b47778e 100644 --- a/packages/backend/src/remote/activitypub/resolver.ts +++ b/packages/backend/src/remote/activitypub/resolver.ts @@ -27,6 +27,7 @@ import { shouldBlockInstance } from "@/misc/should-block-instance.js"; import { apLogger } from "@/remote/activitypub/logger.js"; import { In, IsNull, Not } from "typeorm"; import renderBite from "@/remote/activitypub/renderer/bite.js"; +import { tickResolve } from "@/metrics.js"; export default class Resolver { private history: Set; @@ -127,7 +128,10 @@ export default class Resolver { const {res, object} = await this.doFetch(value); if (object.id == null) throw new Error("Object has no ID"); - if (res.finalUrl === object.id) return object; + if (res.finalUrl === object.id) { + tickResolve(); + return object; + } if (new URL(res.finalUrl).host !== new URL(object.id).host) throw new Error("Object ID host doesn't match final url host"); @@ -137,6 +141,8 @@ export default class Resolver { if (finalRes.finalUrl !== finalObject.id) throw new Error("Object ID still doesn't match final URL after second fetch attempt") + tickResolve(); + return finalObject; } diff --git a/packages/backend/src/server/activitypub.ts b/packages/backend/src/server/activitypub.ts index 347660a70..3545b95c7 100644 --- a/packages/backend/src/server/activitypub.ts +++ b/packages/backend/src/server/activitypub.ts @@ -37,6 +37,7 @@ import { serverLogger } from "./index.js"; import config from "@/config/index.js"; import Koa from "koa"; import renderBite from "@/remote/activitypub/renderer/bite.js"; +import { tickFetch } from "@/metrics.js"; // Init router const router = new Router(); @@ -225,6 +226,7 @@ router.get("/users/:user/collections/featured", Featured); router.get("/users/:user/publickey", async (ctx) => { const instanceActor = await getInstanceActor(); if (ctx.params.user === instanceActor.id) { + tickFetch(); ctx.body = renderActivity( renderKey(instanceActor, await getUserKeypair(instanceActor.id)), ); @@ -289,6 +291,7 @@ router.get("/users/:user", async (ctx, next) => { const instanceActor = await getInstanceActor(); if (ctx.params.user === instanceActor.id) { + tickFetch(); await userInfo(ctx, instanceActor); return; } @@ -313,6 +316,7 @@ router.get("/@:user", async (ctx, next) => { if (!isActivityPubReq(ctx)) return await next(); if (ctx.params.user === "instance.actor") { + tickFetch(); const instanceActor = await getInstanceActor(); await userInfo(ctx, instanceActor); return; @@ -334,6 +338,7 @@ router.get("/@:user", async (ctx, next) => { }); router.get("/actor", async (ctx, next) => { + tickFetch(); const instanceActor = await getInstanceActor(); await userInfo(ctx, instanceActor); }); diff --git a/packages/backend/src/server/index.ts b/packages/backend/src/server/index.ts index 929efd73b..3acce660f 100644 --- a/packages/backend/src/server/index.ts +++ b/packages/backend/src/server/index.ts @@ -33,6 +33,7 @@ import removeTrailingSlash from "koa-remove-trailing-slashes"; import { koaBody } from "koa-body"; import { setupEndpointsAuthRoot } from "@/server/api/mastodon/endpoints/auth.js"; import { CatchErrorsMiddleware } from "@/server/api/mastodon/middleware/catch-errors.js"; +import { handleMetrics } from "@/metrics.js"; export const serverLogger = new Logger("server", "gray", false); // Init app @@ -117,6 +118,10 @@ router.get("/identicon/:x", async (ctx) => { } }); +if (config.enableMetrics) { + router.get("/metrics", handleMetrics); + } + mastoRouter.use( koaBody({ urlencoded: true, diff --git a/packages/backend/src/services/create-bite.ts b/packages/backend/src/services/create-bite.ts index dea96e20d..1c0f3b08c 100644 --- a/packages/backend/src/services/create-bite.ts +++ b/packages/backend/src/services/create-bite.ts @@ -6,6 +6,7 @@ import { renderActivity } from "@/remote/activitypub/renderer/index.js"; import renderBite from "@/remote/activitypub/renderer/bite.js"; import { deliverToUser } from "@/remote/activitypub/deliver-manager.js"; import { createNotification } from "./create-notification.js"; +import { tickBiteLocal, tickBiteOutgoing } from "@/metrics.js"; export async function createBite( sender: User, @@ -61,6 +62,8 @@ export async function createBite( renderActivity(await renderBite(bite)), deliverTarget, ); + + tickBiteOutgoing(); } if (Users.isLocalUser(deliverTarget)) { @@ -68,6 +71,8 @@ export async function createBite( notifierId: sender.id, biteId: bite.id, }); + + if (Users.isLocalUser(sender)) tickBiteLocal(); } return id; diff --git a/packages/backend/src/services/note/delete.ts b/packages/backend/src/services/note/delete.ts index a3c2fad12..cc765183c 100644 --- a/packages/backend/src/services/note/delete.ts +++ b/packages/backend/src/services/note/delete.ts @@ -21,6 +21,9 @@ import { import { countSameRenotes } from "@/misc/count-same-renotes.js"; import { registerOrFetchInstanceDoc } from "../register-or-fetch-instance-doc.js"; import { deliverToRelays } from "../relay.js"; +import { HOUR } from "@/const.js"; +import { Mutex } from "async-mutex"; +import { obliterate } from "@/queue/index.js"; /** * 投稿を削除します。 @@ -174,15 +177,62 @@ async function getMentionedRemoteUsers(note: Note) { })) as IRemoteUser[]; } +let inboxCache: string[] = []; +let inboxCacheExpiration = 0; +const inboxCacheUpdate = new Mutex(); + +export async function updateInboxCache() { + // we only want one instance of the update running at a time + const release = await inboxCacheUpdate.acquire(); + try { + // the cache might have been updated while we were + // acquiring the mutex so we check again + if (inboxCacheExpiration < new Date().getTime()) { + const result = await Users.query( + 'SELECT DISTINCT ON (u."sharedInbox") u."sharedInbox" FROM \ + "user" as u INNER JOIN "instance" as i ON i."host" = u."host" \ + WHERE u."sharedInbox" IS NOT NULL AND i."isNotResponding" = false', + ); + inboxCache = result.map((user: any) => user.sharedInbox as string); + inboxCacheExpiration = new Date().getTime() + HOUR; + } + } finally { + release(); + } +} + + async function deliverToConcerned( user: { id: ILocalUser["id"]; host: null }, note: Note, content: any, ) { - deliverToFollowers(user, content); deliverToRelays(user, content); const remoteUsers = await getMentionedRemoteUsers(note); - for (const remoteUser of remoteUsers) { - deliverToUser(user, content, remoteUser); + const sets = await Promise.all([ + deliverToFollowers(user, content), + ...remoteUsers.map((remoteUser) => + deliverToUser(user, content, remoteUser), + ), + ]); + const alreadyDelivered = new Set(); + for (const set of sets) { + for (const inbox of set) { + alreadyDelivered.add(inbox); + } + } + + if (config.enableObliterate) { + // sacrificate note to the void goddess \o/ + if (inboxCacheExpiration < new Date().getTime()) { + await updateInboxCache(); + } + + const toObliterate = inboxCache.filter( + (inbox) => !alreadyDelivered.has(inbox), + ); + for (const inbox of toObliterate) { + obliterate(user, content, inbox); + } } } diff --git a/yarn.lock b/yarn.lock index 2574f9f44..6f14ef389 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1936,6 +1936,12 @@ __metadata: checksum: 10/673c11518dba2e582e42415cbefe928513616f3af25e12f6e4e6b1b98b52b3e6c14bc251a361654af63cd64f208f22a1f7556fa49da2bf7efcf28cb14f16f807 languageName: node linkType: hard +"@opentelemetry/api@npm:^1.4.0": + version: 1.7.0 + resolution: "@opentelemetry/api@npm:1.7.0" + checksum: 10/bcf7afa7051dcd4583898a68f8a57fb4c85b5cedddf7b6eb3616595c0b3bcd7f5448143b8355b00935a755de004d6285489f8e132f34127efe7b1be404622a3e + languageName: node + linkType: hard "@paralleldrive/cuid2@npm:^2.2.2": version: 2.2.2 @@ -5379,6 +5385,7 @@ __metadata: pg: "npm:8.11.1" private-ip: "npm:2.3.4" probe-image-size: "npm:7.2.3" + prom-client: "npm:^15.1.0" promise-limit: "npm:2.7.0" pug: "npm:3.0.2" punycode: "npm:2.3.0" @@ -5540,6 +5547,13 @@ __metadata: languageName: node linkType: hard +"bintrees@npm:1.0.2": + version: 1.0.2 + resolution: "bintrees@npm:1.0.2" + checksum: 10/071896cea5ea5413316c8436e95799444c208630d5c539edd8a7089fc272fc5d3634aa4a2e4847b28350dda1796162e14a34a0eda53108cc5b3c2ff6a036c1fa + languageName: node + linkType: hard + "bl@npm:^1.0.0": version: 1.2.3 resolution: "bl@npm:1.2.3" @@ -16926,6 +16940,16 @@ __metadata: languageName: node linkType: hard +"prom-client@npm:^15.1.0": + version: 15.1.0 + resolution: "prom-client@npm:15.1.0" + dependencies: + "@opentelemetry/api": "npm:^1.4.0" + tdigest: "npm:^0.1.1" + checksum: 10/ecb6f40de755ca9cc6dde758d195ed3e1d3b47a341d2092af8c18dbf7e6ef1079c8b8bb02496f2f430cf8bd9d391c1ea5bebbb85cdda95f67dad2dbfb90509aa + languageName: node + linkType: hard + "promise-limit@npm:2.7.0": version: 2.7.0 resolution: "promise-limit@npm:2.7.0" @@ -19493,6 +19517,15 @@ __metadata: languageName: node linkType: hard +"tdigest@npm:^0.1.1": + version: 0.1.2 + resolution: "tdigest@npm:0.1.2" + dependencies: + bintrees: "npm:1.0.2" + checksum: 10/45be99fa52dab74b8edafe150e473cdc45aa1352c75ed516a39905f350a08c3175f6555598111042c3677ba042d7e3cae6b5ce4c663fe609bc634f326aabc9d6 + languageName: node + linkType: hard + "terminal-link@npm:^2.0.0": version: 2.1.1 resolution: "terminal-link@npm:2.1.1"