Merge pull request #913 from syuilo/write-file-use-stream

ファイル書き込みをバッファを使用しない形にする
This commit is contained in:
syuilo 2017-11-14 05:27:08 +09:00 committed by GitHub
commit 21e86e28a8
4 changed files with 242 additions and 153 deletions

View file

@ -54,13 +54,14 @@
"@types/node": "8.0.49", "@types/node": "8.0.49",
"@types/page": "1.5.32", "@types/page": "1.5.32",
"@types/proxy-addr": "2.0.0", "@types/proxy-addr": "2.0.0",
"@types/seedrandom": "2.4.27",
"@types/ratelimiter": "2.1.28", "@types/ratelimiter": "2.1.28",
"@types/redis": "2.8.1", "@types/redis": "2.8.1",
"@types/request": "2.0.7", "@types/request": "^2.0.7",
"@types/rimraf": "2.0.2", "@types/rimraf": "2.0.2",
"@types/riot": "3.6.1", "@types/riot": "3.6.1",
"@types/seedrandom": "2.4.27",
"@types/serve-favicon": "2.2.29", "@types/serve-favicon": "2.2.29",
"@types/tmp": "0.0.33",
"@types/uuid": "3.4.3", "@types/uuid": "3.4.3",
"@types/webpack": "3.8.0", "@types/webpack": "3.8.0",
"@types/webpack-stream": "3.2.8", "@types/webpack-stream": "3.2.8",
@ -111,7 +112,6 @@
"deep-equal": "1.0.1", "deep-equal": "1.0.1",
"deepcopy": "0.6.3", "deepcopy": "0.6.3",
"diskusage": "0.2.2", "diskusage": "0.2.2",
"download": "6.2.5",
"elasticsearch": "13.3.1", "elasticsearch": "13.3.1",
"escape-regexp": "0.0.1", "escape-regexp": "0.0.1",
"express": "4.15.4", "express": "4.15.4",
@ -140,7 +140,7 @@
"recaptcha-promise": "0.1.3", "recaptcha-promise": "0.1.3",
"reconnecting-websocket": "3.2.2", "reconnecting-websocket": "3.2.2",
"redis": "2.8.0", "redis": "2.8.0",
"request": "2.83.0", "request": "^2.83.0",
"rimraf": "2.6.2", "rimraf": "2.6.2",
"riot": "3.7.4", "riot": "3.7.4",
"rndstr": "1.0.0", "rndstr": "1.0.0",
@ -152,6 +152,7 @@
"syuilo-password-strength": "0.0.1", "syuilo-password-strength": "0.0.1",
"tcp-port-used": "0.1.2", "tcp-port-used": "0.1.2",
"textarea-caret": "3.0.2", "textarea-caret": "3.0.2",
"tmp": "0.0.33",
"ts-node": "3.3.0", "ts-node": "3.3.0",
"typescript": "2.6.1", "typescript": "2.6.1",
"uuid": "3.1.0", "uuid": "3.1.0",

View file

