From 1957128ba507d1066802cbe9e9ca1234956af1d2 Mon Sep 17 00:00:00 2001 From: syuilo Date: Sun, 10 Feb 2019 11:44:08 +0900 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E3=81=84=E3=81=AE=E3=81=A7=E3=82=B8?= =?UTF-8?q?=E3=83=A7=E3=83=96=E3=82=AD=E3=83=A5=E3=83=BC=E7=84=A1=E5=8A=B9?= =?UTF-8?q?=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/argv.ts | 7 ++- src/queue/index.ts | 61 ++++++++++++++---------- src/queue/processors/export-blocking.ts | 4 +- src/queue/processors/export-following.ts | 4 +- src/queue/processors/export-mute.ts | 4 +- src/queue/processors/export-notes.ts | 4 +- 6 files changed, 48 insertions(+), 36 deletions(-) diff --git a/src/argv.ts b/src/argv.ts index 4914b89e2..d9cd12dfc 100644 --- a/src/argv.ts +++ b/src/argv.ts @@ -5,9 +5,9 @@ program .version(pkg.version) .option('--no-daemons', 'Disable daemon processes (for debbuging)') .option('--disable-clustering', 'Disable clustering') - .option('--disable-ap-queue', 'Disable creating job queue related to ap') .option('--disable-queue', 'Disable job queue processing') - .option('--only-queue', 'Pocessing job queue only') + .option('--only-server', 'Run server only (without job queue)') + .option('--only-queue', 'Pocessing job queue only (without server)') .option('--quiet', 'Suppress all logs') .option('--verbose', 'Enable all logs') .option('--with-log-time', 'Include timestamp for each logs') @@ -15,8 +15,7 @@ program .option('--color', 'This option is a dummy for some external program\'s (e.g. forever) issue.') .parse(process.argv); -/*if (process.env.MK_DISABLE_AP_QUEUE)*/ program.disableApQueue = true; -if (process.env.MK_DISABLE_QUEUE) program.disableQueue = true; +/*if (process.env.MK_DISABLE_QUEUE)*/ program.disableQueue = true; if (process.env.MK_ONLY_QUEUE) program.onlyQueue = true; export { program }; diff --git a/src/queue/index.ts b/src/queue/index.ts index 5d3baa824..7dc2319f5 100644 --- a/src/queue/index.ts +++ b/src/queue/index.ts @@ -8,12 +8,13 @@ import handler from './processors'; import { queueLogger } from './logger'; const enableQueue = !program.disableQueue; +const enableQueueProcessing = !program.onlyServer && enableQueue; const queueAvailable = config.redis != null; const queue = initializeQueue(); function initializeQueue() { - if (queueAvailable) { + if (queueAvailable && enableQueue) { return new Queue('misskey', { redis: { port: config.redis.port, @@ -42,7 +43,7 @@ export function deliver(user: ILocalUser, content: any, to: any) { to }; - if (queueAvailable && !program.disableApQueue) { + if (queueAvailable && enableQueueProcessing) { return queue.createJob(data) .retries(8) .backoff('exponential', 1000) @@ -59,7 +60,7 @@ export function processInbox(activity: any, signature: httpSignature.IParsedSign signature }; - if (queueAvailable && !program.disableApQueue) { + if (queueAvailable && enableQueueProcessing) { return queue.createJob(data) .retries(3) .backoff('exponential', 500) @@ -70,47 +71,59 @@ export function processInbox(activity: any, signature: httpSignature.IParsedSign } export function createExportNotesJob(user: ILocalUser) { - if (!queueAvailable) throw 'queue unavailable'; - - return queue.createJob({ + const data = { type: 'exportNotes', user: user - }) - .save(); + }; + + if (queueAvailable && enableQueueProcessing) { + return queue.createJob(data).save(); + } else { + return handler({ data }, () => {}); + } } export function createExportFollowingJob(user: ILocalUser) { - if (!queueAvailable) throw 'queue unavailable'; - - return queue.createJob({ + const data = { type: 'exportFollowing', user: user - }) - .save(); + }; + + if (queueAvailable && enableQueueProcessing) { + return queue.createJob(data).save(); + } else { + return handler({ data }, () => {}); + } } export function createExportMuteJob(user: ILocalUser) { - if (!queueAvailable) throw 'queue unavailable'; - - return queue.createJob({ + const data = { type: 'exportMute', user: user - }) - .save(); + }; + + if (queueAvailable && enableQueueProcessing) { + return queue.createJob(data).save(); + } else { + return handler({ data }, () => {}); + } } export function createExportBlockingJob(user: ILocalUser) { - if (!queueAvailable) throw 'queue unavailable'; - - return queue.createJob({ + const data = { type: 'exportBlocking', user: user - }) - .save(); + }; + + if (queueAvailable && enableQueueProcessing) { + return queue.createJob(data).save(); + } else { + return handler({ data }, () => {}); + } } export default function() { - if (queueAvailable && enableQueue) { + if (queueAvailable && enableQueueProcessing) { queue.process(128, handler); queueLogger.succ('Processing started'); } diff --git a/src/queue/processors/export-blocking.ts b/src/queue/processors/export-blocking.ts index 95465a5e5..b30d8e3bc 100644 --- a/src/queue/processors/export-blocking.ts +++ b/src/queue/processors/export-blocking.ts @@ -48,7 +48,7 @@ export async function exportBlocking(job: bq.Job, done: any): Promise { if (blockings.length === 0) { ended = true; - job.reportProgress(100); + if (job.reportProgress) job.reportProgress(100); break; } @@ -74,7 +74,7 @@ export async function exportBlocking(job: bq.Job, done: any): Promise { blockerId: user._id, }); - job.reportProgress(exportedCount / total); + if (job.reportProgress) job.reportProgress(exportedCount / total); } stream.end(); diff --git a/src/queue/processors/export-following.ts b/src/queue/processors/export-following.ts index 13ba0888f..e6521d065 100644 --- a/src/queue/processors/export-following.ts +++ b/src/queue/processors/export-following.ts @@ -48,7 +48,7 @@ export async function exportFollowing(job: bq.Job, done: any): Promise { if (followings.length === 0) { ended = true; - job.reportProgress(100); + if (job.reportProgress) job.reportProgress(100); break; } @@ -74,7 +74,7 @@ export async function exportFollowing(job: bq.Job, done: any): Promise { followerId: user._id, }); - job.reportProgress(exportedCount / total); + if (job.reportProgress) job.reportProgress(exportedCount / total); } stream.end(); diff --git a/src/queue/processors/export-mute.ts b/src/queue/processors/export-mute.ts index 8f72133cd..74456c1da 100644 --- a/src/queue/processors/export-mute.ts +++ b/src/queue/processors/export-mute.ts @@ -48,7 +48,7 @@ export async function exportMute(job: bq.Job, done: any): Promise { if (mutes.length === 0) { ended = true; - job.reportProgress(100); + if (job.reportProgress) job.reportProgress(100); break; } @@ -74,7 +74,7 @@ export async function exportMute(job: bq.Job, done: any): Promise { muterId: user._id, }); - job.reportProgress(exportedCount / total); + if (job.reportProgress) job.reportProgress(exportedCount / total); } stream.end(); diff --git a/src/queue/processors/export-notes.ts b/src/queue/processors/export-notes.ts index 4d973d015..32e4cd1d6 100644 --- a/src/queue/processors/export-notes.ts +++ b/src/queue/processors/export-notes.ts @@ -58,7 +58,7 @@ export async function exportNotes(job: bq.Job, done: any): Promise { if (notes.length === 0) { ended = true; - job.reportProgress(100); + if (job.reportProgress) job.reportProgress(100); break; } @@ -83,7 +83,7 @@ export async function exportNotes(job: bq.Job, done: any): Promise { userId: user._id, }); - job.reportProgress(exportedNotesCount / total); + if (job.reportProgress) job.reportProgress(exportedNotesCount / total); } await new Promise((res, rej) => {