fix: multiple boost publication by relay

This commit is contained in:
Namekuji 2023-06-30 17:44:36 -04:00
parent 4e5bb1194a
commit e70d99be57

View file

@ -461,31 +461,30 @@ export default async (
} }
if (!dontFederateInitially) { if (!dontFederateInitially) {
if (!note.userHost) {
// Publish if the post is local
publishNotesStream(note);
} else {
const relays = await getCachedRelays(); const relays = await getCachedRelays();
// Some relays (e.g., aode-relay) deliver posts by boosting them as // Some relays (e.g., aode-relay) deliver posts by boosting them as
// Announce activities. In that case, user is the relay's actor. // Announce activities. In that case, user is the relay's actor.
const boostedByRelay = const boostedByRelay = relays
!!user.inbox && .map((relay) => new URL(relay.inbox).host)
relays.map((relay) => relay.inbox).includes(user.inbox); .includes(note.userHost);
if (!note.uri) { if (boostedByRelay && data.renote) {
// Publish if the post is local
publishNotesStream(note);
} else if (boostedByRelay && data.renote?.uri) {
// Use Redis transaction for atomicity // Use Redis transaction for atomicity
await redisClient.watch(`publishedNote:${data.renote.uri}`); const key = `publishedNote:${data.renote.id}`;
const exists = await redisClient.exists( await redisClient.watch(key);
`publishedNote:${data.renote.uri}`, const exists = await redisClient.exists(key);
);
if (exists === 0) { if (exists === 0) {
// Start the transaction // Start the transaction
const transaction = redisClient.multi(); const transaction = redisClient.multi();
const key = `publishedNote:${data.renote.uri}`;
transaction.set(key, 1, "EX", 30); transaction.set(key, 1, "EX", 30);
// Execute the transaction // Execute the transaction
transaction.exec((err, replies) => { await transaction.exec((err, _replies) => {
// Publish after setting the key in Redis // Publish after setting the key in Redis
if (!err && data.renote) { if (!err && boostedByRelay && data.renote) {
publishNotesStream(data.renote); publishNotesStream(data.renote);
} }
}); });
@ -493,17 +492,21 @@ export default async (
// Abort the transaction // Abort the transaction
redisClient.unwatch(); redisClient.unwatch();
} }
} else if (!boostedByRelay && note.uri) { } else {
// Use Redis transaction for atomicity // Use Redis transaction for atomicity
await redisClient.watch(`publishedNote:${note.uri}`); const key = `publishedNote:${note.id}`;
const exists = await redisClient.exists(`publishedNote:${note.uri}`); await redisClient.watch(key);
const exists = await redisClient.exists(key);
if (exists === 0) { if (exists === 0) {
// Start the transaction // Start the transaction
const transaction = redisClient.multi(); const transaction = redisClient.multi();
const key = `publishedNote:${note.uri}`;
transaction.set(key, 1, "EX", 30); transaction.set(key, 1, "EX", 30);
if (note.renoteId) {
// Prevent other threads from publishing this boosting post
transaction.set(`publishedNote:${note.renoteId}`, 1, "EX", 30);
}
// Execute the transaction // Execute the transaction
transaction.exec((err, replies) => { await transaction.exec((err, _replies) => {
// Publish after setting the key in Redis // Publish after setting the key in Redis
if (!err) { if (!err) {
publishNotesStream(note); publishNotesStream(note);
@ -515,6 +518,7 @@ export default async (
} }
} }
} }
}
if (note.replyId != null) { if (note.replyId != null) {
// Only provide the reply note id here as the recipient may not be authorized to see the note. // Only provide the reply note id here as the recipient may not be authorized to see the note.
publishNoteStream(note.replyId, "replied", { publishNoteStream(note.replyId, "replied", {