From 37cd41e0c90d7e0132877ad6aa6e8f00fe7b284e Mon Sep 17 00:00:00 2001 From: MeiMei <30769358+mei23@users.noreply.github.com> Date: Thu, 7 Nov 2019 00:02:18 +0900 Subject: [PATCH] =?UTF-8?q?AP=E3=81=AE=E7=B5=B1=E8=A8=88=E3=81=A8=E3=83=AD?= =?UTF-8?q?=E3=82=B0=E3=81=AE=E4=BF=AE=E6=AD=A3=E3=81=A8=E5=BC=B7=E5=8C=96?= =?UTF-8?q?=20(#5585)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Fix #5580 * Improve AP logging --- src/queue/get-job-info.ts | 15 +++++++++++++++ src/queue/index.ts | 17 ++++++++-------- src/queue/processors/deliver.ts | 22 ++++++++++++++++++--- src/remote/activitypub/request.ts | 32 ++----------------------------- 4 files changed, 45 insertions(+), 41 deletions(-) create mode 100644 src/queue/get-job-info.ts diff --git a/src/queue/get-job-info.ts b/src/queue/get-job-info.ts new file mode 100644 index 000000000..f601ae62d --- /dev/null +++ b/src/queue/get-job-info.ts @@ -0,0 +1,15 @@ +import * as Bull from 'bull'; + +export function getJobInfo(job: Bull.Job, increment = false) { + const age = Date.now() - job.timestamp; + + const formated = age > 60000 ? `${Math.floor(age / 1000 / 60)}m` + : age > 10000 ? `${Math.floor(age / 1000)}s` + : `${age}ms`; + + // onActiveとかonCompletedのattemptsMadeがなぜか0始まりなのでインクリメントする + const currentAttempts = job.attemptsMade + (increment ? 1 : 0); + const maxAttempts = job.opts ? job.opts.attempts : 0; + + return `id=${job.id} attempts=${currentAttempts}/${maxAttempts} age=${formated}`; +} diff --git a/src/queue/index.ts b/src/queue/index.ts index 0b2001729..2d3cf6ee3 100644 --- a/src/queue/index.ts +++ b/src/queue/index.ts @@ -11,6 +11,7 @@ import processDb from './processors/db'; import procesObjectStorage from './processors/object-storage'; import { queueLogger } from './logger'; import { DriveFile } from '../models/entities/drive-file'; +import { getJobInfo } from './get-job-info'; function initializeQueue(name: string) { return new Queue(name, { @@ -44,19 +45,19 @@ const objectStorageLogger = queueLogger.createSubLogger('objectStorage'); deliverQueue .on('waiting', (jobId) => deliverLogger.debug(`waiting id=${jobId}`)) - .on('active', (job) => deliverLogger.debug(`active id=${job.id} to=${job.data.to}`)) - .on('completed', (job, result) => deliverLogger.debug(`completed(${result}) id=${job.id} to=${job.data.to}`)) - .on('failed', (job, err) => deliverLogger.warn(`failed(${err}) id=${job.id} to=${job.data.to}`, { job, e: renderError(err) })) + .on('active', (job) => deliverLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`)) + .on('completed', (job, result) => deliverLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`)) + .on('failed', (job, err) => deliverLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job.data.to}`)) .on('error', (job: any, err: Error) => deliverLogger.error(`error ${err}`, { job, e: renderError(err) })) - .on('stalled', (job) => deliverLogger.warn(`stalled id=${job.id} to=${job.data.to}`)); + .on('stalled', (job) => deliverLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`)); inboxQueue .on('waiting', (jobId) => inboxLogger.debug(`waiting id=${jobId}`)) - .on('active', (job) => inboxLogger.debug(`active id=${job.id}`)) - .on('completed', (job, result) => inboxLogger.debug(`completed(${result}) id=${job.id}`)) - .on('failed', (job, err) => inboxLogger.warn(`failed(${err}) id=${job.id} activity=${job.data.activity ? job.data.activity.id : 'none'}`, { job, e: renderError(err) })) + .on('active', (job) => inboxLogger.debug(`active ${getJobInfo(job, true)}`)) + .on('completed', (job, result) => inboxLogger.debug(`completed(${result}) ${getJobInfo(job, true)}`)) + .on('failed', (job, err) => inboxLogger.warn(`failed(${err}) ${getJobInfo(job)} activity=${job.data.activity ? job.data.activity.id : 'none'}`, { job, e: renderError(err) })) .on('error', (job: any, err: Error) => inboxLogger.error(`error ${err}`, { job, e: renderError(err) })) - .on('stalled', (job) => inboxLogger.warn(`stalled id=${job.id} activity=${job.data.activity ? job.data.activity.id : 'none'}`)); + .on('stalled', (job) => inboxLogger.warn(`stalled ${getJobInfo(job)} activity=${job.data.activity ? job.data.activity.id : 'none'}`)); dbQueue .on('waiting', (jobId) => dbLogger.debug(`waiting id=${jobId}`)) diff --git a/src/queue/processors/deliver.ts b/src/queue/processors/deliver.ts index b252c2016..980ca3a43 100644 --- a/src/queue/processors/deliver.ts +++ b/src/queue/processors/deliver.ts @@ -5,6 +5,8 @@ import Logger from '../../services/logger'; import { Instances } from '../../models'; import { instanceChart } from '../../services/chart'; import { fetchNodeinfo } from '../../services/fetch-nodeinfo'; +import { fetchMeta } from '../../misc/fetch-meta'; +import { toPuny } from '../../misc/convert-host'; const logger = new Logger('deliver'); @@ -13,6 +15,23 @@ let latest: string | null = null; export default async (job: Bull.Job) => { const { host } = new URL(job.data.to); + // ブロックしてたら中断 + const meta = await fetchMeta(); + if (meta.blockedHosts.includes(toPuny(host))) { + return 'skip (blocked)'; + } + + // closedなら中断 + const closedHosts = await Instances.find({ + where: { + isMarkedAsClosed: true + }, + cache: 60 * 1000 + }); + if (closedHosts.map(x => x.host).includes(toPuny(host))) { + return 'skip (closed)'; + } + try { if (latest !== (latest = JSON.stringify(job.data.content, null, 2))) { logger.debug(`delivering ${latest}`); @@ -48,8 +67,6 @@ export default async (job: Bull.Job) => { }); if (res != null && res.hasOwnProperty('statusCode')) { - logger.warn(`deliver failed: ${res.statusCode} ${res.statusMessage} to=${job.data.to}`); - // 4xx if (res.statusCode >= 400 && res.statusCode < 500) { // HTTPステータスコード4xxはクライアントエラーであり、それはつまり @@ -61,7 +78,6 @@ export default async (job: Bull.Job) => { throw `${res.statusCode} ${res.statusMessage}`; } else { // DNS error, socket error, timeout ... - logger.warn(`deliver failed: ${res} to=${job.data.to}`); throw res; } } diff --git a/src/remote/activitypub/request.ts b/src/remote/activitypub/request.ts index a48bc1e3f..e4f301833 100644 --- a/src/remote/activitypub/request.ts +++ b/src/remote/activitypub/request.ts @@ -6,15 +6,10 @@ import * as cache from 'lookup-dns-cache'; import config from '../../config'; import { ILocalUser } from '../../models/entities/user'; import { publishApLogStream } from '../../services/stream'; -import { apLogger } from './logger'; -import { UserKeypairs, Instances } from '../../models'; -import { fetchMeta } from '../../misc/fetch-meta'; -import { toPuny } from '../../misc/convert-host'; +import { UserKeypairs } from '../../models'; import { ensure } from '../../prelude/ensure'; import * as httpsProxyAgent from 'https-proxy-agent'; -export const logger = apLogger.createSubLogger('deliver'); - const agent = config.proxy ? new httpsProxyAgent(config.proxy) : new https.Agent({ @@ -24,28 +19,7 @@ const agent = config.proxy export default async (user: ILocalUser, url: string, object: any) => { const timeout = 10 * 1000; - const { protocol, host, hostname, port, pathname, search } = new URL(url); - - // ブロックしてたら中断 - const meta = await fetchMeta(); - if (meta.blockedHosts.includes(toPuny(host))) { - logger.info(`skip (blocked) ${url}`); - return; - } - - // closedなら中断 - const closedHosts = await Instances.find({ - where: { - isMarkedAsClosed: true - }, - cache: 60 * 1000 - }); - if (closedHosts.map(x => x.host).includes(toPuny(host))) { - logger.info(`skip (closed) ${url}`); - return; - } - - logger.info(`--> ${url}`); + const { protocol, hostname, port, pathname, search } = new URL(url); const data = JSON.stringify(object); @@ -73,10 +47,8 @@ export default async (user: ILocalUser, url: string, object: any) => { } }, res => { if (res.statusCode! >= 400) { - logger.warn(`${url} --> ${res.statusCode}`); reject(res); } else { - logger.succ(`${url} --> ${res.statusCode}`); resolve(); } });