From b0416cb1c11ba762da63ebdb297dbd95da334df9 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Wed, 16 Jun 2021 10:34:39 +0100 Subject: [PATCH] FEATURE: Upload to s3 in parallel to speed up backup restores (#13391) Uploading lots of small files can be made significantly faster by parallelizing the `s3.put_object` calls. In testing, an UPLOAD_CONCURRENCY of 10 made a large restore 10x faster. An UPLOAD_CONCURRENCY of 20 made the same restore 18x faster. This commit is careful to parallelize as little as possible, to reduce the chance of concurrency issues. In the worker threads, no database transactions are performed. All modification of shared objects is controlled with a mutex. Unfortunately, we do not have any existing tests for the `ToS3Migration` class. This change has been tested with a large site backup (120k uploads totalling 45GB). --- lib/file_store/to_s3_migration.rb | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/lib/file_store/to_s3_migration.rb b/lib/file_store/to_s3_migration.rb index 9af8aa3994d..0d6c098327f 100644 --- a/lib/file_store/to_s3_migration.rb +++ b/lib/file_store/to_s3_migration.rb @@ -7,6 +7,7 @@ module FileStore class ToS3Migration MISSING_UPLOADS_RAKE_TASK_NAME ||= 'posts:missing_uploads' + UPLOAD_CONCURRENCY ||= 20 def initialize(s3_options:, dry_run: false, migrate_to_multisite: false, skip_etag_verify: false) @@ -197,9 +198,25 @@ module FileStore log " => #{s3_objects.size} files" log " - Syncing files to S3" + queue = Queue.new synced = 0 failed = [] + lock = Mutex.new + upload_threads = UPLOAD_CONCURRENCY.times.map do + Thread.new do + while obj = queue.pop + if s3.put_object(obj[:options]).etag[obj[:etag]] + putc "." 
+ lock.synchronize { synced += 1 } + else + putc "X" + lock.synchronize { failed << obj[:path] } + end + end + end + end + local_files.each do |file| path = File.join(public_directory, file) name = File.basename(path) @@ -244,15 +261,14 @@ module FileStore if @dry_run log "#{file} => #{options[:key]}" synced += 1 - elsif s3.put_object(options).etag[etag] - putc "." - synced += 1 else - putc "X" - failed << path + queue << { path: path, options: options, etag: etag } end end + queue.close + upload_threads.each(&:join) + puts failure_message = "S3 migration failed for db '#{@current_db}'."