- sync
- reinstantiate S3 client to fix file upload errors during import - delete import zip file after use
This commit is contained in:
parent
8e16ad952a
commit
f413720e15
8 changed files with 68 additions and 13 deletions
|
|
@ -1 +1 @@
|
||||||
Subproject commit d03a6a3f2de77df4447b56135e1600243bd67173
|
Subproject commit fd34d4183aaae765b27f95e49830c6ff2ac9aa1f
|
||||||
|
|
@ -41,15 +41,32 @@ export class FileTaskProcessor extends WorkerHost implements OnModuleDestroy {
|
||||||
@OnWorkerEvent('failed')
|
@OnWorkerEvent('failed')
|
||||||
async onFailed(job: Job) {
|
async onFailed(job: Job) {
|
||||||
this.logger.error(
|
this.logger.error(
|
||||||
`Error processing ${job.name} job. Reason: ${job.failedReason}`,
|
`Error processing ${job.name} job. Import Task ID: ${job.data.fileTaskId}. Reason: ${job.failedReason}`,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
await this.handleFailedJob(job);
|
||||||
|
}
|
||||||
|
|
||||||
|
@OnWorkerEvent('stalled')
|
||||||
|
async onStalled(job: Job) {
|
||||||
|
this.logger.error(
|
||||||
|
`Job ${job.name} stalled. . Import Task ID: ${job.data.fileTaskId}.. Job ID: ${job.id}`,
|
||||||
|
);
|
||||||
|
|
||||||
|
// Set failedReason for stalled jobs since it's not automatically set
|
||||||
|
job.failedReason = 'Job stalled and was marked as failed';
|
||||||
|
await this.handleFailedJob(job);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async handleFailedJob(job: Job) {
|
||||||
try {
|
try {
|
||||||
const fileTaskId = job.data.fileTaskId;
|
const fileTaskId = job.data.fileTaskId;
|
||||||
|
const reason = job.failedReason || 'Unknown error';
|
||||||
|
|
||||||
await this.fileTaskService.updateTaskStatus(
|
await this.fileTaskService.updateTaskStatus(
|
||||||
fileTaskId,
|
fileTaskId,
|
||||||
FileTaskStatus.Failed,
|
FileTaskStatus.Failed,
|
||||||
job.failedReason,
|
reason,
|
||||||
);
|
);
|
||||||
|
|
||||||
const fileTask = await this.fileTaskService.getFileTask(fileTaskId);
|
const fileTask = await this.fileTaskService.getFileTask(fileTaskId);
|
||||||
|
|
@ -62,10 +79,22 @@ export class FileTaskProcessor extends WorkerHost implements OnModuleDestroy {
|
||||||
}
|
}
|
||||||
|
|
||||||
@OnWorkerEvent('completed')
|
@OnWorkerEvent('completed')
|
||||||
onCompleted(job: Job) {
|
async onCompleted(job: Job) {
|
||||||
this.logger.log(
|
this.logger.log(
|
||||||
`Completed ${job.name} job for File task ID ${job.data.fileTaskId}`,
|
`Completed ${job.name} job for File task ID ${job.data.fileTaskId}`,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
try {
|
||||||
|
const fileTask = await this.fileTaskService.getFileTask(
|
||||||
|
job.data.fileTaskId,
|
||||||
|
);
|
||||||
|
if (fileTask) {
|
||||||
|
await this.storageService.delete(fileTask.filePath);
|
||||||
|
this.logger.debug(`Deleted imported zip file: ${fileTask.filePath}`);
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
this.logger.error(`Failed to delete imported zip file:`, err);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async onModuleDestroy(): Promise<void> {
|
async onModuleDestroy(): Promise<void> {
|
||||||
|
|
|
||||||
|
|
@ -35,7 +35,7 @@ interface DrawioPair {
|
||||||
@Injectable()
|
@Injectable()
|
||||||
export class ImportAttachmentService {
|
export class ImportAttachmentService {
|
||||||
private readonly logger = new Logger(ImportAttachmentService.name);
|
private readonly logger = new Logger(ImportAttachmentService.name);
|
||||||
private readonly CONCURRENT_UPLOADS = 5;
|
private readonly CONCURRENT_UPLOADS = 1;
|
||||||
private readonly MAX_RETRIES = 2;
|
private readonly MAX_RETRIES = 2;
|
||||||
private readonly RETRY_DELAY = 2000;
|
private readonly RETRY_DELAY = 2000;
|
||||||
|
|
||||||
|
|
@ -139,7 +139,9 @@ export class ImportAttachmentService {
|
||||||
const stream = Readable.from(svgBuffer);
|
const stream = Readable.from(svgBuffer);
|
||||||
|
|
||||||
// Upload to storage
|
// Upload to storage
|
||||||
await this.storageService.uploadStream(storageFilePath, stream);
|
await this.storageService.uploadStream(storageFilePath, stream, {
|
||||||
|
recreateClient: true,
|
||||||
|
});
|
||||||
|
|
||||||
// Insert into database
|
// Insert into database
|
||||||
await this.db
|
await this.db
|
||||||
|
|
@ -802,7 +804,10 @@ export class ImportAttachmentService {
|
||||||
for (let attempt = 1; attempt <= this.MAX_RETRIES; attempt++) {
|
for (let attempt = 1; attempt <= this.MAX_RETRIES; attempt++) {
|
||||||
try {
|
try {
|
||||||
const fileStream = createReadStream(abs);
|
const fileStream = createReadStream(abs);
|
||||||
await this.storageService.uploadStream(storageFilePath, fileStream);
|
await this.storageService.uploadStream(storageFilePath, fileStream, {
|
||||||
|
recreateClient: true,
|
||||||
|
});
|
||||||
|
|
||||||
const stat = await fs.stat(abs);
|
const stat = await fs.stat(abs);
|
||||||
|
|
||||||
await this.db
|
await this.db
|
||||||
|
|
|
||||||
|
|
@ -214,6 +214,9 @@ export function notionFormatter($: CheerioAPI, $root: Cheerio<any>) {
|
||||||
$fig.replaceWith($newAnchor);
|
$fig.replaceWith($newAnchor);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// remove user icons
|
||||||
|
$root.find('span.user img.user-icon').remove();
|
||||||
|
|
||||||
// remove toc
|
// remove toc
|
||||||
$root.find('nav.table_of_contents').remove();
|
$root.find('nav.table_of_contents').remove();
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -28,7 +28,7 @@ export class LocalDriver implements StorageDriver {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async uploadStream(filePath: string, file: Readable): Promise<void> {
|
async uploadStream(filePath: string, file: Readable, options?: { recreateClient?: boolean }): Promise<void> {
|
||||||
try {
|
try {
|
||||||
const fullPath = this._fullPath(filePath);
|
const fullPath = this._fullPath(filePath);
|
||||||
await fs.mkdir(dirname(fullPath), { recursive: true });
|
await fs.mkdir(dirname(fullPath), { recursive: true });
|
||||||
|
|
|
||||||
|
|
@ -41,12 +41,26 @@ export class S3Driver implements StorageDriver {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async uploadStream(filePath: string, file: Readable): Promise<void> {
|
async uploadStream(
|
||||||
|
filePath: string,
|
||||||
|
file: Readable,
|
||||||
|
options?: { recreateClient?: boolean },
|
||||||
|
): Promise<void> {
|
||||||
|
let clientToUse = this.s3Client;
|
||||||
|
let shouldDestroyClient = false;
|
||||||
|
|
||||||
|
// optionally recreate client to avoid socket hang errors
|
||||||
|
// (during multi-attachments imports)
|
||||||
|
if (options?.recreateClient) {
|
||||||
|
clientToUse = new S3Client(this.config as any);
|
||||||
|
shouldDestroyClient = true;
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const contentType = getMimeType(filePath);
|
const contentType = getMimeType(filePath);
|
||||||
|
|
||||||
const upload = new Upload({
|
const upload = new Upload({
|
||||||
client: this.s3Client,
|
client: clientToUse,
|
||||||
params: {
|
params: {
|
||||||
Bucket: this.config.bucket,
|
Bucket: this.config.bucket,
|
||||||
Key: filePath,
|
Key: filePath,
|
||||||
|
|
@ -58,6 +72,10 @@ export class S3Driver implements StorageDriver {
|
||||||
await upload.done();
|
await upload.done();
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
throw new Error(`Failed to upload file: ${(err as Error).message}`);
|
throw new Error(`Failed to upload file: ${(err as Error).message}`);
|
||||||
|
} finally {
|
||||||
|
if (shouldDestroyClient && clientToUse) {
|
||||||
|
clientToUse.destroy();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -3,7 +3,7 @@ import { Readable } from 'stream';
|
||||||
export interface StorageDriver {
|
export interface StorageDriver {
|
||||||
upload(filePath: string, file: Buffer): Promise<void>;
|
upload(filePath: string, file: Buffer): Promise<void>;
|
||||||
|
|
||||||
uploadStream(filePath: string, file: Readable): Promise<void>;
|
uploadStream(filePath: string, file: Readable, options?: { recreateClient?: boolean }): Promise<void>;
|
||||||
|
|
||||||
copy(fromFilePath: string, toFilePath: string): Promise<void>;
|
copy(fromFilePath: string, toFilePath: string): Promise<void>;
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -15,8 +15,8 @@ export class StorageService {
|
||||||
this.logger.debug(`File uploaded successfully. Path: ${filePath}`);
|
this.logger.debug(`File uploaded successfully. Path: ${filePath}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
async uploadStream(filePath: string, fileContent: Readable) {
|
async uploadStream(filePath: string, fileContent: Readable, options?: { recreateClient?: boolean }) {
|
||||||
await this.storageDriver.uploadStream(filePath, fileContent);
|
await this.storageDriver.uploadStream(filePath, fileContent, options);
|
||||||
this.logger.debug(`File uploaded successfully. Path: ${filePath}`);
|
this.logger.debug(`File uploaded successfully. Path: ${filePath}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue