withdrawal metrics and obliterate

This commit is contained in:
limepotato 2024-07-01 17:56:13 -06:00
parent 2c98459dc6
commit 8e26dd6bc3
21 changed files with 418 additions and 11 deletions

View file

@ -103,6 +103,7 @@
"pg": "8.11.1", "pg": "8.11.1",
"private-ip": "2.3.4", "private-ip": "2.3.4",
"probe-image-size": "7.2.3", "probe-image-size": "7.2.3",
"prom-client": "^15.1.0",
"promise-limit": "2.7.0", "promise-limit": "2.7.0",
"punycode": "2.3.0", "punycode": "2.3.0",
"pureimage": "0.3.15", "pureimage": "0.3.15",

View file

@ -72,6 +72,8 @@ export default function load() {
if (config.wordMuteCache.ttlSeconds == null) throw new Error('Failed to parse config.wordMuteCache.ttl'); if (config.wordMuteCache.ttlSeconds == null) throw new Error('Failed to parse config.wordMuteCache.ttl');
config.searchEngine = config.searchEngine ?? 'https://duckduckgo.com/?q='; config.searchEngine = config.searchEngine ?? 'https://duckduckgo.com/?q=';
config.enableMetrics = config.enableMetrics ?? false;
config.enableObliterate = config.enableObliterate ?? false;
mixin.version = meta.version; mixin.version = meta.version;
mixin.host = url.host; mixin.host = url.host;

View file

@ -92,10 +92,15 @@ export type Source = {
deliverJobConcurrency?: number; deliverJobConcurrency?: number;
inboxJobConcurrency?: number; inboxJobConcurrency?: number;
obliterateJobConcurrency?: number;
deliverJobPerSec?: number; deliverJobPerSec?: number;
inboxJobPerSec?: number; inboxJobPerSec?: number;
obliterateJobPerSec?: number;
deliverJobMaxAttempts?: number; deliverJobMaxAttempts?: number;
inboxJobMaxAttempts?: number; inboxJobMaxAttempts?: number;
obliterateJobMaxAttempts?: number;
enableObliterate: boolean;
syslog: { syslog: {
host: string; host: string;
@ -152,6 +157,7 @@ export type Source = {
s3ForcePathStyle?: boolean; s3ForcePathStyle?: boolean;
}; };
summalyProxyUrl?: string; summalyProxyUrl?: string;
enableMetrics?: boolean;
}; };
/** /**

View file

@ -0,0 +1,135 @@
import Router from "@koa/router";
import {
collectDefaultMetrics,
register,
Gauge,
Counter,
CounterConfiguration,
} from "prom-client";
import config from "./config/index.js";
import { queues } from "./queue/queues.js";
import cluster from "node:cluster";
import Xev from "xev";
const xev = new Xev();
const metricsMaster = cluster.worker?.id === 1;
if (config.enableMetrics) {
if (metricsMaster) {
collectDefaultMetrics();
new Gauge({
name: "iceshrimp_queue_jobs",
help: "Amount of jobs in the bull queues",
labelNames: ["queue", "status"] as const,
async collect() {
for (const queue of queues) {
const counts = await queue.getJobCounts();
this.set({ queue: queue.name, status: "completed" }, counts.completed);
this.set({ queue: queue.name, status: "waiting" }, counts.waiting);
this.set({ queue: queue.name, status: "active" }, counts.active);
this.set({ queue: queue.name, status: "delayed" }, counts.delayed);
this.set({ queue: queue.name, status: "failed" }, counts.failed);
}
},
});
}
}
if (metricsMaster) {
xev.on("registry-request", async () => {
try {
const metrics = await register.metrics();
xev.emit("registry-response", {
contentType: register.contentType,
body: metrics
});
} catch (error) {
xev.emit("registry-response", { error });
}
});
}
export const handleMetrics: Router.Middleware = async (ctx) => {
try {
if (metricsMaster) {
ctx.set("content-type", register.contentType);
ctx.body = await register.metrics();
} else {
const wait = new Promise<void>((resolve, reject) => {
const timeout = setTimeout(
() => reject("Timeout while waiting for cluster master"),
1000 * 60
);
xev.once("registry-response", (response) => {
clearTimeout(timeout);
if (response.error) reject(response.error);
ctx.set("content-type", response.contentType);
ctx.body = response.body;
resolve();
});
});
xev.emit("registry-request");
await wait;
}
} catch (err) {
ctx.res.statusCode = 500;
ctx.body = err;
}
};
const counter = (configuration: CounterConfiguration<string>) => {
if (config.enableMetrics) {
if (metricsMaster) {
const counter = new Counter(configuration);
counter.reset(); // initialize internal hashmap
xev.on(`metrics-counter-${configuration.name}`, () => counter.inc());
return () => counter.inc();
} else {
return () => xev.emit(`metrics-counter-${configuration.name}`);
}
} else {
return () => { };
}
};
export const tickOutbox = counter({
name: "iceshrimp_outbox_total",
help: "Total AP outbox calls",
});
export const tickInbox = counter({
name: "iceshrimp_inbox_total",
help: "Total AP inbox calls",
});
export const tickFetch = counter({
name: "iceshrimp_fetch_total",
help: "Total AP fetch calls",
});
export const tickResolve = counter({
name: "iceshrimp_resolve_total",
help: "Total AP resolve calls",
});
export const tickObliterate = counter({
name: "iceshrimp_obliterate_total",
help: "Total obliterate jobs processed",
});
export const tickBiteIncoming = counter({
name: "iceshrimp_bite_remote_incoming_total",
help: "Total bites received from remote",
});
export const tickBiteOutgoing = counter({
name: "iceshrimp_bite_remote_outgoing_total",
help: "Total bites sent to remote",
});
export const tickBiteLocal = counter ({
name: "iceshrimp_bite_local_total",
help: "Total local bites"
});

View file

@ -0,0 +1,13 @@
import { MigrationInterface, QueryRunner } from "typeorm";
export class IndexSharedInbox1705383368782 implements MigrationInterface {
name = 'IndexSharedInbox1705383368782'
public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`CREATE INDEX "IDX_92d3de2fe0efd289de109b6627" ON "user" ("sharedInbox") `);
}
public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`DROP INDEX "public"."IDX_92d3de2fe0efd289de109b6627"`);
}
}

View file

@ -11,6 +11,7 @@ import { DriveFile } from "./drive-file.js";
@Entity() @Entity()
@Index(["usernameLower", "host"], { unique: true }) @Index(["usernameLower", "host"], { unique: true })
@Index(["sharedInbox"])
export class User { export class User {
@PrimaryColumn(id()) @PrimaryColumn(id())
public id: string; public id: string;

View file

@ -14,6 +14,7 @@ import processObjectStorage from "./processors/object-storage/index.js";
import processSystemQueue from "./processors/system/index.js"; import processSystemQueue from "./processors/system/index.js";
import processWebhookDeliver from "./processors/webhook-deliver.js"; import processWebhookDeliver from "./processors/webhook-deliver.js";
import processBackground from "./processors/background/index.js"; import processBackground from "./processors/background/index.js";
import processObliterate from "./processors/obliterate.js";
import { endedPollNotification } from "./processors/ended-poll-notification.js"; import { endedPollNotification } from "./processors/ended-poll-notification.js";
import { queueLogger } from "./logger.js"; import { queueLogger } from "./logger.js";
import { getJobInfo } from "./get-job-info.js"; import { getJobInfo } from "./get-job-info.js";
@ -26,8 +27,10 @@ import {
endedPollNotificationQueue, endedPollNotificationQueue,
webhookDeliverQueue, webhookDeliverQueue,
backgroundQueue, backgroundQueue,
obliterateQueue,
} from "./queues.js"; } from "./queues.js";
import type { ThinUser } from "./types.js"; import type { ThinUser } from "./types.js";
import { updateInboxCache } from "@/services/note/delete.js";
function renderError(e: Error): any { function renderError(e: Error): any {
return { return {
@ -43,6 +46,7 @@ const webhookLogger = queueLogger.createSubLogger("webhook");
const inboxLogger = queueLogger.createSubLogger("inbox"); const inboxLogger = queueLogger.createSubLogger("inbox");
const dbLogger = queueLogger.createSubLogger("db"); const dbLogger = queueLogger.createSubLogger("db");
const objectStorageLogger = queueLogger.createSubLogger("objectStorage"); const objectStorageLogger = queueLogger.createSubLogger("objectStorage");
const obliterateLogger = queueLogger.createSubLogger("obliterate");
systemQueue systemQueue
.on("waiting", (jobId) => systemLogger.debug(`waiting id=${jobId}`)) .on("waiting", (jobId) => systemLogger.debug(`waiting id=${jobId}`))
@ -157,6 +161,30 @@ webhookDeliverQueue
webhookLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`), webhookLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`),
); );
obliterateQueue
.on("waiting", (jobId) => obliterateLogger.debug(`waiting id=${jobId}`))
.on("active", (job) =>
obliterateLogger.debug(
`active ${getJobInfo(job, true)} to=${job.data.to}`,
),
)
.on("completed", (job, result) =>
obliterateLogger.debug(
`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`,
),
)
.on("failed", (job, err) =>
obliterateLogger.warn(
`failed(${err}) ${getJobInfo(job)} to=${job.data.to}`,
),
)
.on("error", (job: any, err: Error) =>
obliterateLogger.error(`error ${err}`, { job, e: renderError(err) }),
)
.on("stalled", (job) =>
obliterateLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`),
export function deliver(user: ThinUser, content: unknown, to: string | null) { export function deliver(user: ThinUser, content: unknown, to: string | null) {
if (content == null) return null; if (content == null) return null;
if (to == null) return null; if (to == null) return null;
@ -514,6 +542,30 @@ export function webhookDeliver(
}); });
} }
export function obliterate(
user: ThinUser,
content: unknown,
to: string,
) {
const data = {
user: {
id: user.id,
},
content,
to,
};
return obliterateQueue.add(data, {
attempts: config.obliterateJobMaxAttempts || 4,
timeout: 1 * 60 * 1000, // 1min
backoff: {
type: "apBackoff",
},
removeOnComplete: true,
removeOnFail: true,
});
}
export default function () { export default function () {
if (envOption.onlyServer) return; if (envOption.onlyServer) return;
@ -602,6 +654,11 @@ export default function () {
); );
processSystemQueue(systemQueue); processSystemQueue(systemQueue);
updateInboxCache();
obliterateQueue.process(
config.obliterateJobConcurrency || 16,
processObliterate,
);
} }
export function destroy() { export function destroy() {
@ -614,4 +671,9 @@ export function destroy() {
inboxLogger.succ(`Cleaned ${jobs.length} ${status} jobs`); inboxLogger.succ(`Cleaned ${jobs.length} ${status} jobs`);
}); });
inboxQueue.clean(0, "delayed"); inboxQueue.clean(0, "delayed");
obliterateQueue.once("cleaned", (jobs, status) => {
obliterateLogger.succ(`Cleaned ${jobs.length} ${status} jobs`);
});
obliterateQueue.clean(0, "delayed");
} }

View file

@ -14,6 +14,7 @@ import { StatusError } from "@/misc/fetch.js";
import { shouldSkipInstance } from "@/misc/skipped-instances.js"; import { shouldSkipInstance } from "@/misc/skipped-instances.js";
import type { DeliverJobData } from "@/queue/types.js"; import type { DeliverJobData } from "@/queue/types.js";
import type Bull from "bull"; import type Bull from "bull";
import { tickOutbox } from "@/metrics.js";
import { patchText, shouldPatchText } from "@/remote/activitypub/renderer/note.js"; import { patchText, shouldPatchText } from "@/remote/activitypub/renderer/note.js";
const logger = new Logger("deliver"); const logger = new Logger("deliver");
@ -72,6 +73,8 @@ export default async (job: Bull.Job<DeliverJobData>) => {
federationChart.deliverd(i.host, true); federationChart.deliverd(i.host, true);
})(); })();
tickOutbox();
return "Success"; return "Success";
} catch (res) { } catch (res) {
// Update stats // Update stats

View file

@ -23,6 +23,7 @@ import type { CacheableRemoteUser } from "@/models/entities/user.js";
import type { UserPublickey } from "@/models/entities/user-publickey.js"; import type { UserPublickey } from "@/models/entities/user-publickey.js";
import { shouldBlockInstance } from "@/misc/should-block-instance.js"; import { shouldBlockInstance } from "@/misc/should-block-instance.js";
import { verifySignature } from "@/remote/activitypub/check-fetch.js"; import { verifySignature } from "@/remote/activitypub/check-fetch.js";
import { tickInbox } from "@/metrics.js";
const logger = new Logger("inbox"); const logger = new Logger("inbox");
@ -217,6 +218,8 @@ export default async (job: Bull.Job<InboxJobData>): Promise<string> => {
} }
} }
tickInbox();
// アクティビティを処理 // アクティビティを処理
await perform(authUser.user, activity); await perform(authUser.user, activity);
return "ok"; return "ok";

View file

@ -0,0 +1,60 @@
import Bull from "bull";
import { DeliverJobData } from "../types.js";
import { tickObliterate } from "@/metrics.js";
import { StatusError } from "@/misc/fetch.js";
import request from "@/remote/activitypub/request.js";
import { registerOrFetchInstanceDoc } from "@/services/register-or-fetch-instance-doc.js";
import { DAY, HOUR } from "@/const.js";
import { Instances } from "@/models/index.js";
export default async (job: Bull.Job<DeliverJobData>) => {
const { host } = new URL(job.data.to);
try {
const notResponding = await Instances.createQueryBuilder("i")
.where("i.host = :host", { host })
.andWhere("i.isNotResponding")
.andWhere("i.lastCommunicatedAt < :cutoff", {
cutoff: new Date(Date.now() - DAY * 14),
})
.cache(HOUR)
.getCount();
// what are these string returns anyways. what do they mean. why did bull implement it like
// this. do string returns mean anything? is the actual string itself important? would i
// achieve the same by returning your home address? many such questions
if (notResponding && (Math.random() > 0.01)) throw "skip";
await request(job.data.user, job.data.to, job.data.content, 3000);
tickObliterate();
registerOrFetchInstanceDoc(host).then((i) => {
Instances.update(i.id, {
latestRequestSentAt: new Date(),
latestStatus: 200,
lastCommunicatedAt: new Date(),
isNotResponding: false,
});
});
return "Success";
} catch (res) {
registerOrFetchInstanceDoc(host).then((i) => {
Instances.update(i.id, {
latestRequestSentAt: new Date(),
latestStatus:
res instanceof StatusError ? res.statusCode : null,
isNotResponding: true,
});
});
if (res instanceof StatusError) {
if (res.isClientError) {
return `${res.statusCode} ${res.statusMessage}`;
}
throw new Error(`${res.statusCode} ${res.statusMessage}`);
} else {
throw res;
}
}
};

View file

@ -28,6 +28,10 @@ export const webhookDeliverQueue = initializeQueue<WebhookDeliverJobData>(
64, 64,
); );
export const backgroundQueue = initializeQueue<Record<string, unknown>>("bg"); export const backgroundQueue = initializeQueue<Record<string, unknown>>("bg");
export const obliterateQueue = initializeQueue<DeliverJobData>(
"obliterate",
config.obliterateJobPerSec || 16,
);
export const queues = [ export const queues = [
systemQueue, systemQueue,
@ -38,4 +42,5 @@ export const queues = [
objectStorageQueue, objectStorageQueue,
webhookDeliverQueue, webhookDeliverQueue,
backgroundQueue, backgroundQueue,
obliterateQueue,
]; ];

View file

@ -12,6 +12,7 @@ import type { UserPublickey } from "@/models/entities/user-publickey.js";
import { verify } from "node:crypto"; import { verify } from "node:crypto";
import { toSingle } from "@/prelude/array.js"; import { toSingle } from "@/prelude/array.js";
import { createHash } from "node:crypto"; import { createHash } from "node:crypto";
import { tickFetch } from "@/metrics.js";
export async function hasSignature(req: IncomingMessage): Promise<string> { export async function hasSignature(req: IncomingMessage): Promise<string> {
const meta = await fetchMeta(); const meta = await fetchMeta();
@ -120,7 +121,12 @@ export async function checkFetch(req: IncomingMessage): Promise<{ status: number
return { status: 403 }; return { status: 403 };
} }
return verifySignature(signature, authUser.key) ? 200 : 401; if (!verifySignature(signature, authUser.key)) {
return 401;
}
tickFetch();
return 200;
} }
return { status: 200 }; return { status: 200 };
} }

View file

@ -76,7 +76,7 @@ export default class DeliverManager {
/** /**
* Execute delivers * Execute delivers
*/ */
public async execute() { public async execute(): Promise<Set<string>> {
if (!Users.isLocalUser(this.actor)) return; if (!Users.isLocalUser(this.actor)) return;
const inboxes = new Set<string>(); const inboxes = new Set<string>();
@ -155,6 +155,8 @@ export default class DeliverManager {
deliver(this.actor, this.activity, inbox); deliver(this.actor, this.activity, inbox);
} }
return inboxes;
} }
} }
@ -167,10 +169,10 @@ export default class DeliverManager {
export async function deliverToFollowers( export async function deliverToFollowers(
actor: { id: ILocalUser["id"]; host: null }, actor: { id: ILocalUser["id"]; host: null },
activity: any, activity: any,
) { ): Promise<Set<string>> {
const manager = new DeliverManager(actor, activity); const manager = new DeliverManager(actor, activity);
manager.addFollowersRecipe(); manager.addFollowersRecipe();
await manager.execute(); return await manager.execute();
} }
/** /**
@ -182,9 +184,9 @@ export async function deliverToUser(
actor: { id: ILocalUser["id"]; host: null }, actor: { id: ILocalUser["id"]; host: null },
activity: any, activity: any,
to: IRemoteUser, to: IRemoteUser,
) { ): Promise<Set<string>> {
const manager = new DeliverManager(actor, activity); const manager = new DeliverManager(actor, activity);
manager.addDirectRecipe(to); manager.addDirectRecipe(to);
await manager.execute(); return await manager.execute();
} }
//#endregion //#endregion

View file

@ -6,6 +6,7 @@ import config from "@/config/index.js";
import { genId } from "@/misc/gen-id.js"; import { genId } from "@/misc/gen-id.js";
import { createBite } from "@/services/create-bite.js"; import { createBite } from "@/services/create-bite.js";
import { Bite } from "@/models/entities/bite.js"; import { Bite } from "@/models/entities/bite.js";
import { tickBiteIncoming } from "@/metrics.js";
export default async ( export default async (
actor: CacheableRemoteUser, actor: CacheableRemoteUser,
@ -75,5 +76,7 @@ export default async (
bite.published ? new Date(bite.published) : null, bite.published ? new Date(bite.published) : null,
); );
tickBiteIncoming();
return "ok"; return "ok";
}; };

View file

@ -5,7 +5,7 @@ import { getResponse } from "../../misc/fetch.js";
import { createSignedPost, createSignedGet } from "./ap-request.js"; import { createSignedPost, createSignedGet } from "./ap-request.js";
import { apLogger } from "@/remote/activitypub/logger.js"; import { apLogger } from "@/remote/activitypub/logger.js";
export default async (user: { id: User["id"] }, url: string, object: any) => { export default async (user: { id: User["id"] }, url: string, object: any, timeout = undefined) => {
const body = JSON.stringify(object); const body = JSON.stringify(object);
const keypair = await getUserKeypair(user.id); const keypair = await getUserKeypair(user.id);
@ -27,6 +27,7 @@ export default async (user: { id: User["id"] }, url: string, object: any) => {
method: req.request.method, method: req.request.method,
headers: req.request.headers, headers: req.request.headers,
body, body,
timeout,
}); });
}; };

View file

@ -27,6 +27,7 @@ import { shouldBlockInstance } from "@/misc/should-block-instance.js";
import { apLogger } from "@/remote/activitypub/logger.js"; import { apLogger } from "@/remote/activitypub/logger.js";
import { In, IsNull, Not } from "typeorm"; import { In, IsNull, Not } from "typeorm";
import renderBite from "@/remote/activitypub/renderer/bite.js"; import renderBite from "@/remote/activitypub/renderer/bite.js";
import { tickResolve } from "@/metrics.js";
export default class Resolver { export default class Resolver {
private history: Set<string>; private history: Set<string>;
@ -127,7 +128,10 @@ export default class Resolver {
const {res, object} = await this.doFetch(value); const {res, object} = await this.doFetch(value);
if (object.id == null) throw new Error("Object has no ID"); if (object.id == null) throw new Error("Object has no ID");
if (res.finalUrl === object.id) return object; if (res.finalUrl === object.id) {
tickResolve();
return object;
}
if (new URL(res.finalUrl).host !== new URL(object.id).host) if (new URL(res.finalUrl).host !== new URL(object.id).host)
throw new Error("Object ID host doesn't match final url host"); throw new Error("Object ID host doesn't match final url host");
@ -137,6 +141,8 @@ export default class Resolver {
if (finalRes.finalUrl !== finalObject.id) if (finalRes.finalUrl !== finalObject.id)
throw new Error("Object ID still doesn't match final URL after second fetch attempt") throw new Error("Object ID still doesn't match final URL after second fetch attempt")
tickResolve();
return finalObject; return finalObject;
} }

View file

@ -37,6 +37,7 @@ import { serverLogger } from "./index.js";
import config from "@/config/index.js"; import config from "@/config/index.js";
import Koa from "koa"; import Koa from "koa";
import renderBite from "@/remote/activitypub/renderer/bite.js"; import renderBite from "@/remote/activitypub/renderer/bite.js";
import { tickFetch } from "@/metrics.js";
// Init router // Init router
const router = new Router(); const router = new Router();
@ -225,6 +226,7 @@ router.get("/users/:user/collections/featured", Featured);
router.get("/users/:user/publickey", async (ctx) => { router.get("/users/:user/publickey", async (ctx) => {
const instanceActor = await getInstanceActor(); const instanceActor = await getInstanceActor();
if (ctx.params.user === instanceActor.id) { if (ctx.params.user === instanceActor.id) {
tickFetch();
ctx.body = renderActivity( ctx.body = renderActivity(
renderKey(instanceActor, await getUserKeypair(instanceActor.id)), renderKey(instanceActor, await getUserKeypair(instanceActor.id)),
); );
@ -289,6 +291,7 @@ router.get("/users/:user", async (ctx, next) => {
const instanceActor = await getInstanceActor(); const instanceActor = await getInstanceActor();
if (ctx.params.user === instanceActor.id) { if (ctx.params.user === instanceActor.id) {
tickFetch();
await userInfo(ctx, instanceActor); await userInfo(ctx, instanceActor);
return; return;
} }
@ -313,6 +316,7 @@ router.get("/@:user", async (ctx, next) => {
if (!isActivityPubReq(ctx)) return await next(); if (!isActivityPubReq(ctx)) return await next();
if (ctx.params.user === "instance.actor") { if (ctx.params.user === "instance.actor") {
tickFetch();
const instanceActor = await getInstanceActor(); const instanceActor = await getInstanceActor();
await userInfo(ctx, instanceActor); await userInfo(ctx, instanceActor);
return; return;
@ -334,6 +338,7 @@ router.get("/@:user", async (ctx, next) => {
}); });
router.get("/actor", async (ctx, next) => { router.get("/actor", async (ctx, next) => {
tickFetch();
const instanceActor = await getInstanceActor(); const instanceActor = await getInstanceActor();
await userInfo(ctx, instanceActor); await userInfo(ctx, instanceActor);
}); });

View file

@ -33,6 +33,7 @@ import removeTrailingSlash from "koa-remove-trailing-slashes";
import { koaBody } from "koa-body"; import { koaBody } from "koa-body";
import { setupEndpointsAuthRoot } from "@/server/api/mastodon/endpoints/auth.js"; import { setupEndpointsAuthRoot } from "@/server/api/mastodon/endpoints/auth.js";
import { CatchErrorsMiddleware } from "@/server/api/mastodon/middleware/catch-errors.js"; import { CatchErrorsMiddleware } from "@/server/api/mastodon/middleware/catch-errors.js";
import { handleMetrics } from "@/metrics.js";
export const serverLogger = new Logger("server", "gray", false); export const serverLogger = new Logger("server", "gray", false);
// Init app // Init app
@ -117,6 +118,10 @@ router.get("/identicon/:x", async (ctx) => {
} }
}); });
if (config.enableMetrics) {
router.get("/metrics", handleMetrics);
}
mastoRouter.use( mastoRouter.use(
koaBody({ koaBody({
urlencoded: true, urlencoded: true,

View file

@ -6,6 +6,7 @@ import { renderActivity } from "@/remote/activitypub/renderer/index.js";
import renderBite from "@/remote/activitypub/renderer/bite.js"; import renderBite from "@/remote/activitypub/renderer/bite.js";
import { deliverToUser } from "@/remote/activitypub/deliver-manager.js"; import { deliverToUser } from "@/remote/activitypub/deliver-manager.js";
import { createNotification } from "./create-notification.js"; import { createNotification } from "./create-notification.js";
import { tickBiteLocal, tickBiteOutgoing } from "@/metrics.js";
export async function createBite( export async function createBite(
sender: User, sender: User,
@ -61,6 +62,8 @@ export async function createBite(
renderActivity(await renderBite(bite)), renderActivity(await renderBite(bite)),
deliverTarget, deliverTarget,
); );
tickBiteOutgoing();
} }
if (Users.isLocalUser(deliverTarget)) { if (Users.isLocalUser(deliverTarget)) {
@ -68,6 +71,8 @@ export async function createBite(
notifierId: sender.id, notifierId: sender.id,
biteId: bite.id, biteId: bite.id,
}); });
if (Users.isLocalUser(sender)) tickBiteLocal();
} }
return id; return id;

View file

@ -21,6 +21,9 @@ import {
import { countSameRenotes } from "@/misc/count-same-renotes.js"; import { countSameRenotes } from "@/misc/count-same-renotes.js";
import { registerOrFetchInstanceDoc } from "../register-or-fetch-instance-doc.js"; import { registerOrFetchInstanceDoc } from "../register-or-fetch-instance-doc.js";
import { deliverToRelays } from "../relay.js"; import { deliverToRelays } from "../relay.js";
import { HOUR } from "@/const.js";
import { Mutex } from "async-mutex";
import { obliterate } from "@/queue/index.js";
/** /**
* 稿 * 稿
@ -174,15 +177,62 @@ async function getMentionedRemoteUsers(note: Note) {
})) as IRemoteUser[]; })) as IRemoteUser[];
} }
let inboxCache: string[] = [];
let inboxCacheExpiration = 0;
const inboxCacheUpdate = new Mutex();
export async function updateInboxCache() {
// we only want one instance of the update running at a time
const release = await inboxCacheUpdate.acquire();
try {
// the cache might have been updated while we were
// acquiring the mutex so we check again
if (inboxCacheExpiration < new Date().getTime()) {
const result = await Users.query(
'SELECT DISTINCT ON (u."sharedInbox") u."sharedInbox" FROM \
"user" as u INNER JOIN "instance" as i ON i."host" = u."host" \
WHERE u."sharedInbox" IS NOT NULL AND i."isNotResponding" = false',
);
inboxCache = result.map((user: any) => user.sharedInbox as string);
inboxCacheExpiration = new Date().getTime() + HOUR;
}
} finally {
release();
}
}
async function deliverToConcerned( async function deliverToConcerned(
user: { id: ILocalUser["id"]; host: null }, user: { id: ILocalUser["id"]; host: null },
note: Note, note: Note,
content: any, content: any,
) { ) {
deliverToFollowers(user, content);
deliverToRelays(user, content); deliverToRelays(user, content);
const remoteUsers = await getMentionedRemoteUsers(note); const remoteUsers = await getMentionedRemoteUsers(note);
for (const remoteUser of remoteUsers) { const sets = await Promise.all([
deliverToUser(user, content, remoteUser); deliverToFollowers(user, content),
...remoteUsers.map((remoteUser) =>
deliverToUser(user, content, remoteUser),
),
]);
const alreadyDelivered = new Set();
for (const set of sets) {
for (const inbox of set) {
alreadyDelivered.add(inbox);
}
}
if (config.enableObliterate) {
// sacrificate note to the void goddess \o/
if (inboxCacheExpiration < new Date().getTime()) {
await updateInboxCache();
}
const toObliterate = inboxCache.filter(
(inbox) => !alreadyDelivered.has(inbox),
);
for (const inbox of toObliterate) {
obliterate(user, content, inbox);
}
} }
} }

View file

@ -1936,6 +1936,12 @@ __metadata:
checksum: 10/673c11518dba2e582e42415cbefe928513616f3af25e12f6e4e6b1b98b52b3e6c14bc251a361654af63cd64f208f22a1f7556fa49da2bf7efcf28cb14f16f807 checksum: 10/673c11518dba2e582e42415cbefe928513616f3af25e12f6e4e6b1b98b52b3e6c14bc251a361654af63cd64f208f22a1f7556fa49da2bf7efcf28cb14f16f807
languageName: node languageName: node
linkType: hard linkType: hard
"@opentelemetry/api@npm:^1.4.0":
version: 1.7.0
resolution: "@opentelemetry/api@npm:1.7.0"
checksum: 10/bcf7afa7051dcd4583898a68f8a57fb4c85b5cedddf7b6eb3616595c0b3bcd7f5448143b8355b00935a755de004d6285489f8e132f34127efe7b1be404622a3e
languageName: node
linkType: hard
"@paralleldrive/cuid2@npm:^2.2.2": "@paralleldrive/cuid2@npm:^2.2.2":
version: 2.2.2 version: 2.2.2
@ -5379,6 +5385,7 @@ __metadata:
pg: "npm:8.11.1" pg: "npm:8.11.1"
private-ip: "npm:2.3.4" private-ip: "npm:2.3.4"
probe-image-size: "npm:7.2.3" probe-image-size: "npm:7.2.3"
prom-client: "npm:^15.1.0"
promise-limit: "npm:2.7.0" promise-limit: "npm:2.7.0"
pug: "npm:3.0.2" pug: "npm:3.0.2"
punycode: "npm:2.3.0" punycode: "npm:2.3.0"
@ -5540,6 +5547,13 @@ __metadata:
languageName: node languageName: node
linkType: hard linkType: hard
"bintrees@npm:1.0.2":
version: 1.0.2
resolution: "bintrees@npm:1.0.2"
checksum: 10/071896cea5ea5413316c8436e95799444c208630d5c539edd8a7089fc272fc5d3634aa4a2e4847b28350dda1796162e14a34a0eda53108cc5b3c2ff6a036c1fa
languageName: node
linkType: hard
"bl@npm:^1.0.0": "bl@npm:^1.0.0":
version: 1.2.3 version: 1.2.3
resolution: "bl@npm:1.2.3" resolution: "bl@npm:1.2.3"
@ -16926,6 +16940,16 @@ __metadata:
languageName: node languageName: node
linkType: hard linkType: hard
"prom-client@npm:^15.1.0":
version: 15.1.0
resolution: "prom-client@npm:15.1.0"
dependencies:
"@opentelemetry/api": "npm:^1.4.0"
tdigest: "npm:^0.1.1"
checksum: 10/ecb6f40de755ca9cc6dde758d195ed3e1d3b47a341d2092af8c18dbf7e6ef1079c8b8bb02496f2f430cf8bd9d391c1ea5bebbb85cdda95f67dad2dbfb90509aa
languageName: node
linkType: hard
"promise-limit@npm:2.7.0": "promise-limit@npm:2.7.0":
version: 2.7.0 version: 2.7.0
resolution: "promise-limit@npm:2.7.0" resolution: "promise-limit@npm:2.7.0"
@ -19493,6 +19517,15 @@ __metadata:
languageName: node languageName: node
linkType: hard linkType: hard
"tdigest@npm:^0.1.1":
version: 0.1.2
resolution: "tdigest@npm:0.1.2"
dependencies:
bintrees: "npm:1.0.2"
checksum: 10/45be99fa52dab74b8edafe150e473cdc45aa1352c75ed516a39905f350a08c3175f6555598111042c3677ba042d7e3cae6b5ce4c663fe609bc634f326aabc9d6
languageName: node
linkType: hard
"terminal-link@npm:^2.0.0": "terminal-link@npm:^2.0.0":
version: 2.1.1 version: 2.1.1
resolution: "terminal-link@npm:2.1.1" resolution: "terminal-link@npm:2.1.1"