@ -9,73 +9,83 @@ import DriveFolder from '../models/drive-folder';
import serialize from '../serializers/drive-file'; import serialize from '../serializers/drive-file';
import event from '../event'; import event from '../event';
import config from '../../conf'; import config from '../../conf';
import { Duplex } from 'stream'; import { Buffer } from 'buffer';
import * as fs from 'fs';
import * as tmp from 'tmp';
import * as stream from 'stream';
const log = debug('misskey:register-drive-file'); const log = debug('misskey:register-drive-file');
const addToGridFS = (name, binary, type, metadata): Promise<any> => new Promise(async (resolve, reject) => { const tmpFile = (): Promise<string> => new Promise((resolve, reject) => {
const dataStream = new Duplex(); tmp.file((e, path) => {
dataStream.push(binary); if (e) return reject(e);
dataStream.push(null); resolve(path);
});
const bucket = await getGridFSBucket();
const writeStream = bucket.openUploadStream(name, { contentType: type, metadata });
writeStream.once('finish', (doc) => { resolve(doc); });
writeStream.on('error', reject);
dataStream.pipe(writeStream);
}); });
/** const addToGridFS = (name: string, readable: stream.Readable, type: string, metadata: any): Promise<any> =>
* Add file to drive getGridFSBucket()
* .then(bucket => new Promise((resolve, reject) => {
* @param user User who wish to add file const writeStream = bucket.openUploadStream(name, { contentType: type, metadata });
* @param fileName File name writeStream.once('finish', (doc) => { resolve(doc); });
* @param data Contents writeStream.on('error', reject);
* @param comment Comment readable.pipe(writeStream);
* @param type File type }));
* @param folderId Folder ID
* @param force If set to true, forcibly upload the file even if there is a file with the same hash. const addFile = async (
* @return Object that represents added file
*/
export default (
user: any, user: any,
data: Buffer, path: string,
name: string = null, name: string = null,
comment: string = null, comment: string = null,
folderId: mongodb.ObjectID = null, folderId: mongodb.ObjectID = null,
force: boolean = false force: boolean = false
) => new Promise<any>(async (resolve, reject) => { ) => {
log(`registering ${name} (user: ${user.username})`); log(`registering ${name} (user: ${user.username})`);
// File size // Calculate hash, get content type and get file size
const size = data.byteLength; const [hash, [mime, ext], size] = await Promise.all([
// hash
((): Promise<string> => new Promise((res, rej) => {
const readable = fs.createReadStream(path);
const hash = crypto.createHash('md5');
const chunks = [];
readable
.on('error', rej)
.pipe(hash)
.on('error', rej)
.on('data', (chunk) => chunks.push(chunk))
.on('end', () => {
const buffer = Buffer.concat(chunks);
res(buffer.toString('hex'));
});
}))(),
// mime
((): Promise<[string, string | null]> => new Promise((res, rej) => {
const readable = fs.createReadStream(path);
readable
.on('error', rej)
.once('data', (buffer: Buffer) => {
readable.destroy();
const type = fileType(buffer);
if (!type) {
return res(['application/octet-stream', null]);
}
return res([type.mime, type.ext]);
});
}))(),
// size
((): Promise<number> => new Promise((res, rej) => {
fs.stat(path, (err, stats) => {
if (err) return rej(err);
res(stats.size);
});
}))()
]);
log(`size is ${size}`); log(`hash: ${hash}, mime: ${mime}, ext: ${ext}, size: ${size}`);
// File type // detect name
let mime = 'application/octet-stream'; const detectedName: string = name || (ext ? `untitled.${ext}` : 'untitled');
const type = fileType(data);
if (type !== null) {
mime = type.mime;
if (name === null) {
name = `untitled.${type.ext}`;
}
} else {
if (name === null) {
name = 'untitled';
}
}
log(`type is ${mime}`);
// Generate hash
const hash = crypto
.createHash('md5')
.update(data)
.digest('hex') as string;
log(`hash is ${hash}`);
if (!force) { if (!force) {
// Check if there is a file with the same hash // Check if there is a file with the same hash
@ -86,92 +96,155 @@ export default (
if (much !== null) { if (much !== null) {
log('file with same hash is found'); log('file with same hash is found');
return resolve(much); return much;
} else { } else {
log('file with same hash is not found'); log('file with same hash is not found');
} }
} }
// Calculate drive usage const [properties, folder] = await Promise.all([
const usage = ((await DriveFile // properties
.aggregate([ (async () => {
{ $match: { 'metadata.user_id': user._id } }, if (!/^image\/.*$/.test(mime)) {
{ $project: { return null;
length: true }
}}, // If the file is an image, calculate width and height to save in property
{ $group: { const g = gm(fs.createReadStream(path), name);
_id: null, const size = await prominence(g).size();
usage: { $sum: '$length' } const properties = {
}} width: size.width,
]))[0] || { height: size.height
usage: 0 };
}).usage; log('image width and height is calculated');
return properties;
log(`drive usage is ${usage}`); })(),
// folder
// If usage limit exceeded (async () => {
if (usage + size > user.drive_capacity) { if (!folderId) {
return reject('no-free-space'); return null;
} }
const driveFolder = await DriveFolder.findOne({
// If the folder is specified
let folder: any = null;
if (folderId !== null) {
folder = await DriveFolder
.findOne({
_id: folderId, _id: folderId,
user_id: user._id user_id: user._id
}); });
if (!driveFolder) {
throw 'folder-not-found';
}
return driveFolder;
})(),
// usage checker
(async () => {
// Calculate drive usage
const usage = await DriveFile
.aggregate([
{ $match: { 'metadata.user_id': user._id } },
{
$project: {
length: true
}
},
{
$group: {
_id: null,
usage: { $sum: '$length' }
}
}
])
.then((aggregates: any[]) => {
if (aggregates.length > 0) {
return aggregates[0].usage;
}
return 0;
});
if (folder === null) { log(`drive usage is ${usage}`);
return reject('folder-not-found');
}
}
let properties: any = null; // If usage limit exceeded
if (usage + size > user.drive_capacity) {
throw 'no-free-space';
}
})()
]);
// If the file is an image const readable = fs.createReadStream(path);
if (/^image\/.*$/.test(mime)) {
// Calculate width and height to save in property
const g = gm(data, name);
const size = await prominence(g).size();
properties = {
width: size.width,
height: size.height
};
log('image width and height is calculated'); return addToGridFS(detectedName, readable, mime, {
}
// Create DriveFile document
const file = await addToGridFS(name, data, mime, {
user_id: user._id, user_id: user._id,
folder_id: folder !== null ? folder._id : null, folder_id: folder !== null ? folder._id : null,
comment: comment, comment: comment,
properties: properties properties: properties
}); });
};
log(`drive file has been created ${file._id}`); /**
* Add file to drive
*
* @param user User who wish to add file
* @param file File path or readableStream
* @param comment Comment
* @param type File type
* @param folderId Folder ID
* @param force If set to true, forcibly upload the file even if there is a file with the same hash.
* @return Object that represents added file
*/
export default (user: any, file: string | stream.Readable, ...args) => new Promise<any>((resolve, reject) => {
// Get file path
new Promise((res: (v: [string, boolean]) => void, rej) => {
if (typeof file === 'string') {
res([file, false]);
return;
}
if (typeof file === 'object' && typeof file.read === 'function') {
tmpFile()
.then(path => {
const readable: stream.Readable = file;
const writable = fs.createWriteStream(path);
readable
.on('error', rej)
.on('end', () => {
res([path, true]);
})
.pipe(writable)
.on('error', rej);
})
.catch(rej);
}
rej(new Error('un-compatible file.'));
}).then(([path, remove]): Promise<any> => new Promise((res, rej) => {
addFile(user, path, ...args)
.then(file => {
res(file);
if (remove) {
fs.unlink(path, (e) => {
if (e) log(e.stack);
});
}
})
.catch(rej);
}))
.then(file => {
log(`drive file has been created ${file._id}`);
resolve(file);
resolve(file); serialize(file)
.then(serializedFile => {
// Publish drive_file_created event
event(user._id, 'drive_file_created', serializedFile);
// Serialize // Register to search database
const fileObj = await serialize(file); if (config.elasticsearch.enable) {
const es = require('../../db/elasticsearch');
// Publish drive_file_created event es.index({
event(user._id, 'drive_file_created', fileObj); index: 'misskey',
type: 'drive_file',
// Register to search database id: file._id.toString(),
if (config.elasticsearch.enable) { body: {
const es = require('../../db/elasticsearch'); name: file.name,
es.index({ user_id: user._id.toString()
index: 'misskey', }
type: 'drive_file', });
id: file._id.toString(), }
body: { });
name: file.name, })
user_id: user._id.toString() .catch(reject);
}
});
}
}); });

