discourse/lib/file_store/to_s3_migration.rb

382 lines
12 KiB
Ruby

# frozen_string_literal: true
require 'aws-sdk-s3'
module FileStore
ToS3MigrationError = Class.new(RuntimeError)
class ToS3Migration
MISSING_UPLOADS_RAKE_TASK_NAME ||= 'posts:missing_uploads'
UPLOAD_CONCURRENCY ||= 20
def initialize(s3_options:, dry_run: false, migrate_to_multisite: false, skip_etag_verify: false)
@s3_bucket = s3_options[:bucket]
@s3_client_options = s3_options[:client_options]
@dry_run = dry_run
@migrate_to_multisite = migrate_to_multisite
@skip_etag_verify = skip_etag_verify
@current_db = RailsMultisite::ConnectionManagement.current_db
end
def self.s3_options_from_site_settings
{
client_options: S3Helper.s3_options(SiteSetting),
bucket: SiteSetting.Upload.s3_upload_bucket
}
end
def self.s3_options_from_env
unless ENV["DISCOURSE_S3_BUCKET"].present? &&
ENV["DISCOURSE_S3_REGION"].present? &&
(
(
ENV["DISCOURSE_S3_ACCESS_KEY_ID"].present? &&
ENV["DISCOURSE_S3_SECRET_ACCESS_KEY"].present?
) || ENV["DISCOURSE_S3_USE_IAM_PROFILE"].present?
)
raise ToS3MigrationError.new(<<~TEXT)
Please provide the following environment variables:
- DISCOURSE_S3_BUCKET
- DISCOURSE_S3_REGION
and either
- DISCOURSE_S3_ACCESS_KEY_ID
- DISCOURSE_S3_SECRET_ACCESS_KEY
or
- DISCOURSE_S3_USE_IAM_PROFILE
TEXT
end
opts = { region: ENV["DISCOURSE_S3_REGION"] }
opts[:endpoint] = ENV["DISCOURSE_S3_ENDPOINT"] if ENV["DISCOURSE_S3_ENDPOINT"].present?
if ENV["DISCOURSE_S3_USE_IAM_PROFILE"].blank?
opts[:access_key_id] = ENV["DISCOURSE_S3_ACCESS_KEY_ID"]
opts[:secret_access_key] = ENV["DISCOURSE_S3_SECRET_ACCESS_KEY"]
end
{
client_options: opts,
bucket: ENV["DISCOURSE_S3_BUCKET"]
}
end
def migrate
migrate_to_s3
end
def migration_successful?(should_raise: false)
success = true
failure_message = "S3 migration failed for db '#{@current_db}'."
prefix = @migrate_to_multisite ? "uploads/#{@current_db}/original/" : "original/"
base_url = File.join(SiteSetting.Upload.s3_base_url, prefix)
count = Upload.by_users.where("url NOT LIKE '#{base_url}%'").count
if count > 0
error_message = "#{count} of #{Upload.count} uploads are not migrated to S3. #{failure_message}"
raise_or_log(error_message, should_raise)
success = false
end
cdn_path = SiteSetting.cdn_path("/uploads/#{@current_db}/original").sub(/https?:/, "")
count = Post.where("cooked LIKE '%#{cdn_path}%'").count
if count > 0
error_message = "#{count} posts are not remapped to new S3 upload URL. #{failure_message}"
raise_or_log(error_message, should_raise)
success = false
end
Discourse::Application.load_tasks unless Rake::Task.task_defined?(MISSING_UPLOADS_RAKE_TASK_NAME)
Rake::Task[MISSING_UPLOADS_RAKE_TASK_NAME]
count = DB.query_single(<<~SQL, Post::MISSING_UPLOADS, Post::MISSING_UPLOADS_IGNORED).first
SELECT COUNT(1)
FROM posts p
WHERE EXISTS (
SELECT 1
FROM post_custom_fields f
WHERE f.post_id = p.id AND f.name = ?
) AND NOT EXISTS (
SELECT 1
FROM post_custom_fields f
WHERE f.post_id = p.id AND f.name = ?
)
SQL
if count > 0
error_message = "rake posts:missing_uploads identified #{count} issues. #{failure_message}"
raise_or_log(error_message, should_raise)
success = false
end
count = Post.where('baked_version <> ? OR baked_version IS NULL', Post::BAKED_VERSION).count
if count > 0
log("#{count} posts still require rebaking and will be rebaked during regular job")
log("To speed up migrations of posts we recommend you run 'rake posts:rebake_uncooked_posts'") if count > 100
success = false
else
log("No posts require rebaking")
end
success
end
protected
def log(message)
puts message
end
def raise_or_log(message, should_raise)
if should_raise
raise ToS3MigrationError.new(message)
else
log(message)
end
end
def uploads_migrated_to_new_scheme?
seeded_image_url = "uploads/#{@current_db}/original/_X/"
!Upload.by_users.where("url NOT LIKE '//%' AND url NOT LIKE '/%#{seeded_image_url}%'").exists?
end
def migrate_to_s3
# we don't want have migrated state, ensure we run all jobs here
Jobs.run_immediately!
log "*" * 30 + " DRY RUN " + "*" * 30 if @dry_run
log "Migrating uploads to S3 for '#{@current_db}'..."
if !uploads_migrated_to_new_scheme?
log "Some uploads were not migrated to the new scheme. Running the migration, this may take a while..."
SiteSetting.migrate_to_new_scheme = true
Upload.migrate_to_new_scheme
if !uploads_migrated_to_new_scheme?
raise ToS3MigrationError.new("Some uploads could not be migrated to the new scheme. " \
"You need to fix this manually.")
end
end
bucket_has_folder_path = true if @s3_bucket.include? "/"
public_directory = Rails.root.join("public").to_s
s3 = Aws::S3::Client.new(@s3_client_options)
if bucket_has_folder_path
bucket, folder = S3Helper.get_bucket_and_folder_path(@s3_bucket)
folder = File.join(folder, "/")
else
bucket, folder = @s3_bucket, ""
end
log "Uploading files to S3..."
log " - Listing local files"
local_files = []
IO.popen("cd #{public_directory} && find uploads/#{@current_db}/original -type f").each do |file|
local_files << file.chomp
putc "." if local_files.size % 1000 == 0
end
log " => #{local_files.size} files"
log " - Listing S3 files"
s3_objects = []
prefix = @migrate_to_multisite ? "uploads/#{@current_db}/original/" : "original/"
options = { bucket: bucket, prefix: folder + prefix }
loop do
response = s3.list_objects_v2(options)
s3_objects.concat(response.contents)
putc "."
break if response.next_continuation_token.blank?
options[:continuation_token] = response.next_continuation_token
end
log " => #{s3_objects.size} files"
log " - Syncing files to S3"
queue = Queue.new
synced = 0
failed = []
lock = Mutex.new
upload_threads = UPLOAD_CONCURRENCY.times.map do
Thread.new do
while obj = queue.pop
if s3.put_object(obj[:options]).etag[obj[:etag]]
putc "."
lock.synchronize { synced += 1 }
else
putc "X"
lock.synchronize { failed << obj[:path] }
end
end
end
end
local_files.each do |file|
path = File.join(public_directory, file)
name = File.basename(path)
etag = Digest::MD5.file(path).hexdigest unless @skip_etag_verify
key = file[file.index(prefix)..-1]
key.prepend(folder) if bucket_has_folder_path
original_path = file.sub("uploads/#{@current_db}", "")
if s3_object = s3_objects.find { |obj| obj.key.ends_with?(original_path) }
next if File.size(path) == s3_object.size && (@skip_etag_verify || s3_object.etag[etag])
end
options = {
acl: "public-read",
body: File.open(path, "rb"),
bucket: bucket,
content_type: MiniMime.lookup_by_filename(name)&.content_type,
key: key,
}
if !FileHelper.is_supported_image?(name)
upload = Upload.find_by(url: "/#{file}")
if upload&.original_filename
options[:content_disposition] = ActionDispatch::Http::ContentDisposition.format(
disposition: "attachment", filename: upload.original_filename
)
end
if upload&.secure
options[:acl] = "private"
end
elsif !FileHelper.is_inline_image?(name)
upload = Upload.find_by(url: "/#{file}")
options[:content_disposition] = ActionDispatch::Http::ContentDisposition.format(
disposition: "attachment", filename: upload&.original_filename || name
)
end
etag ||= Digest::MD5.file(path).hexdigest
if @dry_run
log "#{file} => #{options[:key]}"
synced += 1
else
queue << { path: path, options: options, etag: etag }
end
end
queue.close
upload_threads.each(&:join)
puts
failure_message = "S3 migration failed for db '#{@current_db}'."
if failed.size > 0
log "Failed to upload #{failed.size} files"
log failed.join("\n")
raise failure_message
elsif s3_objects.size + synced >= local_files.size
log "Updating the URLs in the database..."
from = "/uploads/#{@current_db}/original/"
to = "#{SiteSetting.Upload.s3_base_url}/#{prefix}"
if @dry_run
log "REPLACING '#{from}' WITH '#{to}'"
else
DbHelper.remap(from, to, anchor_left: true)
end
[
[
"src=\"/uploads/#{@current_db}/original/(\\dX/(?:[a-f0-9]/)*[a-f0-9]{40}[a-z0-9\\.]*)",
"src=\"#{SiteSetting.Upload.s3_base_url}/#{prefix}\\1"
],
[
"src='/uploads/#{@current_db}/original/(\\dX/(?:[a-f0-9]/)*[a-f0-9]{40}[a-z0-9\\.]*)",
"src='#{SiteSetting.Upload.s3_base_url}/#{prefix}\\1"
],
[
"href=\"/uploads/#{@current_db}/original/(\\dX/(?:[a-f0-9]/)*[a-f0-9]{40}[a-z0-9\\.]*)",
"href=\"#{SiteSetting.Upload.s3_base_url}/#{prefix}\\1"
],
[
"href='/uploads/#{@current_db}/original/(\\dX/(?:[a-f0-9]/)*[a-f0-9]{40}[a-z0-9\\.]*)",
"href='#{SiteSetting.Upload.s3_base_url}/#{prefix}\\1"
],
[
"\\[img\\]/uploads/#{@current_db}/original/(\\dX/(?:[a-f0-9]/)*[a-f0-9]{40}[a-z0-9\\.]*)\\[/img\\]",
"[img]#{SiteSetting.Upload.s3_base_url}/#{prefix}\\1[/img]"
]
].each do |from_url, to_url|
if @dry_run
log "REPLACING '#{from_url}' WITH '#{to_url}'"
else
DbHelper.regexp_replace(from_url, to_url)
end
end
unless @dry_run
# Legacy inline image format
Post.where("raw LIKE '%![](/uploads/default/original/%)%'").each do |post|
regexp = /!\[\](\/uploads\/#{@current_db}\/original\/(\dX\/(?:[a-f0-9]\/)*[a-f0-9]{40}[a-z0-9\.]*))/
post.raw.scan(regexp).each do |upload_url, _|
upload = Upload.get_from_url(upload_url)
post.raw = post.raw.gsub("![](#{upload_url})", "![](#{upload.short_url})")
end
post.save!(validate: false)
end
end
if Discourse.asset_host.present?
# Uploads that were on local CDN will now be on S3 CDN
from = "#{Discourse.asset_host}/uploads/#{@current_db}/original/"
to = "#{SiteSetting.Upload.s3_cdn_url}/#{prefix}"
if @dry_run
log "REMAPPING '#{from}' TO '#{to}'"
else
DbHelper.remap(from, to)
end
end
# Uploads that were on base hostname will now be on S3 CDN
from = "#{Discourse.base_url}/uploads/#{@current_db}/original/"
to = "#{SiteSetting.Upload.s3_cdn_url}/#{prefix}"
if @dry_run
log "REMAPPING '#{from}' TO '#{to}'"
else
DbHelper.remap(from, to)
end
unless @dry_run
log "Removing old optimized images..."
OptimizedImage
.joins("LEFT JOIN uploads u ON optimized_images.upload_id = u.id")
.where("u.id IS NOT NULL AND u.url LIKE '//%' AND optimized_images.url NOT LIKE '//%'")
.delete_all
log "Flagging all posts containing lightboxes for rebake..."
count = Post.where("cooked LIKE '%class=\"lightbox\"%'").update_all(baked_version: nil)
log "#{count} posts were flagged for a rebake"
end
end
migration_successful?(should_raise: true)
log "Done!"
ensure
Jobs.run_later!
end
end
end