PERF: new 'migrate_to_s3' rake task
This commit is contained in:
parent
8f65e4fb01
commit
5381096bfd
|
@ -153,7 +153,7 @@ class SiteSetting < ActiveRecord::Base
|
|||
|
||||
# cf. http://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region
|
||||
if SiteSetting.s3_endpoint == "https://s3.amazonaws.com"
|
||||
if SiteSetting.Upload.s3_region == 'cn-north-1' || SiteSetting.Upload.s3_region == 'cn-northwest-1'
|
||||
if SiteSetting.Upload.s3_region.start_with?("cn-")
|
||||
"//#{bucket}.s3.#{SiteSetting.Upload.s3_region}.amazonaws.com.cn"
|
||||
else
|
||||
"//#{bucket}.s3.dualstack.#{SiteSetting.Upload.s3_region}.amazonaws.com"
|
||||
|
|
|
@ -1,60 +1,91 @@
|
|||
class DbHelper
|
||||
|
||||
REMAP_SQL ||= "
|
||||
REMAP_SQL ||= <<~SQL
|
||||
SELECT table_name, column_name
|
||||
FROM information_schema.columns
|
||||
WHERE table_schema = 'public'
|
||||
AND is_updatable = 'YES'
|
||||
AND (data_type LIKE 'char%' OR data_type LIKE 'text%')
|
||||
ORDER BY table_name, column_name"
|
||||
ORDER BY table_name, column_name
|
||||
SQL
|
||||
|
||||
def self.remap(from, to, anchor_left: false, anchor_right: false, exclude_tables: [])
|
||||
results = DB.query(REMAP_SQL).to_a
|
||||
def self.remap(from, to, anchor_left: false, anchor_right: false, excluded_tables: [])
|
||||
like = "#{anchor_left ? '' : "%"}#{from}#{anchor_right ? '' : "%"}"
|
||||
text_columns = Hash.new { |h, k| h[k] = [] }
|
||||
|
||||
remappable_columns = {}
|
||||
|
||||
results.each do |result|
|
||||
remappable_columns[result.table_name] ||= []
|
||||
remappable_columns[result.table_name] << result.column_name
|
||||
DB.query(REMAP_SQL).each do |r|
|
||||
text_columns[r.table_name] << r.column_name
|
||||
end
|
||||
|
||||
exclude_tables = exclude_tables.map(&:to_s)
|
||||
text_columns.each do |table, columns|
|
||||
next if excluded_tables.include?(table)
|
||||
|
||||
remappable_columns.each do |table_name, column_names|
|
||||
next if exclude_tables.include?(table_name)
|
||||
set_clause = column_names.map do |column_name|
|
||||
"#{column_name} = REPLACE(#{column_name}, :from, :to)"
|
||||
set = columns.map do |column|
|
||||
"#{column} = REPLACE(#{column}, :from, :to)"
|
||||
end.join(", ")
|
||||
|
||||
where_clause = column_names.map do |column_name|
|
||||
"#{column_name} LIKE :like"
|
||||
where = columns.map do |column|
|
||||
"#{column} IS NOT NULL AND #{column} LIKE :like"
|
||||
end.join(" OR ")
|
||||
|
||||
DB.exec(<<~SQL, from: from, to: to, like: like)
|
||||
UPDATE #{table_name}
|
||||
SET #{set_clause}
|
||||
WHERE #{where_clause}
|
||||
UPDATE #{table}
|
||||
SET #{set}
|
||||
WHERE #{where}
|
||||
SQL
|
||||
end
|
||||
|
||||
SiteSetting.refresh!
|
||||
end
|
||||
|
||||
def self.find(needle, anchor_left = false, anchor_right = false)
|
||||
connection = ActiveRecord::Base.connection.raw_connection
|
||||
text_columns = connection.async_exec(REMAP_SQL).to_a
|
||||
args = ["#{anchor_left ? '' : "%"}#{needle}#{anchor_right ? '' : "%"}"]
|
||||
found = {}
|
||||
def self.regexp_replace(pattern, replacement, flags: "gi", match: "~*", excluded_tables: [])
|
||||
text_columns = Hash.new { |h, k| h[k] = [] }
|
||||
|
||||
text_columns.each do |rc|
|
||||
table_name = rc["table_name"]
|
||||
column_name = rc["column_name"]
|
||||
result = connection.async_exec("SELECT #{column_name} FROM #{table_name} WHERE #{column_name} LIKE $1", args) rescue nil
|
||||
if result&.ntuples > 0
|
||||
found["#{table_name}.#{column_name}"] = result.map { |r| r[column_name] }
|
||||
DB.query(REMAP_SQL).each do |r|
|
||||
text_columns[r.table_name] << r.column_name
|
||||
end
|
||||
|
||||
text_columns.each do |table, columns|
|
||||
next if excluded_tables.include?(table)
|
||||
|
||||
set = columns.map do |column|
|
||||
"#{column} = REGEXP_REPLACE(#{column}, :pattern, :replacement, :flags)"
|
||||
end.join(", ")
|
||||
|
||||
where = columns.map do |column|
|
||||
"#{column} IS NOT NULL AND #{column} #{match} :pattern"
|
||||
end.join(" OR ")
|
||||
|
||||
puts pattern, replacement, flags, match
|
||||
|
||||
DB.exec(<<~SQL, pattern: pattern, replacement: replacement, flags: flags, match: match)
|
||||
UPDATE #{table}
|
||||
SET #{set}
|
||||
WHERE #{where}
|
||||
SQL
|
||||
end
|
||||
|
||||
SiteSetting.refresh!
|
||||
end
|
||||
|
||||
def self.find(needle, anchor_left: false, anchor_right: false, excluded_tables: [])
|
||||
found = {}
|
||||
like = "#{anchor_left ? '' : "%"}#{needle}#{anchor_right ? '' : "%"}"
|
||||
|
||||
DB.query(REMAP_SQL).each do |r|
|
||||
next if excluded_tables.include?(r.table_name)
|
||||
|
||||
rows = DB.query(<<~SQL, like: like)
|
||||
SELECT #{r.column_name}
|
||||
FROM #{r.table_name}
|
||||
WHERE #{r.column_name} LIKE :like
|
||||
SQL
|
||||
|
||||
if rows.size > 0
|
||||
found["#{r.table_name}.#{r.column_name}"] = rows.map { |row| row.send(r.column_name) }
|
||||
end
|
||||
end
|
||||
|
||||
found
|
||||
end
|
||||
|
||||
|
|
|
@ -98,7 +98,7 @@ module FileStore
|
|||
|
||||
def get_path_for(type, id, sha, extension)
|
||||
depth = get_depth_for(id)
|
||||
tree = File.join(*sha[0, depth].split(""), "")
|
||||
tree = File.join(*sha[0, depth].chars, "")
|
||||
"#{type}/#{depth + 1}X/#{tree}#{sha}#{extension}"
|
||||
end
|
||||
|
||||
|
@ -107,8 +107,7 @@ module FileStore
|
|||
if upload.extension
|
||||
".#{upload.extension}"
|
||||
else
|
||||
# Maintain backward compatibility before Jobs::MigrateUploadExtensions
|
||||
# runs
|
||||
# Maintain backward compatibility before Jobs::MigrateUploadExtensions runs
|
||||
File.extname(upload.original_filename)
|
||||
end
|
||||
|
||||
|
|
|
@ -24,8 +24,7 @@ class S3Helper
|
|||
|
||||
def upload(file, path, options = {})
|
||||
path = get_path_for_s3_upload(path)
|
||||
obj = s3_bucket.object(path)
|
||||
obj.upload_file(file, options)
|
||||
s3_bucket.object(path).upload_file(file, options)
|
||||
path
|
||||
end
|
||||
|
||||
|
@ -93,7 +92,6 @@ class S3Helper
|
|||
end
|
||||
|
||||
def update_lifecycle(id, days, prefix: nil, tag: nil)
|
||||
|
||||
filter = {}
|
||||
|
||||
if prefix
|
||||
|
@ -171,14 +169,15 @@ class S3Helper
|
|||
end
|
||||
|
||||
def object(path)
|
||||
path = get_path_for_s3_upload(path)
|
||||
s3_bucket.object(path)
|
||||
s3_bucket.object(get_path_for_s3_upload(path))
|
||||
end
|
||||
|
||||
def self.s3_options(obj)
|
||||
opts = { region: obj.s3_region,
|
||||
endpoint: SiteSetting.s3_endpoint,
|
||||
force_path_style: SiteSetting.s3_force_path_style }
|
||||
opts = {
|
||||
region: obj.s3_region,
|
||||
endpoint: SiteSetting.s3_endpoint,
|
||||
force_path_style: SiteSetting.s3_force_path_style
|
||||
}
|
||||
|
||||
unless obj.s3_use_iam_profile
|
||||
opts[:access_key_id] = obj.s3_access_key_id
|
||||
|
|
|
@ -203,9 +203,6 @@ end
|
|||
################################################################################
|
||||
|
||||
task "uploads:migrate_to_s3" => :environment do
|
||||
require "file_store/s3_store"
|
||||
require "file_store/local_store"
|
||||
|
||||
ENV["RAILS_DB"] ? migrate_to_s3 : migrate_to_s3_all_sites
|
||||
end
|
||||
|
||||
|
@ -214,93 +211,180 @@ def migrate_to_s3_all_sites
|
|||
end
|
||||
|
||||
def migrate_to_s3
|
||||
# make sure s3 is enabled
|
||||
if !SiteSetting.Upload.enable_s3_uploads
|
||||
puts "You must enable s3 uploads before running that task"
|
||||
return
|
||||
end
|
||||
|
||||
db = RailsMultisite::ConnectionManagement.current_db
|
||||
|
||||
puts "Migrating uploads to S3 (#{SiteSetting.Upload.s3_upload_bucket}) for '#{db}'..."
|
||||
dry_run = !!ENV["DRY_RUN"]
|
||||
|
||||
# will throw an exception if the bucket is missing
|
||||
s3 = FileStore::S3Store.new
|
||||
local = FileStore::LocalStore.new
|
||||
puts "*" * 30 + " DRY RUN " + "*" * 30 if dry_run
|
||||
puts "Migrating uploads to S3 for '#{db}'..."
|
||||
|
||||
exclude_tables = %i{
|
||||
incoming_emails
|
||||
stylesheet_cache
|
||||
search_logs
|
||||
post_search_data
|
||||
notifications
|
||||
email_logs
|
||||
}
|
||||
if Upload.where("url NOT LIKE '//%' AND url NOT LIKE '/uploads/#{db}/original/_X/%'").exists?
|
||||
puts <<~TEXT
|
||||
Some uploads were not migrated to the new scheme. Please run these commands in the rails console
|
||||
|
||||
# Migrate all uploads
|
||||
file_uploads = Upload.where.not(sha1: nil).where("url NOT LIKE '#{s3.absolute_base_url}%'")
|
||||
image_uploads = file_uploads.where("lower(extension) NOT IN (?)", FileHelper.supported_images.to_a)
|
||||
SiteSetting.migrate_to_new_scheme = true
|
||||
Jobs::MigrateUploadScheme.new.execute(nil)
|
||||
TEXT
|
||||
exit 1
|
||||
end
|
||||
|
||||
[image_uploads, file_uploads].each do |uploads|
|
||||
uploads.find_in_batches(batch_size: 100) do |batch|
|
||||
batch.each do |upload|
|
||||
now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
|
||||
# remove invalid uploads
|
||||
if upload.url.blank?
|
||||
upload.destroy!
|
||||
next
|
||||
end
|
||||
# store the old url
|
||||
from = upload.url
|
||||
# retrieve the path to the local file
|
||||
path = local.path_for(upload)
|
||||
# make sure the file exists locally
|
||||
if !path || !File.exists?(path)
|
||||
puts "#{from} does not exist locally"
|
||||
next
|
||||
end
|
||||
unless GlobalSetting.use_s3?
|
||||
puts <<~TEXT
|
||||
Please provide the following environment variables
|
||||
- DISCOURSE_S3_BUCKET
|
||||
- DISCOURSE_S3_REGION
|
||||
- DISCOURSE_S3_ACCESS_KEY_ID
|
||||
- DISCOURSE_S3_SECRET_ACCESS_KEY
|
||||
TEXT
|
||||
exit 2
|
||||
end
|
||||
|
||||
begin
|
||||
file = File.open(path)
|
||||
content_type = `file --mime-type -b #{path}`.strip
|
||||
to = s3.store_upload(file, upload, content_type)
|
||||
rescue => e
|
||||
puts "Encountered an error while migrating #{upload.url}: #{e.class}: #{e.message}"
|
||||
next
|
||||
ensure
|
||||
file&.close
|
||||
end
|
||||
if SiteSetting.Upload.s3_cdn_url.blank?
|
||||
puts "Please provide the 'DISCOURSE_S3_CDN_URL' environment variable"
|
||||
exit 3
|
||||
end
|
||||
|
||||
# remap the URL
|
||||
DbHelper.remap(from, to, exclude_tables: exclude_tables)
|
||||
upload.optimized_images.destroy_all
|
||||
puts "Migrating #{from} --> #{to} took #{Process.clock_gettime(Process::CLOCK_MONOTONIC) - now} seconds"
|
||||
end
|
||||
s3 = Aws::S3::Client.new(
|
||||
region: GlobalSetting.s3_region,
|
||||
access_key_id: GlobalSetting.s3_access_key_id,
|
||||
secret_access_key: GlobalSetting.s3_secret_access_key,
|
||||
)
|
||||
|
||||
[
|
||||
Discourse.asset_host,
|
||||
Discourse.base_url_no_prefix
|
||||
].each do |from|
|
||||
now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
|
||||
from = "#{from}#{SiteSetting.Upload.s3_base_url}"
|
||||
to = SiteSetting.s3_cdn_url
|
||||
DbHelper.remap(from, to, exclude_tables: exclude_tables)
|
||||
puts "Remapping #{from} --> #{to} took #{Process.clock_gettime(Process::CLOCK_MONOTONIC) - now} seconds"
|
||||
begin
|
||||
s3.head_bucket(bucket: GlobalSetting.s3_bucket)
|
||||
rescue Aws::S3::Errors::NotFound
|
||||
puts "Bucket '#{GlobalSetting.s3_bucket}' not found. Creating it..."
|
||||
s3.create_bucket(bucket: GlobalSetting.s3_bucket) unless dry_run
|
||||
end
|
||||
|
||||
puts "Uploading files to S3..."
|
||||
print " - Listing local files"
|
||||
|
||||
local_files = []
|
||||
IO.popen("cd public && find uploads/#{db}/original -type f").each do |file|
|
||||
local_files << file.chomp
|
||||
putc "." if local_files.size % 1000 == 0
|
||||
end
|
||||
|
||||
puts " => #{local_files.size} files"
|
||||
print " - Listing S3 files"
|
||||
|
||||
s3_objects = []
|
||||
prefix = Rails.configuration.multisite ? "#{db}/original/" : "original/"
|
||||
options = { bucket: GlobalSetting.s3_bucket, prefix: prefix }
|
||||
|
||||
loop do
|
||||
response = s3.list_objects_v2(options)
|
||||
s3_objects.concat(response.contents)
|
||||
putc "."
|
||||
break if response.next_continuation_token.blank?
|
||||
options[:continuation_token] = response.next_continuation_token
|
||||
end
|
||||
|
||||
puts " => #{s3_objects.size} files"
|
||||
print " - Syncing files to S3"
|
||||
|
||||
synced = 0
|
||||
failed = []
|
||||
|
||||
local_files.each do |file|
|
||||
path = File.join("public", file)
|
||||
name = File.basename(path)
|
||||
etag = Digest::MD5.file(path).hexdigest
|
||||
|
||||
if s3_object = s3_objects.find { |obj| file.ends_with?(obj.key) }
|
||||
next if File.size(path) == s3_object.size && s3_object.etag[etag]
|
||||
end
|
||||
|
||||
options = {
|
||||
acl: "public-read",
|
||||
body: File.open(path, "rb"),
|
||||
bucket: GlobalSetting.s3_bucket,
|
||||
content_type: MiniMime.lookup_by_filename(name)&.content_type,
|
||||
key: file[file.index(prefix)..-1],
|
||||
}
|
||||
|
||||
if !FileHelper.is_supported_image?(name)
|
||||
options[:content_disposition] = %Q{attachment; filename="#{name}"}
|
||||
end
|
||||
|
||||
if dry_run
|
||||
puts "#{file} => #{options[:key]}"
|
||||
synced += 1
|
||||
elsif s3.put_object(options).etag[etag]
|
||||
putc "."
|
||||
synced += 1
|
||||
else
|
||||
putc "X"
|
||||
failed << path
|
||||
end
|
||||
end
|
||||
|
||||
puts
|
||||
|
||||
if failed.size > 0
|
||||
puts "Failed to upload #{failed.size} files"
|
||||
puts failed.join("\n")
|
||||
elsif s3_objects.size + synced >= local_files.size
|
||||
puts "Updating the URLs in the database..."
|
||||
|
||||
excluded_tables = %w{
|
||||
email_logs
|
||||
incoming_emails
|
||||
notifications
|
||||
post_search_data
|
||||
search_logs
|
||||
stylesheet_cache
|
||||
user_auth_token_logs
|
||||
user_auth_tokens
|
||||
web_hooks_events
|
||||
}
|
||||
|
||||
from = "/uploads/#{db}/original/(\\dX/(?:[a-f0-9]/)*[a-f0-9]{40}[a-z0-9\\.]*)"
|
||||
to = "#{SiteSetting.Upload.s3_base_url}/#{prefix}\\1"
|
||||
|
||||
if dry_run
|
||||
puts "REPLACING '#{from}' WITH '#{to}'"
|
||||
else
|
||||
DbHelper.regexp_replace(from, to, excluded_tables: excluded_tables)
|
||||
end
|
||||
|
||||
# Uploads that were on base hostname will now be on S3 CDN
|
||||
from = "#{Discourse.base_url}#{SiteSetting.Upload.s3_base_url}"
|
||||
to = SiteSetting.Upload.s3_cdn_url
|
||||
|
||||
if dry_run
|
||||
puts "REMAPPING '#{from}' TO '#{to}'"
|
||||
else
|
||||
DbHelper.remap(from, to, excluded_tables: excluded_tables)
|
||||
end
|
||||
|
||||
if Discourse.asset_host.present?
|
||||
# Uploads that were on local CDN will now be on S3 CDN
|
||||
from = "#{Discourse.asset_host}#{SiteSetting.Upload.s3_base_url}"
|
||||
to = SiteSetting.Upload.s3_cdn_url
|
||||
|
||||
if dry_run
|
||||
puts "REMAPPING '#{from}' TO '#{to}'"
|
||||
else
|
||||
DbHelper.remap(from, to, excluded_tables: excluded_tables)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
puts "Done!"
|
||||
end
|
||||
|
||||
################################################################################
|
||||
# clean_up #
|
||||
# clean_up #
|
||||
################################################################################
|
||||
|
||||
task "uploads:clean_up" => :environment do
|
||||
if ENV["RAILS_DB"]
|
||||
clean_up_uploads
|
||||
else
|
||||
RailsMultisite::ConnectionManagement.each_connection { clean_up_uploads }
|
||||
end
|
||||
ENV["RAILS_DB"] ? clean_up_uploads : clean_up_uploads_all_sites
|
||||
end
|
||||
|
||||
def clean_up_uploads_all_sites
|
||||
RailsMultisite::ConnectionManagement.each_connection { clean_up_uploads }
|
||||
end
|
||||
|
||||
def clean_up_uploads
|
||||
|
|
|
@ -36,7 +36,7 @@ RSpec.describe DbHelper do
|
|||
it 'allows tables to be excluded from scanning' do
|
||||
post = Fabricate(:post, cooked: "test")
|
||||
|
||||
DbHelper.remap("test", "something else", exclude_tables: %w{posts})
|
||||
DbHelper.remap("test", "something else", excluded_tables: %w{posts})
|
||||
|
||||
expect(post.reload.cooked).to eq('test')
|
||||
end
|
||||
|
|
Loading…
Reference in New Issue