View file

@ -1,7 +1,6 @@
/** /**
* Module dependencies * Module dependencies
*/ */
import * as fs from 'fs';
import $ from 'cafy'; import $ from 'cafy';
import { validateFileName } from '../../../models/drive-file'; import { validateFileName } from '../../../models/drive-file';
import serialize from '../../../serializers/drive-file'; import serialize from '../../../serializers/drive-file';
@ -15,15 +14,11 @@ import create from '../../../common/add-file-to-drive';
* @param {any} user * @param {any} user
* @return {Promise<any>} * @return {Promise<any>}
*/ */
module.exports = (file, params, user) => new Promise(async (res, rej) => { module.exports = async (file, params, user): Promise<any> => {
if (file == null) { if (file == null) {
return rej('file is required'); throw 'file is required';
} }
// TODO: 非同期にしたい。Promise対応してないんだろうか...
const buffer = fs.readFileSync(file.path);
fs.unlink(file.path, (err) => { if (err) console.log(err); });
// Get 'name' parameter // Get 'name' parameter
let name = file.originalname; let name = file.originalname;
if (name !== undefined && name !== null) { if (name !== undefined && name !== null) {
@ -33,7 +28,7 @@ module.exports = (file, params, user) => new Promise(async (res, rej) => {
} else if (name === 'blob') { } else if (name === 'blob') {
name = null; name = null;
} else if (!validateFileName(name)) { } else if (!validateFileName(name)) {
return rej('invalid name'); throw 'invalid name';
} }
} else { } else {
name = null; name = null;
@ -41,14 +36,11 @@ module.exports = (file, params, user) => new Promise(async (res, rej) => {
// Get 'folder_id' parameter // Get 'folder_id' parameter
const [folderId = null, folderIdErr] = $(params.folder_id).optional.nullable.id().$; const [folderId = null, folderIdErr] = $(params.folder_id).optional.nullable.id().$;
if (folderIdErr) return rej('invalid folder_id param'); if (folderIdErr) throw 'invalid folder_id param';
// Create file // Create file
const driveFile = await create(user, buffer, name, null, folderId); const driveFile = await create(user, file.path, name, null, folderId);
// Serialize // Serialize
const fileObj = await serialize(driveFile); return serialize(driveFile);
};
// Response
res(fileObj);
});

View file

@ -2,11 +2,16 @@
* Module dependencies * Module dependencies
*/ */
import * as URL from 'url'; import * as URL from 'url';
const download = require('download');
import $ from 'cafy'; import $ from 'cafy';
import { validateFileName } from '../../../models/drive-file'; import { validateFileName } from '../../../models/drive-file';
import serialize from '../../../serializers/drive-file'; import serialize from '../../../serializers/drive-file';
import create from '../../../common/add-file-to-drive'; import create from '../../../common/add-file-to-drive';
import * as debug from 'debug';
import * as tmp from 'tmp';
import * as fs from 'fs';
import * as request from 'request';
const log = debug('misskey:endpoint:upload_from_url');
/** /**
* Create a file from a URL * Create a file from a URL
@ -15,11 +20,11 @@ import create from '../../../common/add-file-to-drive';
* @param {any} user * @param {any} user
* @return {Promise<any>} * @return {Promise<any>}
*/ */
module.exports = (params, user) => new Promise(async (res, rej) => { module.exports = async (params, user): Promise<any> => {
// Get 'url' parameter // Get 'url' parameter
// TODO: Validate this url // TODO: Validate this url
const [url, urlErr] = $(params.url).string().$; const [url, urlErr] = $(params.url).string().$;
if (urlErr) return rej('invalid url param'); if (urlErr) throw 'invalid url param';
let name = URL.parse(url).pathname.split('/').pop(); let name = URL.parse(url).pathname.split('/').pop();
if (!validateFileName(name)) { if (!validateFileName(name)) {
@ -28,17 +33,35 @@ module.exports = (params, user) => new Promise(async (res, rej) => {
// Get 'folder_id' parameter // Get 'folder_id' parameter
const [folderId = null, folderIdErr] = $(params.folder_id).optional.nullable.id().$; const [folderId = null, folderIdErr] = $(params.folder_id).optional.nullable.id().$;
if (folderIdErr) return rej('invalid folder_id param'); if (folderIdErr) throw 'invalid folder_id param';
// Download file // Create temp file
const data = await download(url); const path = await new Promise((res: (string) => void, rej) => {
tmp.file((e, path) => {
if (e) return rej(e);
res(path);
});
});
// Create file // write content at URL to temp file
const driveFile = await create(user, data, name, null, folderId); await new Promise((res, rej) => {
const writable = fs.createWriteStream(path);
request(url)
.on('error', rej)
.on('end', () => {
writable.close();
res(path);
})
.pipe(writable)
.on('error', rej);
});
// Serialize const driveFile = await create(user, path, name, null, folderId);
const fileObj = await serialize(driveFile);
// Response // clean-up
res(fileObj); fs.unlink(path, (e) => {
}); if (e) log(e.stack);
});
return serialize(driveFile);
};