FEATURE: Local chunked uppy backup uploads with a new uploader plugin (#14894)

This takes the uppy chunking algorithm and combines it with the
form-submission format used by resumable.js, for parity with the current
backup controller, to make local backup uploads work with uppy.
We can then use this to replace the resumable-upload component
and the resumable.js library in our codebase, once stable.

This is disabled by default, so people using local backups will not
be affected. The enable_experimental_backup_uploader site setting
must be enabled for this to work.
This commit is contained in:
Martin Brennan 2021-11-23 08:45:42 +10:00 committed by GitHub
parent 377c8d9c8b
commit 49c49e8ae0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 688 additions and 49 deletions

View File

@ -12,9 +12,6 @@ export default Controller.extend({
uploadLabel: i18n("admin.backups.upload.label"),
backupLocation: setting("backup_location"),
localBackupStorage: equal("backupLocation", "local"),
enableExperimentalBackupUploader: setting(
"enable_experimental_backup_uploader"
),
@discourseComputed("status.allowRestore", "status.isOperationRunning")
restoreTitle(allowRestore, isOperationRunning) {

View File

@ -1,5 +1,8 @@
<div class="backup-options">
{{#if localBackupStorage}}
{{#if siteSettings.enable_experimental_backup_uploader}}
{{uppy-backup-uploader done=(route-action "uploadSuccess") localBackupStorage=localBackupStorage}}
{{else}}
{{resumable-upload
target="/admin/backups/upload"
success=(route-action "uploadSuccess")
@ -7,8 +10,9 @@
uploadText=uploadLabel
title="admin.backups.upload.title"
class="btn-default"}}
{{/if}}
{{else}}
{{#if enableExperimentalBackupUploader}}
{{#if (and siteSettings.enable_direct_s3_uploads siteSettings.enable_experimental_backup_uploader)}}
{{uppy-backup-uploader done=(route-action "remoteUploadSuccess")}}
{{else}}
{{backup-uploader done=(route-action "remoteUploadSuccess")}}

View File

@ -45,3 +45,19 @@ define("@uppy/xhr-upload", ["exports"], function (__exports__) {
define("@uppy/drop-target", ["exports"], function (__exports__) {
__exports__.default = window.Uppy.DropTarget;
});
define("@uppy/utils/lib/delay", ["exports"], function (__exports__) {
__exports__.default = window.Uppy.Utils.delay;
});
define("@uppy/utils/lib/EventTracker", ["exports"], function (__exports__) {
__exports__.default = window.Uppy.Utils.EventTracker;
});
define("@uppy/utils/lib/AbortController", ["exports"], function (__exports__) {
__exports__.AbortController =
window.Uppy.Utils.AbortControllerLib.AbortController;
__exports__.AbortSignal = window.Uppy.Utils.AbortControllerLib.AbortSignal;
__exports__.createAbortError =
window.Uppy.Utils.AbortControllerLib.createAbortError;
});

View File

@ -1,13 +1,29 @@
import Component from "@ember/component";
import { alias, not } from "@ember/object/computed";
import I18n from "I18n";
import UppyUploadMixin from "discourse/mixins/uppy-upload";
import discourseComputed from "discourse-common/utils/decorators";
export default Component.extend(UppyUploadMixin, {
id: "uppy-backup-uploader",
tagName: "span",
type: "backup",
useMultipartUploadsIfAvailable: true,
uploadRootPath: "/admin/backups",
uploadUrl: "/admin/backups/upload",
// TODO (martin) Add functionality to make this usable _without_ multipart
// uploads, direct to S3, which needs to call get-presigned-put on the
// BackupsController (which extends ExternalUploadHelpers) rather than
// the old create_upload_url route. The two are functionally equivalent;
// they both generate a presigned PUT url for the upload to S3, and do
// the whole thing in one request rather than multipart.
// direct s3 backups
useMultipartUploadsIfAvailable: not("localBackupStorage"),
// local backups
useChunkedUploads: alias("localBackupStorage"),
@discourseComputed("uploading", "uploadProgress")
uploadButtonText(uploading, progress) {
@ -20,7 +36,7 @@ export default Component.extend(UppyUploadMixin, {
return { skipValidation: true };
},
uploadDone() {
this.done();
uploadDone(responseData) {
this.done(responseData.file_name);
},
});

View File

@ -0,0 +1,339 @@
import { Promise } from "rsvp";
import delay from "@uppy/utils/lib/delay";
import {
AbortController,
createAbortError,
} from "@uppy/utils/lib/AbortController";
const MB = 1024 * 1024;

const defaultOptions = {
  // Maximum number of chunk uploads in flight at once.
  limit: 5,
  // Backoff delays (ms) between retries; the array length caps the
  // number of retry attempts per chunk.
  retryDelays: [0, 1000, 3000, 5000],
  getChunkSize() {
    return 5 * MB;
  },
  onStart() {},
  onProgress() {},
  onChunkComplete() {},
  onSuccess() {},
  onError(err) {
    throw err;
  },
};

/**
 * Used mainly as a replacement for Resumable.js, using code cribbed from
 * uppy's S3 Multipart class, which we mainly use the chunking algorithm
 * and retry/abort functions of. The _buildFormData function is the one
 * which shapes the data into the same parameters as Resumable.js used.
 *
 * See the UppyChunkedUploader class for the uppy uploader plugin which
 * uses UppyChunkedUpload.
 */
export default class UppyChunkedUpload {
  /**
   * @param {object} file - an uppy file object; `file.data` is expected to
   *   be a Blob-like object with `size`, `name` and `slice()`.
   * @param {object} options - overrides for defaultOptions plus `url`,
   *   `method` and `headers` for the chunk POSTs.
   */
  constructor(file, options) {
    this.options = {
      ...defaultOptions,
      ...options,
    };
    this.file = file;

    if (!this.options.getChunkSize) {
      this.options.getChunkSize = defaultOptions.getChunkSize;
    }
    // BUGFIX: the chunk size must be computed regardless of whether the
    // caller supplied a custom getChunkSize. Previously this assignment
    // only ran inside the fallback branch above, so this.chunkSize was
    // left undefined whenever getChunkSize was set, which broke the
    // slicing loop in _initChunks.
    this.chunkSize = this.options.getChunkSize(this.file);

    this.abortController = new AbortController();
    this._initChunks();
  }

  // True once _abortUpload() has fired the abort signal.
  _aborted() {
    return this.abortController.signal.aborted;
  }

  // Slices file.data into chunkSize-sized pieces and initializes the
  // parallel chunkState bookkeeping (bytesUploaded/busy/done per chunk).
  _initChunks() {
    this.chunksInProgress = 0;
    this.chunks = null;
    this.chunkState = null;

    const chunks = [];
    if (this.file.size === 0) {
      // A zero-byte file is still uploaded as a single (empty) chunk.
      chunks.push(this.file.data);
    } else {
      for (let i = 0; i < this.file.data.size; i += this.chunkSize) {
        const end = Math.min(this.file.data.size, i + this.chunkSize);
        chunks.push(this.file.data.slice(i, end));
      }
    }

    this.chunks = chunks;
    this.chunkState = chunks.map(() => ({
      bytesUploaded: 0,
      busy: false,
      done: false,
    }));
  }

  _createUpload() {
    if (this._aborted()) {
      throw createAbortError();
    }
    this.options.onStart();
    this._uploadChunks();
  }

  // Kicks off up to `limit` concurrent chunk uploads; re-entered after
  // each chunk settles until every chunk is done.
  _uploadChunks() {
    if (this.chunkState.every((state) => state.done)) {
      this._completeUpload();
      return;
    }

    // For a 100MB file, with the default min chunk size of 5MB and a limit of 10:
    //
    // Total 20 chunks
    // ---------
    // Need 1 is 10
    // Need 2 is 5
    // Need 3 is 5
    const need = this.options.limit - this.chunksInProgress;
    const completeChunks = this.chunkState.filter((state) => state.done).length;
    const remainingChunks = this.chunks.length - completeChunks;
    let minNeeded = Math.ceil(this.options.limit / 2);
    if (minNeeded > remainingChunks) {
      minNeeded = remainingChunks;
    }
    // Batch new requests rather than firing one at a time, to amortize
    // the scheduling overhead.
    if (need < minNeeded) {
      return;
    }

    const candidates = [];
    for (let i = 0; i < this.chunkState.length; i++) {
      const state = this.chunkState[i];
      if (!state.done && !state.busy) {
        candidates.push(i);
        if (candidates.length >= need) {
          break;
        }
      }
    }
    if (candidates.length === 0) {
      return;
    }

    candidates.forEach((index) => {
      this._uploadChunkRetryable(index).then(
        () => {
          this._uploadChunks();
        },
        (err) => {
          this._onError(err);
        }
      );
    });
  }

  // Retries are only worthwhile for network failures, lock/conflict
  // responses, and server errors; anything else fails permanently.
  _shouldRetry(err) {
    if (err.source && typeof err.source.status === "number") {
      const { status } = err.source;
      // 0 probably indicates network failure
      return (
        status === 0 ||
        status === 409 ||
        status === 423 ||
        (status >= 500 && status < 600)
      );
    }
    return false;
  }

  // Wraps `attempt` with retry-with-backoff semantics; `before`/`after`
  // bracket every attempt cycle (used for in-progress accounting).
  _retryable({ before, attempt, after }) {
    const { retryDelays } = this.options;
    const { signal } = this.abortController;

    if (before) {
      before();
    }

    const doAttempt = (retryAttempt) =>
      attempt().catch((err) => {
        if (this._aborted()) {
          throw createAbortError();
        }

        if (this._shouldRetry(err) && retryAttempt < retryDelays.length) {
          return delay(retryDelays[retryAttempt], { signal }).then(() =>
            doAttempt(retryAttempt + 1)
          );
        }
        throw err;
      });

    return doAttempt(0).then(
      (result) => {
        if (after) {
          after();
        }
        return result;
      },
      (err) => {
        if (after) {
          after();
        }
        throw err;
      }
    );
  }

  _uploadChunkRetryable(index) {
    return this._retryable({
      before: () => {
        this.chunksInProgress += 1;
      },
      attempt: () => this._uploadChunk(index),
      after: () => {
        this.chunksInProgress -= 1;
      },
    });
  }

  _uploadChunk(index) {
    this.chunkState[index].busy = true;

    if (this._aborted()) {
      this.chunkState[index].busy = false;
      throw createAbortError();
    }

    return this._uploadChunkBytes(
      index,
      this.options.url,
      this.options.headers
    );
  }

  // Records per-chunk progress and reports the aggregate to the caller.
  _onChunkProgress(index, sent) {
    this.chunkState[index].bytesUploaded = parseInt(sent, 10);

    const totalUploaded = this.chunkState.reduce(
      (total, chunk) => total + chunk.bytesUploaded,
      0
    );
    this.options.onProgress(totalUploaded, this.file.data.size);
  }

  _onChunkComplete(index) {
    this.chunkState[index].done = true;
    this.options.onChunkComplete(index);
  }

  // POSTs one chunk as Resumable.js-compatible form data, resolving when
  // the server responds 2xx and rejecting (retryably) otherwise.
  _uploadChunkBytes(index, url, headers) {
    const body = this.chunks[index];
    const { signal } = this.abortController;

    return new Promise((resolve, reject) => {
      const xhr = new XMLHttpRequest();

      // BUGFIX: keep a stable reference to the abort handler so the
      // listener can actually be removed again. Previously
      // addEventListener was passed the *result* of calling xhr.abort()
      // immediately (i.e. undefined), and cleanup() removed a freshly
      // created closure that was never registered — so aborting the
      // signal never aborted the request and nothing was cleaned up.
      const onAbortSignal = () => xhr.abort();
      function cleanup() {
        signal.removeEventListener("abort", onAbortSignal);
      }
      signal.addEventListener("abort", onAbortSignal);

      xhr.open(this.options.method || "POST", url, true);
      if (headers) {
        Object.keys(headers).forEach((key) => {
          xhr.setRequestHeader(key, headers[key]);
        });
      }
      xhr.responseType = "text";

      xhr.upload.addEventListener("progress", (ev) => {
        if (!ev.lengthComputable) {
          return;
        }
        this._onChunkProgress(index, ev.loaded, ev.total);
      });

      xhr.addEventListener("abort", () => {
        cleanup();
        this.chunkState[index].busy = false;
        reject(createAbortError());
      });

      xhr.addEventListener("load", (ev) => {
        cleanup();
        this.chunkState[index].busy = false;

        if (ev.target.status < 200 || ev.target.status >= 300) {
          const error = new Error("Non 2xx");
          error.source = ev.target;
          reject(error);
          return;
        }

        // This avoids the net::ERR_OUT_OF_MEMORY in Chromium Browsers.
        this.chunks[index] = null;

        this._onChunkProgress(index, body.size, body.size);
        this._onChunkComplete(index);
        resolve();
      });

      xhr.addEventListener("error", (ev) => {
        cleanup();
        this.chunkState[index].busy = false;
        const error = new Error("Unknown error");
        error.source = ev.target;
        reject(error);
      });

      xhr.send(this._buildFormData(index + 1, body));
    });
  }

  async _completeUpload() {
    this.options.onSuccess();
  }

  // Shapes the chunk into the exact multipart parameters Resumable.js
  // used, so the existing backup controller can consume them unchanged.
  _buildFormData(currentChunkNumber, body) {
    const uniqueIdentifier =
      this.file.data.size +
      "-" +
      this.file.data.name.replace(/[^0-9a-zA-Z_-]/gim, "");
    const formData = new FormData();
    formData.append("file", body);
    formData.append("resumableChunkNumber", currentChunkNumber);
    formData.append("resumableCurrentChunkSize", body.size);
    formData.append("resumableChunkSize", this.chunkSize);
    formData.append("resumableTotalSize", this.file.data.size);
    formData.append("resumableFilename", this.file.data.name);
    formData.append("resumableIdentifier", uniqueIdentifier);
    return formData;
  }

  _abortUpload() {
    this.abortController.abort();
  }

  _onError(err) {
    // Aborts are intentional; only surface real errors to the caller.
    if (err && err.name === "AbortError") {
      return;
    }
    this.options.onError(err);
  }

  start() {
    this._createUpload();
  }

  // Only abort when explicitly asked to ({ really: true }), matching the
  // uploader plugin's _resetUploaderReferences contract.
  abort(opts = undefined) {
    if (opts?.really) {
      this._abortUpload();
    }
  }
}

View File

@ -0,0 +1,211 @@
import { UploaderPlugin } from "discourse/lib/uppy-plugin-base";
import { next } from "@ember/runloop";
import getURL from "discourse-common/lib/get-url";
import { Promise } from "rsvp";
import UppyChunkedUpload from "discourse/lib/uppy-chunked-upload";
import EventTracker from "@uppy/utils/lib/EventTracker";
// Limited use uppy uploader function to replace Resumable.js, which
// is only used by the local backup uploader at this point in time,
// and has been that way for many years. Uses the skeleton of uppy's
// AwsS3Multipart uploader plugin to provide a similar API, with unnecessary
// code removed.
//
// See also UppyChunkedUpload class for more detail.
export default class UppyChunkedUploader extends UploaderPlugin {
  static pluginId = "uppy-chunked-uploader";

  constructor(uppy, opts) {
    super(uppy, opts);
    const defaultOptions = {
      limit: 0,
      retryDelays: [0, 1000, 3000, 5000],
    };
    this.opts = { ...defaultOptions, ...opts };

    this.url = getURL(opts.url);
    this.method = opts.method || "POST";

    // Per-file UppyChunkedUpload instances and their event subscriptions.
    this.uploaders = Object.create(null);
    this.uploaderEvents = Object.create(null);
  }

  // Tears down the upload and its event subscriptions for a file; pass
  // { abort: true } to also abort any in-flight chunk requests.
  _resetUploaderReferences(fileID, opts = {}) {
    if (this.uploaders[fileID]) {
      this.uploaders[fileID].abort({ really: opts.abort || false });
      this.uploaders[fileID] = null;
    }
    if (this.uploaderEvents[fileID]) {
      this.uploaderEvents[fileID].remove();
      this.uploaderEvents[fileID] = null;
    }
  }

  // Creates a UppyChunkedUpload for the file, wires uppy events to it,
  // and resolves/rejects when the upload settles, is removed or canceled.
  _uploadFile(file) {
    return new Promise((resolve, reject) => {
      const onStart = () => {
        this.uppy.emit("upload-started", file);
      };

      const onProgress = (bytesUploaded, bytesTotal) => {
        this.uppy.emit("upload-progress", file, {
          uploader: this,
          bytesUploaded,
          bytesTotal,
        });
      };

      const onError = (err) => {
        this.uppy.log(err);
        this.uppy.emit("upload-error", file, err);
        this._resetUploaderReferences(file.id);
        reject(err);
      };

      const onSuccess = () => {
        this._resetUploaderReferences(file.id);
        const cFile = this.uppy.getFile(file.id);
        const uploadResponse = {};
        this.uppy.emit("upload-success", cFile || file, uploadResponse);
        resolve(upload);
      };

      const onChunkComplete = (chunk) => {
        const cFile = this.uppy.getFile(file.id);
        if (!cFile) {
          return;
        }
        this.uppy.emit("chunk-uploaded", cFile, chunk);
      };

      const upload = new UppyChunkedUpload(file, {
        getChunkSize: this.opts.getChunkSize
          ? this.opts.getChunkSize.bind(this)
          : null,
        onStart,
        onProgress,
        onChunkComplete,
        onSuccess,
        onError,
        limit: this.opts.limit || 5,
        retryDelays: this.opts.retryDelays || [],
        method: this.method,
        url: this.url,
        headers: this.opts.headers,
      });

      this.uploaders[file.id] = upload;
      this.uploaderEvents[file.id] = new EventTracker(this.uppy);

      next(() => {
        if (!file.isPaused) {
          upload.start();
        }
      });

      // BUGFIX: _onFileRemove passes the removed file's id (a string),
      // not the file object, so use it directly; previously this read
      // `removed.id`, which was always undefined.
      this._onFileRemove(file.id, (removedFileID) => {
        this._resetUploaderReferences(file.id, { abort: true });
        resolve(`upload ${removedFileID} was removed`);
      });

      this._onCancelAll(file.id, () => {
        this._resetUploaderReferences(file.id, { abort: true });
        resolve(`upload ${file.id} was canceled`);
      });

      this._onFilePause(file.id, (isPaused) => {
        if (isPaused) {
          // NOTE(review): UppyChunkedUpload does not currently implement
          // pause(); the optional call prevents a TypeError if uppy emits
          // a pause event. TODO: implement pause/resume on
          // UppyChunkedUpload if pausing local backup uploads is needed.
          upload.pause?.();
        } else {
          next(() => {
            upload.start();
          });
        }
      });

      this._onPauseAll(file.id, () => {
        // See the pause note above — pause() may not exist yet.
        upload.pause?.();
      });

      this._onResumeAll(file.id, () => {
        if (file.error) {
          upload.abort();
        }
        next(() => {
          upload.start();
        });
      });

      // Don't double-emit upload-started for restored files that were already started
      if (!file.progress.uploadStarted || !file.isRestored) {
        this.uppy.emit("upload-started", file);
      }
    });
  }

  // Invokes cb with the removed file's id when this file is removed.
  _onFileRemove(fileID, cb) {
    this.uploaderEvents[fileID].on("file-removed", (file) => {
      if (fileID === file.id) {
        cb(file.id);
      }
    });
  }

  _onFilePause(fileID, cb) {
    this.uploaderEvents[fileID].on("upload-pause", (targetFileID, isPaused) => {
      if (fileID === targetFileID) {
        cb(isPaused);
      }
    });
  }

  _onPauseAll(fileID, cb) {
    this.uploaderEvents[fileID].on("pause-all", () => {
      if (!this.uppy.getFile(fileID)) {
        return;
      }
      cb();
    });
  }

  _onCancelAll(fileID, cb) {
    this.uploaderEvents[fileID].on("cancel-all", () => {
      if (!this.uppy.getFile(fileID)) {
        return;
      }
      cb();
    });
  }

  _onResumeAll(fileID, cb) {
    this.uploaderEvents[fileID].on("resume-all", () => {
      if (!this.uppy.getFile(fileID)) {
        return;
      }
      cb();
    });
  }

  // Entry point uppy calls with the ids of files to upload in parallel.
  _upload(fileIDs) {
    const promises = fileIDs.map((id) => {
      const file = this.uppy.getFile(id);
      return this._uploadFile(file);
    });

    return Promise.all(promises);
  }

  install() {
    this._install(this._upload.bind(this));
  }

  uninstall() {
    this._uninstall(this._upload.bind(this));
  }
}

View File

@ -30,32 +30,6 @@ export class UppyPluginBase extends BasePlugin {
_setFileState(fileId, state) {
this.uppy.setFileState(fileId, state);
}
}
export class UploadPreProcessorPlugin extends UppyPluginBase {
static pluginType = "preprocessor";
constructor(uppy, opts) {
super(uppy, opts);
this.type = this.constructor.pluginType;
}
_install(fn) {
this.uppy.addPreProcessor(fn);
}
_uninstall(fn) {
this.uppy.removePreProcessor(fn);
}
_emitProgress(file) {
this.uppy.emit("preprocess-progress", file, null, this.id);
}
_emitComplete(file, skipped = false) {
this.uppy.emit("preprocess-complete", file, skipped, this.id);
return Promise.resolve();
}
_emitAllComplete(fileIds, skipped = false) {
fileIds.forEach((fileId) => {
@ -82,3 +56,55 @@ export class UploadPreProcessorPlugin extends UppyPluginBase {
return this._emitAllComplete(file, true);
}
}
/**
 * Base class for uppy plugins that run as preprocessors, i.e. before
 * any uploader plugin gets hold of the files. Subclasses register their
 * work via _install and report progress/completion with the emit helpers.
 */
export class UploadPreProcessorPlugin extends UppyPluginBase {
  static pluginType = "preprocessor";

  constructor(uppy, opts) {
    super(uppy, opts);
    this.type = this.constructor.pluginType;
  }

  // Registers the handler with uppy's preprocessor pipeline.
  _install(handler) {
    this.uppy.addPreProcessor(handler);
  }

  // Deregisters a previously installed preprocessor handler.
  _uninstall(handler) {
    this.uppy.removePreProcessor(handler);
  }

  // Announces preprocessing progress for a single file.
  _emitProgress(file) {
    this.uppy.emit("preprocess-progress", file, null, this.id);
  }

  // Announces that preprocessing finished (or was skipped) for a file;
  // returns a resolved promise so callers can chain on it.
  _emitComplete(file, skipped = false) {
    this.uppy.emit("preprocess-complete", file, skipped, this.id);
    return Promise.resolve();
  }
}
/**
 * Base class for uppy plugins that perform the actual uploads.
 * Subclasses register their upload function via _install and use the
 * emit helpers to report per-file progress and completion.
 */
export class UploaderPlugin extends UppyPluginBase {
  static pluginType = "uploader";

  constructor(uppy, opts) {
    super(uppy, opts);
    this.type = this.constructor.pluginType;
  }

  // Registers the handler as an uppy uploader.
  _install(handler) {
    this.uppy.addUploader(handler);
  }

  // Deregisters a previously installed uploader handler.
  _uninstall(handler) {
    this.uppy.removeUploader(handler);
  }

  // Announces upload progress for a single file.
  _emitProgress(file) {
    this.uppy.emit("upload-progress", file, null, this.id);
  }

  // Announces that the upload finished (or was skipped) for a file;
  // returns a resolved promise so callers can chain on it.
  _emitComplete(file, skipped = false) {
    this.uppy.emit("upload-complete", file, skipped, this.id);
    return Promise.resolve();
  }
}

View File

@ -14,6 +14,7 @@ import XHRUpload from "@uppy/xhr-upload";
import AwsS3 from "@uppy/aws-s3";
import UppyChecksum from "discourse/lib/uppy-checksum-plugin";
import UppyS3Multipart from "discourse/mixins/uppy-s3-multipart";
import UppyChunkedUploader from "discourse/lib/uppy-chunked-uploader-plugin";
import { on } from "discourse-common/utils/decorators";
import { warn } from "@ember/debug";
import bootbox from "bootbox";
@ -152,7 +153,9 @@ export default Mixin.create(UppyS3Multipart, {
this.setProperties({ uploading: false, processing: true });
this._completeExternalUpload(file)
.then((completeResponse) => {
this.uploadDone(completeResponse);
this.uploadDone(
deepMerge(completeResponse, { file_name: file.name })
);
if (this._inProgressUploads === 0) {
this._reset();
@ -165,7 +168,9 @@ export default Mixin.create(UppyS3Multipart, {
}
});
} else {
this.uploadDone(response.body);
this.uploadDone(
deepMerge(response?.body || {}, { file_name: file.name })
);
if (this._inProgressUploads === 0) {
this._reset();
}
@ -185,16 +190,21 @@ export default Mixin.create(UppyS3Multipart, {
// allow these other uploaders to go direct to S3.
if (
this.siteSettings.enable_direct_s3_uploads &&
!this.preventDirectS3Uploads
!this.preventDirectS3Uploads &&
!this.useChunkedUploads
) {
if (this.useMultipartUploadsIfAvailable) {
this._useS3MultipartUploads();
} else {
this._useS3Uploads();
}
} else {
if (this.useChunkedUploads) {
this._useChunkedUploads();
} else {
this._useXHRUploads();
}
}
},
_useXHRUploads() {
@ -206,6 +216,16 @@ export default Mixin.create(UppyS3Multipart, {
});
},
// Switches this uploader to chunked XHR uploads via UppyChunkedUploader
// (the Resumable.js replacement); used when useChunkedUploads is set,
// e.g. for local backup storage where direct S3 uploads do not apply.
_useChunkedUploads() {
  this.set("usingChunkedUploads", true);
  this._uppyInstance.use(UppyChunkedUploader, {
    url: this._xhrUploadUrl(),
    headers: {
      // chunks are POSTed as ordinary form submissions, so the CSRF
      // token must be sent explicitly
      "X-CSRF-Token": this.session.csrfToken,
    },
  });
},
_useS3Uploads() {
this.set("usingS3Uploads", true);
this._uppyInstance.use(AwsS3, {
@ -251,7 +271,7 @@ export default Mixin.create(UppyS3Multipart, {
_xhrUploadUrl() {
return (
getUrl(this.getWithDefault("uploadUrl", "/uploads")) +
getUrl(this.getWithDefault("uploadUrl", this.uploadRootPath)) +
".json?client_id=" +
this.messageBus?.clientId
);

View File

@ -22,6 +22,7 @@
"@ember/test-helpers": "^2.2.0",
"@glimmer/component": "^1.0.0",
"@popperjs/core": "2.10.2",
"@uppy/utils": "^4.0.3",
"@uppy/aws-s3": "^2.0.4",
"@uppy/aws-s3-multipart": "^2.1.0",
"@uppy/core": "^2.1.0",

View File

@ -180,13 +180,11 @@ class Admin::BackupsController < Admin::AdminController
current_chunk_size = params.fetch(:resumableCurrentChunkSize).to_i
previous_chunk_number = chunk_number - 1
# path to chunk file
chunk = BackupRestore::LocalBackupStore.chunk_path(identifier, filename, chunk_number)
# upload chunk
HandleChunkUpload.upload_chunk(chunk, file: file)
uploaded_file_size = previous_chunk_number * chunk_size
# when all chunks are uploaded
uploaded_file_size = previous_chunk_number * chunk_size
if uploaded_file_size + current_chunk_size >= total_size
# merge all the chunks in a background thread
Jobs.enqueue_in(5.seconds, :backup_chunks_merger, filename: filename, identifier: identifier, chunks: chunk_number)

View File

@ -12,6 +12,7 @@
"@json-editor/json-editor": "^2.5.2",
"@popperjs/core": "v2.10.2",
"@uppy/aws-s3": "^2.0.4",
"@uppy/utils": "^4.0.3",
"@uppy/aws-s3-multipart": "^2.1.0",
"@uppy/core": "^2.1.0",
"@uppy/drop-target": "^1.1.0",

View File

@ -9,3 +9,8 @@ Uppy.XHRUpload = require('@uppy/xhr-upload')
Uppy.AwsS3 = require('@uppy/aws-s3')
Uppy.AwsS3Multipart = require('@uppy/aws-s3-multipart')
Uppy.DropTarget = require('@uppy/drop-target')
// Expose the @uppy/utils internals needed by the chunked uploader's AMD
// shims (delay, EventTracker, and the AbortController module) on the
// global Uppy object built by this bundle.
Uppy.Utils = {
  delay: require('@uppy/utils/lib/delay'),
  EventTracker: require('@uppy/utils/lib/EventTracker'),
  AbortControllerLib: require('@uppy/utils/lib/AbortController')
}

View File

@ -7578,5 +7578,10 @@ Uppy.XHRUpload = require('@uppy/xhr-upload')
Uppy.AwsS3 = require('@uppy/aws-s3')
Uppy.AwsS3Multipart = require('@uppy/aws-s3-multipart')
Uppy.DropTarget = require('@uppy/drop-target')
Uppy.Utils = {
delay: require('@uppy/utils/lib/delay'),
EventTracker: require('@uppy/utils/lib/EventTracker'),
AbortControllerLib: require('@uppy/utils/lib/AbortController')
}
},{"@uppy/aws-s3":5,"@uppy/aws-s3-multipart":3,"@uppy/core":18,"@uppy/drop-target":21,"@uppy/xhr-upload":49}]},{},[58]);
},{"@uppy/aws-s3":5,"@uppy/aws-s3-multipart":3,"@uppy/core":18,"@uppy/drop-target":21,"@uppy/utils/lib/AbortController":23,"@uppy/utils/lib/EventTracker":24,"@uppy/utils/lib/delay":29,"@uppy/xhr-upload":49}]},{},[58]);