From e70d99be57797b24f3e4c785cf138f7c6680a2c5 Mon Sep 17 00:00:00 2001 From: Namekuji Date: Fri, 30 Jun 2023 17:44:36 -0400 Subject: [PATCH 1/7] fix: multiple boost publication by relay --- packages/backend/src/services/note/create.ts | 98 ++++++++++---------- 1 file changed, 51 insertions(+), 47 deletions(-) diff --git a/packages/backend/src/services/note/create.ts b/packages/backend/src/services/note/create.ts index defd9742e..2a5845943 100644 --- a/packages/backend/src/services/note/create.ts +++ b/packages/backend/src/services/note/create.ts @@ -461,57 +461,61 @@ export default async ( } if (!dontFederateInitially) { - const relays = await getCachedRelays(); - // Some relays (e.g., aode-relay) deliver posts by boosting them as - // Announce activities. In that case, user is the relay's actor. - const boostedByRelay = - !!user.inbox && - relays.map((relay) => relay.inbox).includes(user.inbox); - - if (!note.uri) { + if (!note.userHost) { // Publish if the post is local publishNotesStream(note); - } else if (boostedByRelay && data.renote?.uri) { - // Use Redis transaction for atomicity - await redisClient.watch(`publishedNote:${data.renote.uri}`); - const exists = await redisClient.exists( - `publishedNote:${data.renote.uri}`, - ); - if (exists === 0) { - // Start the transaction - const transaction = redisClient.multi(); - const key = `publishedNote:${data.renote.uri}`; - transaction.set(key, 1, "EX", 30); - // Execute the transaction - transaction.exec((err, replies) => { - // Publish after setting the key in Redis - if (!err && data.renote) { - publishNotesStream(data.renote); - } - }); + } else { + const relays = await getCachedRelays(); + // Some relays (e.g., aode-relay) deliver posts by boosting them as + // Announce activities. In that case, user is the relay's actor. + const boostedByRelay = relays + .map((relay) => new URL(relay.inbox).host) + .includes(note.userHost); + + if (boostedByRelay && data.renote) { + // Use Redis transaction for atomicity + const key = `publishedNote:${data.renote.id}`; + await redisClient.watch(key); + const exists = await redisClient.exists(key); + if (exists === 0) { + // Start the transaction + const transaction = redisClient.multi(); + transaction.set(key, 1, "EX", 30); + // Execute the transaction + await transaction.exec((err, _replies) => { + // Publish after setting the key in Redis + if (!err && boostedByRelay && data.renote) { + publishNotesStream(data.renote); + } + }); + } else { + // Abort the transaction + redisClient.unwatch(); + } } else { - // Abort the transaction - redisClient.unwatch(); - } - } else if (!boostedByRelay && note.uri) { - // Use Redis transaction for atomicity - await redisClient.watch(`publishedNote:${note.uri}`); - const exists = await redisClient.exists(`publishedNote:${note.uri}`); - if (exists === 0) { - // Start the transaction - const transaction = redisClient.multi(); - const key = `publishedNote:${note.uri}`; - transaction.set(key, 1, "EX", 30); - // Execute the transaction - transaction.exec((err, replies) => { - // Publish after setting the key in Redis - if (!err) { - publishNotesStream(note); + // Use Redis transaction for atomicity + const key = `publishedNote:${note.id}`; + await redisClient.watch(key); + const exists = await redisClient.exists(key); + if (exists === 0) { + // Start the transaction + const transaction = redisClient.multi(); + transaction.set(key, 1, "EX", 30); + if (note.renoteId) { + // Prevent other threads from publishing this boosting post + transaction.set(`publishedNote:${note.renoteId}`, 1, "EX", 30); } - }); - } else { - // Abort the transaction - redisClient.unwatch(); + // Execute the transaction + await transaction.exec((err, _replies) => { + // Publish after setting the key in Redis + if (!err) { + publishNotesStream(note); + } + }); + } else { + // Abort the transaction + redisClient.unwatch(); + } } } } From 389427af275c39c708151174f59601cd46990bd7 Mon Sep 17 00:00:00 2001 From: Namekuji Date: Fri, 30 Jun 2023 18:13:07 -0400 Subject: [PATCH 2/7] fix: ignore if post boosted by relay is local --- packages/backend/src/services/note/create.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/backend/src/services/note/create.ts b/packages/backend/src/services/note/create.ts index 2a5845943..dd08aca41 100644 --- a/packages/backend/src/services/note/create.ts +++ b/packages/backend/src/services/note/create.ts @@ -472,7 +472,8 @@ export default async ( .map((relay) => new URL(relay.inbox).host) .includes(note.userHost); - if (boostedByRelay && data.renote) { + if (boostedByRelay && data.renote && data.renote.userHost) { + /* A relay boosted a remote post. */ // Use Redis transaction for atomicity const key = `publishedNote:${data.renote.id}`; await redisClient.watch(key); From 64ddf35ae31e0e528128134ab5afab2413ddb4fe Mon Sep 17 00:00:00 2001 From: Namekuji Date: Fri, 30 Jun 2023 18:35:05 -0400 Subject: [PATCH 3/7] refactor: no url instantiation --- packages/backend/src/services/note/create.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/backend/src/services/note/create.ts b/packages/backend/src/services/note/create.ts index dd08aca41..369b3e4f7 100644 --- a/packages/backend/src/services/note/create.ts +++ b/packages/backend/src/services/note/create.ts @@ -461,16 +461,16 @@ export default async ( } if (!dontFederateInitially) { - if (!note.userHost) { + if (Users.isLocalUser(user)) { // Publish if the post is local publishNotesStream(note); } else { const relays = await getCachedRelays(); // Some relays (e.g., aode-relay) deliver posts by boosting them as // Announce activities. In that case, user is the relay's actor. - const boostedByRelay = relays - .map((relay) => new URL(relay.inbox).host) - .includes(note.userHost); + const boostedByRelay = + !!user.inbox && + relays.map((relay) => relay.inbox).includes(user.inbox); if (boostedByRelay && data.renote && data.renote.userHost) { /* A relay boosted a remote post. */ From 1f974c0c139d05e0ff192b065a230975797cfba8 Mon Sep 17 00:00:00 2001 From: Namekuji Date: Sat, 1 Jul 2023 00:50:46 -0400 Subject: [PATCH 4/7] refactor: use redis-semaphore for mutex across workers --- packages/backend/package.json | 1 + packages/backend/src/services/note/create.ts | 89 ++++++++------------ pnpm-lock.yaml | 17 ++++ 3 files changed, 53 insertions(+), 54 deletions(-) diff --git a/packages/backend/package.json b/packages/backend/package.json index f7d19d85b..b584a5691 100644 --- a/packages/backend/package.json +++ b/packages/backend/package.json @@ -112,6 +112,7 @@ "ratelimiter": "3.4.1", "re2": "1.19.0", "redis-lock": "0.1.4", + "redis-semaphore": "5.3.1", "reflect-metadata": "0.1.13", "rename": "1.0.4", "rndstr": "1.0.0", diff --git a/packages/backend/src/services/note/create.ts b/packages/backend/src/services/note/create.ts index 369b3e4f7..6d64bec50 100644 --- a/packages/backend/src/services/note/create.ts +++ b/packages/backend/src/services/note/create.ts @@ -69,6 +69,7 @@ import { getActiveWebhooks } from "@/misc/webhook-cache.js"; import { shouldSilenceInstance } from "@/misc/should-block-instance.js"; import meilisearch from "../../db/meilisearch.js"; import { redisClient } from "@/db/redis.js"; +import { Mutex } from "redis-semaphore"; const mutedWordsCache = new Cache< { userId: UserProfile["userId"]; mutedWords: UserProfile["mutedWords"] }[] @@ -461,63 +462,43 @@ export default async ( } if (!dontFederateInitially) { - if (Users.isLocalUser(user)) { - // Publish if the post is local - publishNotesStream(note); - } else { - const relays = await getCachedRelays(); - // Some relays (e.g., aode-relay) deliver posts by boosting them as - // Announce activities. In that case, user is the relay's actor. - const boostedByRelay = - !!user.inbox && - relays.map((relay) => relay.inbox).includes(user.inbox); + let publishKey: string; + let noteToPublish: Note; + const relays = await getCachedRelays(); - if (boostedByRelay && data.renote && data.renote.userHost) { - /* A relay boosted a remote post. */ - // Use Redis transaction for atomicity - const key = `publishedNote:${data.renote.id}`; - await redisClient.watch(key); - const exists = await redisClient.exists(key); - if (exists === 0) { - // Start the transaction - const transaction = redisClient.multi(); - transaction.set(key, 1, "EX", 30); - // Execute the transaction - await transaction.exec((err, _replies) => { - // Publish after setting the key in Redis - if (!err && boostedByRelay && data.renote) { - publishNotesStream(data.renote); - } - }); - } else { - // Abort the transaction - redisClient.unwatch(); - } - } else { - // Use Redis transaction for atomicity - const key = `publishedNote:${note.id}`; - await redisClient.watch(key); - const exists = await redisClient.exists(key); - if (exists === 0) { - // Start the transaction - const transaction = redisClient.multi(); - transaction.set(key, 1, "EX", 30); - if (note.renoteId) { - // Prevent other threads from publishing this boosting post - transaction.set(`publishedNote:${note.renoteId}`, 1, "EX", 30); - } - // Execute the transaction - await transaction.exec((err, _replies) => { - // Publish after setting the key in Redis - if (!err) { - publishNotesStream(note); - } - }); - } else { - // Abort the transaction - redisClient.unwatch(); + // Some relays (e.g., aode-relay) deliver posts by boosting them as + // Announce activities. In that case, user is the relay's actor. + const boostedByRelay = + !!user.inbox && + relays.map((relay) => relay.inbox).includes(user.inbox); + + if (boostedByRelay && data.renote && data.renote.userHost) { + publishKey = `publishedNote:${data.renote.id}`; + noteToPublish = data.renote; + } else { + publishKey = `publishedNote:${note.id}`; + noteToPublish = note; + } + + const lock = new Mutex(redisClient, "publishedNote:lock"); + await lock.acquire(); + try { + const exists = (await redisClient.exists(publishKey)) > 0; + if (!exists) { + await redisClient.set(publishKey, 1, "EX", 30); + if (noteToPublish.renoteId) { + // Prevents other threads from publishing the boosting post + await redisClient.set( + `publishedNote:${noteToPublish.renoteId}`, + 1, + "EX", + 30, + ); } + publishNotesStream(noteToPublish); } + } finally { + lock.release(); } } if (note.replyId != null) { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 760931858..1002a6f95 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -339,6 +339,9 @@ importers: redis-lock: specifier: 0.1.4 version: 0.1.4 + redis-semaphore: + specifier: 5.3.1 + version: 5.3.1(ioredis@5.3.2) reflect-metadata: specifier: 0.1.13 version: 0.1.13 @@ -2666,6 +2669,7 @@ packages: engines: {node: '>=10'} cpu: [arm64] os: [android] + requiresBuild: true dependencies: '@swc/wasm': 1.2.130 @@ -2772,6 +2776,7 @@ packages: /@swc/wasm@1.2.130: resolution: {integrity: sha512-rNcJsBxS70+pv8YUWwf5fRlWX6JoY/HJc25HD/F8m6Kv7XhJdqPPMhyX6TKkUBPAG7TWlZYoxa+rHAjPy4Cj3Q==} + requiresBuild: true /@syuilo/aiscript@0.11.1: resolution: {integrity: sha512-chwOIA3yLUKvOB0G611hjLArKTeOWNmTm3lHERSaDW1d+dS6do56naX6Lkwy2UpnwWC0qzeNSgg35elk6t2gZg==} @@ -12831,6 +12836,18 @@ packages: dependencies: redis-errors: 1.2.0 + /redis-semaphore@5.3.1(ioredis@5.3.2): + resolution: {integrity: sha512-oUpxxfxSbh5eT0mvVpz2d4Qlg2CsaoQkeo80/v6CU2l97zO0u6NPgc9/zQZa9KGR3/93b0igtSct3hEFh8Ei8w==} + engines: {node: '>= 14.17.0'} + peerDependencies: + ioredis: ^4.1.0 || ^5 + dependencies: + debug: 4.3.4(supports-color@8.1.1) + ioredis: 5.3.2 + transitivePeerDependencies: + - supports-color + dev: false + /redis@4.6.7: resolution: {integrity: sha512-KrkuNJNpCwRm5vFJh0tteMxW8SaUzkm5fBH7eL5hd/D0fAkzvapxbfGPP/r+4JAXdQuX7nebsBkBqA2RHB7Usw==} dependencies: From d772da4bc48e3e82d922a1739642c164691c0caf Mon Sep 17 00:00:00 2001 From: Namekuji Date: Sat, 1 Jul 2023 02:53:08 -0400 Subject: [PATCH 5/7] change mutex key --- packages/backend/src/services/note/create.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/backend/src/services/note/create.ts b/packages/backend/src/services/note/create.ts index 6d64bec50..95d005fe0 100644 --- a/packages/backend/src/services/note/create.ts +++ b/packages/backend/src/services/note/create.ts @@ -480,7 +480,7 @@ export default async ( noteToPublish = note; } - const lock = new Mutex(redisClient, "publishedNote:lock"); + const lock = new Mutex(redisClient, "publishedNote"); await lock.acquire(); try { const exists = (await redisClient.exists(publishKey)) > 0; From 0c6af127197a05b5d9739b1640e9c83cda249e4a Mon Sep 17 00:00:00 2001 From: Namekuji Date: Sat, 1 Jul 2023 03:22:40 -0400 Subject: [PATCH 6/7] fix: await release --- packages/backend/src/services/note/create.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/backend/src/services/note/create.ts b/packages/backend/src/services/note/create.ts index 95d005fe0..6eb0df817 100644 --- a/packages/backend/src/services/note/create.ts +++ b/packages/backend/src/services/note/create.ts @@ -498,7 +498,7 @@ export default async ( publishNotesStream(noteToPublish); } } finally { - lock.release(); + await lock.release(); } } if (note.replyId != null) { From f7d0973f97de6c753a0e3e2793dc802380458f5f Mon Sep 17 00:00:00 2001 From: Namekuji Date: Sat, 1 Jul 2023 04:07:50 -0400 Subject: [PATCH 7/7] refactor: examine by get instead of exists --- packages/backend/src/services/note/create.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/backend/src/services/note/create.ts b/packages/backend/src/services/note/create.ts index 6eb0df817..f00678ce2 100644 --- a/packages/backend/src/services/note/create.ts +++ b/packages/backend/src/services/note/create.ts @@ -483,14 +483,14 @@ export default async ( const lock = new Mutex(redisClient, "publishedNote"); await lock.acquire(); try { - const exists = (await redisClient.exists(publishKey)) > 0; - if (!exists) { - await redisClient.set(publishKey, 1, "EX", 30); + const published = (await redisClient.get(publishKey)) !== null; + if (!published) { + await redisClient.set(publishKey, "done", "EX", 30); if (noteToPublish.renoteId) { // Prevents other threads from publishing the boosting post await redisClient.set( `publishedNote:${noteToPublish.renoteId}`, - 1, + "done", "EX", 30, );