FEATURE: Upload to s3 in parallel to speed up backup restores (#13391)

Uploading lots of small files can be made significantly faster by parallelizing the `s3.put_object` calls. In testing, an UPLOAD_CONCURRENCY of 10 made a large restore 10x faster. An UPLOAD_CONCURRENCY of 20 made the same restore 18x faster.

This commit is careful to parallelize as little as possible, to reduce the chance of concurrency issues. In the worker threads, no database transactions are performed. All modification of shared objects is controlled with a mutex.

Unfortunately we do not have any existing tests for the `ToS3Migration` class. This change has been tested with a large site backup (120k uploads totalling 45GB)
This commit is contained in:
David Taylor 2021-06-16 10:34:39 +01:00 committed by GitHub
parent 03fc31e23b
commit b0416cb1c1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 21 additions and 5 deletions

View File

@ -7,6 +7,7 @@ module FileStore
class ToS3Migration class ToS3Migration
MISSING_UPLOADS_RAKE_TASK_NAME ||= 'posts:missing_uploads' MISSING_UPLOADS_RAKE_TASK_NAME ||= 'posts:missing_uploads'
UPLOAD_CONCURRENCY ||= 20
def initialize(s3_options:, dry_run: false, migrate_to_multisite: false, skip_etag_verify: false) def initialize(s3_options:, dry_run: false, migrate_to_multisite: false, skip_etag_verify: false)
@ -197,9 +198,25 @@ module FileStore
log " => #{s3_objects.size} files" log " => #{s3_objects.size} files"
log " - Syncing files to S3" log " - Syncing files to S3"
queue = Queue.new
synced = 0 synced = 0
failed = [] failed = []
lock = Mutex.new
upload_threads = UPLOAD_CONCURRENCY.times.map do
Thread.new do
while obj = queue.pop
if s3.put_object(obj[:options]).etag[obj[:etag]]
putc "."
lock.synchronize { synced += 1 }
else
putc "X"
lock.synchronize { failed << obj[:path] }
end
end
end
end
local_files.each do |file| local_files.each do |file|
path = File.join(public_directory, file) path = File.join(public_directory, file)
name = File.basename(path) name = File.basename(path)
@ -244,15 +261,14 @@ module FileStore
if @dry_run if @dry_run
log "#{file} => #{options[:key]}" log "#{file} => #{options[:key]}"
synced += 1 synced += 1
elsif s3.put_object(options).etag[etag]
putc "."
synced += 1
else else
putc "X" queue << { path: path, options: options, etag: etag }
failed << path
end end
end end
queue.close
upload_threads.each(&:join)
puts puts
failure_message = "S3 migration failed for db '#{@current_db}'." failure_message = "S3 migration failed for db '#{@current_db}'."