feat: allow upload of large files (#1862)
* Allow upload of large files * feat: createByteCountingStream utility function. --------- Co-authored-by: gpapp <gergely.papp@itworks.hu>
This commit is contained in:
parent
063ea99b66
commit
efb0a9317b
8 changed files with 93 additions and 32 deletions
|
|
@ -2,6 +2,7 @@ import * as path from 'path';
|
||||||
import * as bcrypt from 'bcrypt';
|
import * as bcrypt from 'bcrypt';
|
||||||
import { sanitize } from 'sanitize-filename-ts';
|
import { sanitize } from 'sanitize-filename-ts';
|
||||||
import { FastifyRequest } from 'fastify';
|
import { FastifyRequest } from 'fastify';
|
||||||
|
import { Readable, Transform } from 'stream';
|
||||||
|
|
||||||
export const envPath = path.resolve(process.cwd(), '..', '..', '.env');
|
export const envPath = path.resolve(process.cwd(), '..', '..', '.env');
|
||||||
|
|
||||||
|
|
@ -118,3 +119,18 @@ export function normalizePostgresUrl(url: string): string {
|
||||||
parsed.search = newParams.toString();
|
parsed.search = newParams.toString();
|
||||||
return parsed.toString();
|
return parsed.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export function createByteCountingStream(source: Readable) {
|
||||||
|
let bytesRead = 0;
|
||||||
|
const stream = new Transform({
|
||||||
|
transform(chunk, encoding, callback) {
|
||||||
|
bytesRead += chunk.length;
|
||||||
|
callback(null, chunk);
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
source.pipe(stream);
|
||||||
|
source.on('error', (err) => stream.emit('error', err));
|
||||||
|
|
||||||
|
return { stream, getBytesRead: () => bytesRead };
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -5,15 +5,17 @@ import { sanitizeFileName } from '../../common/helpers';
|
||||||
import * as sharp from 'sharp';
|
import * as sharp from 'sharp';
|
||||||
|
|
||||||
export interface PreparedFile {
|
export interface PreparedFile {
|
||||||
buffer: Buffer;
|
buffer?: Buffer;
|
||||||
fileName: string;
|
fileName: string;
|
||||||
fileSize: number;
|
fileSize: number;
|
||||||
fileExtension: string;
|
fileExtension: string;
|
||||||
mimeType: string;
|
mimeType: string;
|
||||||
|
multiPartFile?: MultipartFile;
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function prepareFile(
|
export async function prepareFile(
|
||||||
filePromise: Promise<MultipartFile>,
|
filePromise: Promise<MultipartFile>,
|
||||||
|
options: { skipBuffer?: boolean } = {},
|
||||||
): Promise<PreparedFile> {
|
): Promise<PreparedFile> {
|
||||||
const file = await filePromise;
|
const file = await filePromise;
|
||||||
|
|
||||||
|
|
@ -22,10 +24,16 @@ export async function prepareFile(
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const buffer = await file.toBuffer();
|
let buffer: Buffer | undefined;
|
||||||
|
let fileSize = 0;
|
||||||
|
|
||||||
|
if (!options.skipBuffer) {
|
||||||
|
buffer = await file.toBuffer();
|
||||||
|
fileSize = buffer.length;
|
||||||
|
}
|
||||||
|
|
||||||
const sanitizedFilename = sanitizeFileName(file.filename);
|
const sanitizedFilename = sanitizeFileName(file.filename);
|
||||||
const fileName = sanitizedFilename.slice(0, 255);
|
const fileName = sanitizedFilename.slice(0, 255);
|
||||||
const fileSize = buffer.length;
|
|
||||||
const fileExtension = path.extname(file.filename).toLowerCase();
|
const fileExtension = path.extname(file.filename).toLowerCase();
|
||||||
|
|
||||||
return {
|
return {
|
||||||
|
|
@ -34,6 +42,7 @@ export async function prepareFile(
|
||||||
fileSize,
|
fileSize,
|
||||||
fileExtension,
|
fileExtension,
|
||||||
mimeType: file.mimetype,
|
mimeType: file.mimetype,
|
||||||
|
multiPartFile: file,
|
||||||
};
|
};
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
throw error;
|
throw error;
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,7 @@ import {
|
||||||
Logger,
|
Logger,
|
||||||
NotFoundException,
|
NotFoundException,
|
||||||
} from '@nestjs/common';
|
} from '@nestjs/common';
|
||||||
|
import { Readable } from 'stream';
|
||||||
import { StorageService } from '../../../integrations/storage/storage.service';
|
import { StorageService } from '../../../integrations/storage/storage.service';
|
||||||
import { MultipartFile } from '@fastify/multipart';
|
import { MultipartFile } from '@fastify/multipart';
|
||||||
import {
|
import {
|
||||||
|
|
@ -26,6 +27,7 @@ import { SpaceRepo } from '@docmost/db/repos/space/space.repo';
|
||||||
import { InjectQueue } from '@nestjs/bullmq';
|
import { InjectQueue } from '@nestjs/bullmq';
|
||||||
import { QueueJob, QueueName } from '../../../integrations/queue/constants';
|
import { QueueJob, QueueName } from '../../../integrations/queue/constants';
|
||||||
import { Queue } from 'bullmq';
|
import { Queue } from 'bullmq';
|
||||||
|
import { createByteCountingStream } from '../../../common/helpers/utils';
|
||||||
|
|
||||||
@Injectable()
|
@Injectable()
|
||||||
export class AttachmentService {
|
export class AttachmentService {
|
||||||
|
|
@ -49,7 +51,9 @@ export class AttachmentService {
|
||||||
attachmentId?: string;
|
attachmentId?: string;
|
||||||
}) {
|
}) {
|
||||||
const { filePromise, pageId, spaceId, userId, workspaceId } = opts;
|
const { filePromise, pageId, spaceId, userId, workspaceId } = opts;
|
||||||
const preparedFile: PreparedFile = await prepareFile(filePromise);
|
const preparedFile: PreparedFile = await prepareFile(filePromise, {
|
||||||
|
skipBuffer: true,
|
||||||
|
});
|
||||||
|
|
||||||
let isUpdate = false;
|
let isUpdate = false;
|
||||||
let attachmentId = null;
|
let attachmentId = null;
|
||||||
|
|
@ -81,7 +85,14 @@ export class AttachmentService {
|
||||||
|
|
||||||
const filePath = `${getAttachmentFolderPath(AttachmentType.File, workspaceId)}/${attachmentId}/${preparedFile.fileName}`;
|
const filePath = `${getAttachmentFolderPath(AttachmentType.File, workspaceId)}/${attachmentId}/${preparedFile.fileName}`;
|
||||||
|
|
||||||
await this.uploadToDrive(filePath, preparedFile.buffer);
|
const { stream, getBytesRead } = createByteCountingStream(
|
||||||
|
preparedFile.multiPartFile.file,
|
||||||
|
);
|
||||||
|
|
||||||
|
await this.uploadToDrive(filePath, stream);
|
||||||
|
|
||||||
|
// Update fileSize from the consumed stream
|
||||||
|
preparedFile.fileSize = getBytesRead();
|
||||||
|
|
||||||
let attachment: Attachment = null;
|
let attachment: Attachment = null;
|
||||||
try {
|
try {
|
||||||
|
|
@ -142,7 +153,10 @@ export class AttachmentService {
|
||||||
const preparedFile: PreparedFile = await prepareFile(filePromise);
|
const preparedFile: PreparedFile = await prepareFile(filePromise);
|
||||||
validateFileType(preparedFile.fileExtension, validImageExtensions);
|
validateFileType(preparedFile.fileExtension, validImageExtensions);
|
||||||
|
|
||||||
const processedBuffer = await compressAndResizeIcon(preparedFile.buffer, type);
|
const processedBuffer = await compressAndResizeIcon(
|
||||||
|
preparedFile.buffer,
|
||||||
|
type,
|
||||||
|
);
|
||||||
preparedFile.buffer = processedBuffer;
|
preparedFile.buffer = processedBuffer;
|
||||||
preparedFile.fileSize = processedBuffer.length;
|
preparedFile.fileSize = processedBuffer.length;
|
||||||
preparedFile.fileName = uuid4() + preparedFile.fileExtension;
|
preparedFile.fileName = uuid4() + preparedFile.fileExtension;
|
||||||
|
|
@ -232,9 +246,9 @@ export class AttachmentService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async uploadToDrive(filePath: string, fileBuffer: any) {
|
async uploadToDrive(filePath: string, fileContent: Buffer | Readable) {
|
||||||
try {
|
try {
|
||||||
await this.storageService.upload(filePath, fileBuffer);
|
await this.storageService.upload(filePath, fileContent);
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
this.logger.error('Error uploading file to drive:', err);
|
this.logger.error('Error uploading file to drive:', err);
|
||||||
throw new BadRequestException('Error uploading file to drive');
|
throw new BadRequestException('Error uploading file to drive');
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,11 @@ import {
|
||||||
} from '../../../collaboration/collaboration.util';
|
} from '../../../collaboration/collaboration.util';
|
||||||
import { InjectKysely } from 'nestjs-kysely';
|
import { InjectKysely } from 'nestjs-kysely';
|
||||||
import { KyselyDB } from '@docmost/db/types/kysely.types';
|
import { KyselyDB } from '@docmost/db/types/kysely.types';
|
||||||
import { generateSlugId, sanitizeFileName } from '../../../common/helpers';
|
import {
|
||||||
|
generateSlugId,
|
||||||
|
sanitizeFileName,
|
||||||
|
createByteCountingStream,
|
||||||
|
} from '../../../common/helpers';
|
||||||
import { generateJitteredKeyBetween } from 'fractional-indexing-jittered';
|
import { generateJitteredKeyBetween } from 'fractional-indexing-jittered';
|
||||||
import { TiptapTransformer } from '@hocuspocus/transformer';
|
import { TiptapTransformer } from '@hocuspocus/transformer';
|
||||||
import * as Y from 'yjs';
|
import * as Y from 'yjs';
|
||||||
|
|
@ -173,15 +177,24 @@ export class ImportService {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
async getNewPagePosition(spaceId: string): Promise<string> {
|
async getNewPagePosition(
|
||||||
const lastPage = await this.db
|
spaceId: string,
|
||||||
|
parentPageId?: string,
|
||||||
|
): Promise<string> {
|
||||||
|
let query = this.db
|
||||||
.selectFrom('pages')
|
.selectFrom('pages')
|
||||||
.select(['id', 'position'])
|
.select(['id', 'position'])
|
||||||
.where('spaceId', '=', spaceId)
|
.where('spaceId', '=', spaceId)
|
||||||
.orderBy('position', (ob) => ob.collate('C').desc())
|
.orderBy('position', (ob) => ob.collate('C').desc())
|
||||||
.limit(1)
|
.limit(1);
|
||||||
.where('parentPageId', 'is', null)
|
|
||||||
.executeTakeFirst();
|
if (parentPageId) {
|
||||||
|
query = query.where('parentPageId', '=', parentPageId);
|
||||||
|
} else {
|
||||||
|
query = query.where('parentPageId', 'is', null);
|
||||||
|
}
|
||||||
|
|
||||||
|
const lastPage = await query.executeTakeFirst();
|
||||||
|
|
||||||
if (lastPage) {
|
if (lastPage) {
|
||||||
return generateJitteredKeyBetween(lastPage.position, null);
|
return generateJitteredKeyBetween(lastPage.position, null);
|
||||||
|
|
@ -198,20 +211,21 @@ export class ImportService {
|
||||||
workspaceId: string,
|
workspaceId: string,
|
||||||
) {
|
) {
|
||||||
const file = await filePromise;
|
const file = await filePromise;
|
||||||
const fileBuffer = await file.toBuffer();
|
|
||||||
const fileExtension = path.extname(file.filename).toLowerCase();
|
const fileExtension = path.extname(file.filename).toLowerCase();
|
||||||
const fileName = sanitizeFileName(
|
const fileName = sanitizeFileName(
|
||||||
path.basename(file.filename, fileExtension),
|
path.basename(file.filename, fileExtension),
|
||||||
);
|
);
|
||||||
const fileSize = fileBuffer.length;
|
|
||||||
|
|
||||||
const fileNameWithExt = fileName + fileExtension;
|
const fileNameWithExt = fileName + fileExtension;
|
||||||
|
|
||||||
const fileTaskId = uuid7();
|
const fileTaskId = uuid7();
|
||||||
const filePath = `${getFileTaskFolderPath(FileTaskType.Import, workspaceId)}/${fileTaskId}/${fileNameWithExt}`;
|
const filePath = `${getFileTaskFolderPath(FileTaskType.Import, workspaceId)}/${fileTaskId}/${fileNameWithExt}`;
|
||||||
|
|
||||||
// upload file
|
// upload file
|
||||||
await this.storageService.upload(filePath, fileBuffer);
|
const { stream, getBytesRead } = createByteCountingStream(file.file);
|
||||||
|
|
||||||
|
await this.storageService.upload(filePath, stream);
|
||||||
|
|
||||||
|
const fileSize = getBytesRead();
|
||||||
|
|
||||||
const fileTask = await this.db
|
const fileTask = await this.db
|
||||||
.insertInto('fileTasks')
|
.insertInto('fileTasks')
|
||||||
|
|
|
||||||
|
|
@ -20,9 +20,15 @@ export class LocalDriver implements StorageDriver {
|
||||||
return join(this.config.storagePath, filePath);
|
return join(this.config.storagePath, filePath);
|
||||||
}
|
}
|
||||||
|
|
||||||
async upload(filePath: string, file: Buffer): Promise<void> {
|
async upload(filePath: string, file: Buffer | Readable): Promise<void> {
|
||||||
try {
|
try {
|
||||||
await fs.outputFile(this._fullPath(filePath), file);
|
const fullPath = this._fullPath(filePath);
|
||||||
|
if (file instanceof Buffer) {
|
||||||
|
await fs.outputFile(fullPath, file);
|
||||||
|
} else {
|
||||||
|
await fs.mkdir(dirname(fullPath), { recursive: true });
|
||||||
|
await pipeline(file, createWriteStream(fullPath));
|
||||||
|
}
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
throw new Error(`Failed to upload file: ${(err as Error).message}`);
|
throw new Error(`Failed to upload file: ${(err as Error).message}`);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -23,19 +23,21 @@ export class S3Driver implements StorageDriver {
|
||||||
this.s3Client = new S3Client(config as any);
|
this.s3Client = new S3Client(config as any);
|
||||||
}
|
}
|
||||||
|
|
||||||
async upload(filePath: string, file: Buffer): Promise<void> {
|
async upload(filePath: string, file: Buffer | Readable): Promise<void> {
|
||||||
try {
|
try {
|
||||||
const contentType = getMimeType(filePath);
|
const contentType = getMimeType(filePath);
|
||||||
|
|
||||||
const command = new PutObjectCommand({
|
const upload = new Upload({
|
||||||
Bucket: this.config.bucket,
|
client: this.s3Client,
|
||||||
Key: filePath,
|
params: {
|
||||||
Body: file,
|
Bucket: this.config.bucket,
|
||||||
ContentType: contentType,
|
Key: filePath,
|
||||||
// ACL: "public-read",
|
Body: file,
|
||||||
|
ContentType: contentType,
|
||||||
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
await this.s3Client.send(command);
|
await upload.done();
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
throw new Error(`Failed to upload file: ${(err as Error).message}`);
|
throw new Error(`Failed to upload file: ${(err as Error).message}`);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,7 @@
|
||||||
import { Readable } from 'stream';
|
import { Readable } from 'stream';
|
||||||
|
|
||||||
export interface StorageDriver {
|
export interface StorageDriver {
|
||||||
upload(filePath: string, file: Buffer): Promise<void>;
|
upload(filePath: string, file: Buffer | Readable): Promise<void>;
|
||||||
|
|
||||||
uploadStream(filePath: string, file: Readable, options?: { recreateClient?: boolean }): Promise<void>;
|
uploadStream(filePath: string, file: Readable, options?: { recreateClient?: boolean }): Promise<void>;
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -8,9 +8,9 @@ export class StorageService {
|
||||||
private readonly logger = new Logger(StorageService.name);
|
private readonly logger = new Logger(StorageService.name);
|
||||||
constructor(
|
constructor(
|
||||||
@Inject(STORAGE_DRIVER_TOKEN) private storageDriver: StorageDriver,
|
@Inject(STORAGE_DRIVER_TOKEN) private storageDriver: StorageDriver,
|
||||||
) {}
|
) { }
|
||||||
|
|
||||||
async upload(filePath: string, fileContent: Buffer | any) {
|
async upload(filePath: string, fileContent: Buffer | Readable) {
|
||||||
await this.storageDriver.upload(filePath, fileContent);
|
await this.storageDriver.upload(filePath, fileContent);
|
||||||
this.logger.debug(`File uploaded successfully. Path: ${filePath}`);
|
this.logger.debug(`File uploaded successfully. Path: ${filePath}`);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue