diff --git a/packages/backend/package.json b/packages/backend/package.json index f7d19d85b..b584a5691 100644 --- a/packages/backend/package.json +++ b/packages/backend/package.json @@ -112,6 +112,7 @@ "ratelimiter": "3.4.1", "re2": "1.19.0", "redis-lock": "0.1.4", + "redis-semaphore": "5.3.1", "reflect-metadata": "0.1.13", "rename": "1.0.4", "rndstr": "1.0.0", diff --git a/packages/backend/src/services/note/create.ts b/packages/backend/src/services/note/create.ts index defd9742e..f00678ce2 100644 --- a/packages/backend/src/services/note/create.ts +++ b/packages/backend/src/services/note/create.ts @@ -69,6 +69,7 @@ import { getActiveWebhooks } from "@/misc/webhook-cache.js"; import { shouldSilenceInstance } from "@/misc/should-block-instance.js"; import meilisearch from "../../db/meilisearch.js"; import { redisClient } from "@/db/redis.js"; +import { Mutex } from "redis-semaphore"; const mutedWordsCache = new Cache< { userId: UserProfile["userId"]; mutedWords: UserProfile["mutedWords"] }[] @@ -461,58 +462,43 @@ export default async ( } if (!dontFederateInitially) { + let publishKey: string; + let noteToPublish: Note; const relays = await getCachedRelays(); + // Some relays (e.g., aode-relay) deliver posts by boosting them as // Announce activities. In that case, user is the relay's actor. const boostedByRelay = !!user.inbox && relays.map((relay) => relay.inbox).includes(user.inbox); - if (!note.uri) { - // Publish if the post is local - publishNotesStream(note); - } else if (boostedByRelay && data.renote?.uri) { - // Use Redis transaction for atomicity - await redisClient.watch(`publishedNote:${data.renote.uri}`); - const exists = await redisClient.exists( - `publishedNote:${data.renote.uri}`, - ); - if (exists === 0) { - // Start the transaction - const transaction = redisClient.multi(); - const key = `publishedNote:${data.renote.uri}`; - transaction.set(key, 1, "EX", 30); - // Execute the transaction - transaction.exec((err, replies) => { - // Publish after setting the key in Redis - if (!err && data.renote) { - publishNotesStream(data.renote); - } - }); - } else { - // Abort the transaction - redisClient.unwatch(); - } - } else if (!boostedByRelay && note.uri) { - // Use Redis transaction for atomicity - await redisClient.watch(`publishedNote:${note.uri}`); - const exists = await redisClient.exists(`publishedNote:${note.uri}`); - if (exists === 0) { - // Start the transaction - const transaction = redisClient.multi(); - const key = `publishedNote:${note.uri}`; - transaction.set(key, 1, "EX", 30); - // Execute the transaction - transaction.exec((err, replies) => { - // Publish after setting the key in Redis - if (!err) { - publishNotesStream(note); - } - }); - } else { - // Abort the transaction - redisClient.unwatch(); + if (boostedByRelay && data.renote && data.renote.userHost) { + publishKey = `publishedNote:${data.renote.id}`; + noteToPublish = data.renote; + } else { + publishKey = `publishedNote:${note.id}`; + noteToPublish = note; + } + + const lock = new Mutex(redisClient, "publishedNote"); + await lock.acquire(); + try { + const published = (await redisClient.get(publishKey)) !== null; + if (!published) { + await redisClient.set(publishKey, "done", "EX", 30); + if (noteToPublish.renoteId) { + // Prevents other threads from publishing the boosting post + await redisClient.set( + `publishedNote:${noteToPublish.renoteId}`, + "done", + "EX", + 30, + ); + } + publishNotesStream(noteToPublish); } + } finally { + await lock.release(); } } if (note.replyId != null) { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 760931858..1002a6f95 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -339,6 +339,9 @@ importers: redis-lock: specifier: 0.1.4 version: 0.1.4 + redis-semaphore: + specifier: 5.3.1 + version: 5.3.1(ioredis@5.3.2) reflect-metadata: specifier: 0.1.13 version: 0.1.13 @@ -2666,6 +2669,7 @@ packages: engines: {node: '>=10'} cpu: [arm64] os: [android] + requiresBuild: true dependencies: '@swc/wasm': 1.2.130 @@ -2772,6 +2776,7 @@ packages: /@swc/wasm@1.2.130: resolution: {integrity: sha512-rNcJsBxS70+pv8YUWwf5fRlWX6JoY/HJc25HD/F8m6Kv7XhJdqPPMhyX6TKkUBPAG7TWlZYoxa+rHAjPy4Cj3Q==} + requiresBuild: true /@syuilo/aiscript@0.11.1: resolution: {integrity: sha512-chwOIA3yLUKvOB0G611hjLArKTeOWNmTm3lHERSaDW1d+dS6do56naX6Lkwy2UpnwWC0qzeNSgg35elk6t2gZg==} @@ -12831,6 +12836,18 @@ packages: dependencies: redis-errors: 1.2.0 + /redis-semaphore@5.3.1(ioredis@5.3.2): + resolution: {integrity: sha512-oUpxxfxSbh5eT0mvVpz2d4Qlg2CsaoQkeo80/v6CU2l97zO0u6NPgc9/zQZa9KGR3/93b0igtSct3hEFh8Ei8w==} + engines: {node: '>= 14.17.0'} + peerDependencies: + ioredis: ^4.1.0 || ^5 + dependencies: + debug: 4.3.4(supports-color@8.1.1) + ioredis: 5.3.2 + transitivePeerDependencies: + - supports-color + dev: false + /redis@4.6.7: resolution: {integrity: sha512-KrkuNJNpCwRm5vFJh0tteMxW8SaUzkm5fBH7eL5hd/D0fAkzvapxbfGPP/r+4JAXdQuX7nebsBkBqA2RHB7Usw==} dependencies: