diff --git a/migrations/.gitignore b/migrations/.gitignore
index e12bca7ffbf..376d1c8e92f 100644
--- a/migrations/.gitignore
+++ b/migrations/.gitignore
@@ -2,3 +2,5 @@
 tmp/*
 Gemfile.lock
+
+/config/process_uploads.yml
 
diff --git a/migrations/bin/process_uploads b/migrations/bin/process_uploads
index d79fbb4cb4f..3114a2ab715 100755
--- a/migrations/bin/process_uploads
+++ b/migrations/bin/process_uploads
@@ -1,688 +1,7 @@
 #!/usr/bin/env ruby
 # frozen_string_literal: true
 
-require "etc"
-require_relative "../lib/migrations"
+require_relative "../lib/uploads/cli"
 
-module Migrations
-  load_rails_environment
-
-  load_gemfiles("common")
-  configure_zeitwerk("lib/common")
-
-  class ProcessUploads
-    TRANSACTION_SIZE = 1000
-    QUEUE_SIZE = 1000
-
-    def initialize(settings_path)
-      @settings = YAML.load_file(settings_path, symbolize_names: true)
-      @settings[:path_replacements] ||= []
-
-      @root_paths = @settings[:root_paths]
-      @output_db = create_connection(@settings[:output_db_path])
-
-      initialize_output_db
-      configure_site_settings
-    end
-
-    def run
-      # disable logging for EXIFR which is used by ImageOptim
-      EXIFR.logger = Logger.new(nil)
-
-      if @settings[:fix_missing]
-        @source_db = create_connection(@settings[:output_db_path])
-
-        puts "Fixing missing uploads..."
-        fix_missing
-      else
-        @source_db = create_connection(@settings[:source_db_path])
-
-        puts "Uploading uploads..."
-        upload_files
-
-        puts "", "Creating optimized images..."
-        create_optimized_images if @settings[:create_optimized_images]
-      end
-      puts ""
-    ensure
-      close
-    end
-
-    def upload_files
-      queue = SizedQueue.new(QUEUE_SIZE)
-      consumer_threads = []
-
-      if @settings[:delete_missing_uploads]
-        puts "Deleting missing uploads from output database..."
-        @output_db.execute(<<~SQL)
-          DELETE FROM uploads
-          WHERE upload IS NULL
-        SQL
-      end
-
-      output_existing_ids = Set.new
-      query("SELECT id FROM uploads", @output_db).tap do |result_set|
-        result_set.each { |row| output_existing_ids << row["id"] }
-        result_set.close
-      end
-
-      source_existing_ids = Set.new
-      query("SELECT id FROM uploads", @source_db).tap do |result_set|
-        result_set.each { |row| source_existing_ids << row["id"] }
-        result_set.close
-      end
-
-      if (surplus_upload_ids = output_existing_ids - source_existing_ids).any?
-        if @settings[:delete_surplus_uploads]
-          puts "Deleting #{surplus_upload_ids.size} uploads from output database..."
-
-          surplus_upload_ids.each_slice(TRANSACTION_SIZE) do |ids|
-            placeholders = (["?"] * ids.size).join(",")
-            @output_db.execute(<<~SQL, ids)
-              DELETE FROM uploads
-              WHERE id IN (#{placeholders})
-            SQL
-          end
-
-          output_existing_ids -= surplus_upload_ids
-        else
-          puts "Found #{surplus_upload_ids.size} surplus uploads in output database. " \
-                 "Run with `delete_surplus_uploads: true` to delete them."
-        end
-
-        surplus_upload_ids = nil
-      end
-
-      max_count = (source_existing_ids - output_existing_ids).size
-      source_existing_ids = nil
-      puts "Found #{output_existing_ids.size} existing uploads. #{max_count} are missing."
-
-      producer_thread =
-        Thread.new do
-          query("SELECT * FROM uploads", @source_db).tap do |result_set|
-            result_set.each { |row| queue << row unless output_existing_ids.include?(row["id"]) }
-            result_set.close
-          end
-        end
-
-      status_queue = SizedQueue.new(QUEUE_SIZE)
-      status_thread =
-        Thread.new do
-          error_count = 0
-          skipped_count = 0
-          current_count = 0
-
-          while !(params = status_queue.pop).nil?
-            begin
-              if params.delete(:skipped) == true
-                skipped_count += 1
-              elsif (error_message = params.delete(:error)) || params[:upload].nil?
-                error_count += 1
-                puts "", "Failed to create upload: #{params[:id]} (#{error_message})", ""
-              end
-
-              @output_db.execute(<<~SQL, params)
-                INSERT INTO uploads (id, upload, markdown, skip_reason)
-                VALUES (:id, :upload, :markdown, :skip_reason)
-              SQL
-            rescue StandardError => e
-              puts "", "Failed to insert upload: #{params[:id]} (#{e.message}))", ""
-              error_count += 1
-            end
-
-            current_count += 1
-            error_count_text = error_count > 0 ? "#{error_count} errors".red : "0 errors"
-
-            print "\r%7d / %7d (%s, %d skipped)" %
-                    [current_count, max_count, error_count_text, skipped_count]
-          end
-        end
-
-      (Etc.nprocessors * @settings[:thread_count_factor]).to_i.times do |index|
-        consumer_threads << Thread.new do
-          Thread.current.name = "worker-#{index}"
-
-          store = Discourse.store
-
-          while (row = queue.pop)
-            begin
-              data_file = nil
-              path = nil
-
-              if row["data"].present?
-                data_file = Tempfile.new("discourse-upload", binmode: true)
-                data_file.write(row["data"])
-                data_file.rewind
-                path = data_file.path
-              else
-                relative_path = row["relative_path"]
-                file_exists = false
-
-                @root_paths.each do |root_path|
-                  path = File.join(root_path, relative_path, row["filename"])
-                  break if (file_exists = File.exist?(path))
-
-                  @settings[:path_replacements].each do |from, to|
-                    path = File.join(root_path, relative_path.sub(from, to), row["filename"])
-                    break if (file_exists = File.exist?(path))
-                  end
-                end
-
-                if !file_exists
-                  status_queue << {
-                    id: row["id"],
-                    upload: nil,
-                    skipped: true,
-                    skip_reason: "file not found",
-                  }
-                  next
-                end
-              end
-
-              retry_count = 0
-
-              loop do
-                error_message = nil
-                upload =
-                  copy_to_tempfile(path) do |file|
-                    begin
-                      UploadCreator.new(
-                        file,
-                        row["display_filename"] || row["filename"],
-                        type: row["type"],
-                      ).create_for(Discourse::SYSTEM_USER_ID)
-                    rescue StandardError => e
-                      error_message = e.message
-                      nil
-                    end
-                  end
-
-                if (upload_okay = upload.present? && upload.persisted? && upload.errors.blank?)
-                  upload_path = add_multisite_prefix(store.get_path_for_upload(upload))
-
-                  file_exists =
-                    if store.external?
-                      store.object_from_path(upload_path).exists?
-                    else
-                      File.exist?(File.join(store.public_dir, upload_path))
-                    end
-
-                  unless file_exists
-                    upload.destroy
-                    upload = nil
-                    upload_okay = false
-                  end
-                end
-
-                if upload_okay
-                  status_queue << {
-                    id: row["id"],
-                    upload: upload.attributes.to_json,
-                    markdown: UploadMarkdown.new(upload).to_markdown,
-                    skip_reason: nil,
-                  }
-                  break
-                elsif retry_count >= 3
-                  error_message ||= upload&.errors&.full_messages&.join(", ") || "unknown error"
-                  status_queue << {
-                    id: row["id"],
-                    upload: nil,
-                    markdown: nil,
-                    error: "too many retries: #{error_message}",
-                    skip_reason: "too many retries",
-                  }
-                  break
-                end
-
-                retry_count += 1
-                sleep 0.25 * retry_count
-              end
-            rescue StandardError => e
-              status_queue << {
-                id: row["id"],
-                upload: nil,
-                markdown: nil,
-                error: e.message,
-                skip_reason: "error",
-              }
-            ensure
-              data_file&.close!
-            end
-          end
-        end
-      end
-
-      producer_thread.join
-      queue.close
-      consumer_threads.each(&:join)
-      status_queue.close
-      status_thread.join
-    end
-
-    def fix_missing
-      queue = SizedQueue.new(QUEUE_SIZE)
-      consumer_threads = []
-
-      max_count =
-        @source_db.get_first_value("SELECT COUNT(*) FROM uploads WHERE upload IS NOT NULL")
-
-      producer_thread =
-        Thread.new do
-          query(
-            "SELECT id, upload FROM uploads WHERE upload IS NOT NULL ORDER BY rowid DESC",
-            @source_db,
-          ).tap do |result_set|
-            result_set.each { |row| queue << row }
-            result_set.close
-          end
-        end
-
-      status_queue = SizedQueue.new(QUEUE_SIZE)
-      status_thread =
-        Thread.new do
-          error_count = 0
-          current_count = 0
-          missing_count = 0
-
-          while !(result = status_queue.pop).nil?
-            current_count += 1
-
-            case result[:status]
-            when :ok
-              # ignore
-            when :error
-              error_count += 1
-              puts "Error in #{result[:id]}"
-            when :missing
-              missing_count += 1
-              puts "Missing #{result[:id]}"
-
-              @output_db.execute("DELETE FROM uploads WHERE id = ?", result[:id])
-              Upload.delete_by(id: result[:upload_id])
-            end
-
-            error_count_text = error_count > 0 ? "#{error_count} errors".red : "0 errors"
-
-            print "\r%7d / %7d (%s, %s missing)" %
-                    [current_count, max_count, error_count_text, missing_count]
-          end
-        end
-
-      store = Discourse.store
-
-      (Etc.nprocessors * @settings[:thread_count_factor] * 2).to_i.times do |index|
-        consumer_threads << Thread.new do
-          Thread.current.name = "worker-#{index}"
-          fake_upload = OpenStruct.new(url: "")
-          while (row = queue.pop)
-            begin
-              upload = JSON.parse(row["upload"])
-              fake_upload.url = upload["url"]
-              path = add_multisite_prefix(store.get_path_for_upload(fake_upload))
-
-              file_exists =
-                if store.external?
-                  store.object_from_path(path).exists?
-                else
-                  File.exist?(File.join(store.public_dir, path))
-                end
-
-              if file_exists
-                status_queue << { id: row["id"], upload_id: upload["id"], status: :ok }
-              else
-                status_queue << { id: row["id"], upload_id: upload["id"], status: :missing }
-              end
-            rescue StandardError => e
-              puts e.message
-              status_queue << { id: row["id"], upload_id: upload["id"], status: :error }
-            end
-          end
-        end
-      end
-
-      producer_thread.join
-      queue.close
-      consumer_threads.each(&:join)
-      status_queue.close
-      status_thread.join
-    end
-
-    def create_optimized_images
-      init_threads = []
-      optimized_upload_ids = Set.new
-      post_upload_ids = Set.new
-      avatar_upload_ids = Set.new
-      max_count = 0
-
-      # allow more than 1 thread to optimized images at the same time
-      OptimizedImage.lock_per_machine = false
-
-      init_threads << Thread.new do
-        query("SELECT id FROM optimized_images", @output_db).tap do |result_set|
-          result_set.each { |row| optimized_upload_ids << row["id"] }
-          result_set.close
-        end
-      end
-
-      init_threads << Thread.new do
-        sql = <<~SQL
-          SELECT upload_ids
-          FROM posts
-          WHERE upload_ids IS NOT NULL
-        SQL
-        query(sql, @source_db).tap do |result_set|
-          result_set.each do |row|
-            JSON.parse(row["upload_ids"]).each { |id| post_upload_ids << id }
-          end
-          result_set.close
-        end
-      end
-
-      init_threads << Thread.new do
-        sql = <<~SQL
-          SELECT avatar_upload_id
-          FROM users
-          WHERE avatar_upload_id IS NOT NULL
-        SQL
-        query(sql, @source_db).tap do |result_set|
-          result_set.each { |row| avatar_upload_ids << row["avatar_upload_id"] }
-          result_set.close
-        end
-      end
-
-      init_threads << Thread.new do
-        max_count =
-          @output_db.get_first_value("SELECT COUNT(*) FROM uploads WHERE upload IS NOT NULL")
-      end
-
-      init_threads.each(&:join)
-
-      status_queue = SizedQueue.new(QUEUE_SIZE)
-      status_thread =
-        Thread.new do
-          error_count = 0
-          current_count = 0
-          skipped_count = 0
-
-          while !(params = status_queue.pop).nil?
-            current_count += 1
-
-            case params.delete(:status)
-            when :ok
-              @output_db.execute(<<~SQL, params)
-                INSERT INTO optimized_images (id, optimized_images)
-                VALUES (:id, :optimized_images)
-              SQL
-            when :error
-              error_count += 1
-            when :skipped
-              skipped_count += 1
-            end
-
-            error_count_text = error_count > 0 ? "#{error_count} errors".red : "0 errors"
-
-            print "\r%7d / %7d (%s, %d skipped)" %
-                    [current_count, max_count, error_count_text, skipped_count]
-          end
-        end
-
-      queue = SizedQueue.new(QUEUE_SIZE)
-      consumer_threads = []
-
-      producer_thread =
-        Thread.new do
-          sql = <<~SQL
-            SELECT id AS upload_id, upload ->> 'sha1' AS upload_sha1, markdown
-            FROM uploads
-            WHERE upload IS NOT NULL
-            ORDER BY rowid
-          SQL
-
-          query(sql, @output_db).tap do |result_set|
-            result_set.each do |row|
-              upload_id = row["upload_id"]
-
-              if optimized_upload_ids.include?(upload_id) || !row["markdown"].start_with?("![")
-                status_queue << { id: row["upload_id"], status: :skipped }
-                next
-              end
-
-              if post_upload_ids.include?(upload_id)
-                row["type"] = "post"
-                queue << row
-              elsif avatar_upload_ids.include?(upload_id)
-                row["type"] = "avatar"
-                queue << row
-              else
-                status_queue << { id: row["upload_id"], status: :skipped }
-              end
-            end
-            result_set.close
-          end
-        end
-
-      avatar_sizes = Discourse.avatar_sizes
-      store = Discourse.store
-      remote_factor = store.external? ? 2 : 1
-
-      Jobs.run_immediately!
-
-      (Etc.nprocessors * @settings[:thread_count_factor] * remote_factor).to_i.times do |index|
-        consumer_threads << Thread.new do
-          Thread.current.name = "worker-#{index}"
-
-          post =
-            PostCreator.new(
-              Discourse.system_user,
-              raw: "Topic created by uploads_importer",
-              acting_user: Discourse.system_user,
-              skip_validations: true,
-              title: "Topic created by uploads_importer - #{SecureRandom.hex}",
-              archetype: Archetype.default,
-              category: Category.last.id,
-            ).create!
-
-          while (row = queue.pop)
-            retry_count = 0
-
-            loop do
-              upload = Upload.find_by(sha1: row["upload_sha1"])
-
-              optimized_images =
-                begin
-                  case row["type"]
-                  when "post"
-                    post.update_columns(baked_at: nil, cooked: "", raw: row["markdown"])
-                    post.reload
-                    post.rebake!
-                    OptimizedImage.where(upload_id: upload.id).to_a
-                  when "avatar"
-                    avatar_sizes.map { |size| OptimizedImage.create_for(upload, size, size) }
-                  end
-                rescue StandardError => e
-                  puts e.message
-                  puts e.stacktrace
-                  nil
-                end
-
-              begin
-                if optimized_images.present?
-                  optimized_images.map! do |optimized_image|
-                    next unless optimized_image.present?
-                    optimized_image_path =
-                      add_multisite_prefix(store.get_path_for_optimized_image(optimized_image))
-
-                    file_exists =
-                      if store.external?
-                        store.object_from_path(optimized_image_path).exists?
-                      else
-                        File.exist?(File.join(store.public_dir, optimized_image_path))
-                      end
-
-                    unless file_exists
-                      optimized_image.destroy
-                      optimized_image = nil
-                    end
-
-                    optimized_image
-                  end
-                end
-              rescue StandardError
-                optimized_images = nil
-              end
-
-              optimized_images_okay =
-                !optimized_images.nil? && optimized_images.all?(&:present?) &&
-                  optimized_images.all?(&:persisted?) &&
-                  optimized_images.all? { |o| o.errors.blank? }
-
-              if optimized_images_okay
-                status_queue << {
-                  id: row["upload_id"],
-                  optimized_images: optimized_images.presence&.to_json,
-                  status: :ok,
-                }
-                break
-              elsif retry_count >= 3
-                status_queue << { id: row["upload_id"], status: :error }
-                break
-              end
-
-              retry_count += 1
-              sleep 0.25 * retry_count
-            end
-          end
-        end
-      end
-
-      producer_thread.join
-      queue.close
-      consumer_threads.each(&:join)
-      status_queue.close
-      status_thread.join
-    end
-
-    private
-
-    def create_connection(path)
-      ::Migrations::IntermediateDatabase.create_connection(path: path)
-    end
-
-    def query(sql, db)
-      db.prepare(sql).execute
-    end
-
-    def initialize_output_db
-      @statement_counter = 0
-
-      @output_db.execute(<<~SQL)
-        CREATE TABLE IF NOT EXISTS uploads (
-          id TEXT PRIMARY KEY NOT NULL,
-          upload JSON_TEXT,
-          markdown TEXT,
-          skip_reason TEXT
-        )
-      SQL
-
-      @output_db.execute(<<~SQL)
-        CREATE TABLE IF NOT EXISTS optimized_images (
-          id TEXT PRIMARY KEY NOT NULL,
-          optimized_images JSON_TEXT
-        )
-      SQL
-    end
-
-    def insert(sql, bind_vars = [])
-      @output_db.transaction if @statement_counter == 0
-      @output_db.execute(sql, bind_vars)
-
-      if (@statement_counter += 1) > TRANSACTION_SIZE
-        @output_db.commit
-        @statement_counter = 0
-      end
-    end
-
-    def close
-      @source_db.close if @source_db
-
-      if @output_db
-        @output_db.commit if @output_db.transaction_active?
-        @output_db.close
-      end
-    end
-
-    def copy_to_tempfile(source_path)
-      extension = File.extname(source_path)
-
-      Tempfile.open(["discourse-upload", extension]) do |tmpfile|
-        File.open(source_path, "rb") { |source_stream| IO.copy_stream(source_stream, tmpfile) }
-        tmpfile.rewind
-        yield(tmpfile)
-      end
-    end
-
-    def configure_site_settings
-      settings = @settings[:site_settings]
-
-      SiteSetting.clean_up_uploads = false
-      SiteSetting.authorized_extensions = settings[:authorized_extensions]
-      SiteSetting.max_attachment_size_kb = settings[:max_attachment_size_kb]
-      SiteSetting.max_image_size_kb = settings[:max_image_size_kb]
-
-      if settings[:multisite]
-        # rubocop:disable Discourse/NoDirectMultisiteManipulation
-        Rails.configuration.multisite = true
-        # rubocop:enable Discourse/NoDirectMultisiteManipulation
-
-        RailsMultisite::ConnectionManagement.class_eval do
-          def self.current_db_override=(value)
-            @current_db_override = value
-          end
-          def self.current_db
-            @current_db_override
-          end
-        end
-        RailsMultisite::ConnectionManagement.current_db_override = settings[:multisite_db_name]
-      end
-
-      if settings[:enable_s3_uploads]
-        SiteSetting.s3_access_key_id = settings[:s3_access_key_id]
-        SiteSetting.s3_secret_access_key = settings[:s3_secret_access_key]
-        SiteSetting.s3_upload_bucket = settings[:s3_upload_bucket]
-        SiteSetting.s3_region = settings[:s3_region]
-        SiteSetting.s3_cdn_url = settings[:s3_cdn_url]
-        SiteSetting.enable_s3_uploads = true
-
-        raise "Failed to enable S3 uploads" if SiteSetting.enable_s3_uploads != true
-
-        Tempfile.open("discourse-s3-test") do |tmpfile|
-          tmpfile.write("test")
-          tmpfile.rewind
-
-          upload =
-            UploadCreator.new(tmpfile, "discourse-s3-test.txt").create_for(
-              Discourse::SYSTEM_USER_ID,
-            )
-
-          unless upload.present? && upload.persisted? && upload.errors.blank? &&
-                   upload.url.start_with?("//")
-            raise "Failed to upload to S3"
-          end
-
-          upload.destroy
-        end
-      end
-    end
-
-    def add_multisite_prefix(path)
-      if Rails.configuration.multisite
-        File.join("uploads", RailsMultisite::ConnectionManagement.current_db, path)
-      else
-        path
-      end
-    end
-  end
-end
-
-# ./migrations/bin/process_uploads migrations/config/process_uploads.yml
-Migrations::ProcessUploads.new(ARGV.first).run
+# ./migrations/bin/process_uploads [--settings=migrations/config/process_uploads.yml]
+Migrations::Uploads::CLI.start(ARGV)
diff --git a/migrations/config/process_uploads.yml b/migrations/config/process_uploads.yml.sample
similarity index 100%
rename from migrations/config/process_uploads.yml
rename to migrations/config/process_uploads.yml.sample
diff --git a/migrations/lib/uploads/base.rb b/migrations/lib/uploads/base.rb
new file mode 100644
index 00000000000..480ada8b933
--- /dev/null
+++ b/migrations/lib/uploads/base.rb
@@ -0,0 +1,47 @@
+# frozen_string_literal: true
+
+require "etc"
+require "sqlite3"
+require "tempfile"
+
+module Migrations
+  module Uploads
+    class Base
+      TRANSACTION_SIZE = 1000
+      QUEUE_SIZE = 1000
+
+      # TODO: Use IntermediateDatabase instead
+      def create_connection(path)
+        sqlite = SQLite3::Database.new(path, results_as_hash: true)
+        sqlite.busy_timeout = 60_000 # 60 seconds
+        sqlite.journal_mode = "WAL"
+        sqlite.synchronous = "off"
+        sqlite
+      end
+
+      def query(sql, db)
+        db.prepare(sql).execute
+      end
+
+      # helpers shared by Fixer, Uploader and Optimizer; carried over from
+      # the old process_uploads script
+      def add_multisite_prefix(path)
+        if Rails.configuration.multisite
+          File.join("uploads", RailsMultisite::ConnectionManagement.current_db, path)
+        else
+          path
+        end
+      end
+
+      def copy_to_tempfile(source_path)
+        extension = File.extname(source_path)
+
+        Tempfile.open(["discourse-upload", extension]) do |tmpfile|
+          File.open(source_path, "rb") { |source_stream| IO.copy_stream(source_stream, tmpfile) }
+          tmpfile.rewind
+          yield(tmpfile)
+        end
+      end
+    end
+  end
+end
diff --git a/migrations/lib/uploads/cli.rb b/migrations/lib/uploads/cli.rb
new file mode 100644
index 00000000000..90024cc500b
--- /dev/null
+++ b/migrations/lib/uploads/cli.rb
@@ -0,0 +1,68 @@
+# frozen_string_literal: true
+
+require_relative "../migrations"
+require_relative "./settings"
+require_relative "./fixer"
+require_relative "./uploader"
+require_relative "./optimizer"
+
+module Migrations
+  load_rails_environment
+
+  load_gemfiles("common")
+  configure_zeitwerk("lib/common")
+
+  module Uploads
+    class CLI < Thor
+      default_task :execute
+
+      class_option :settings,
+                   type: :string,
+                   aliases: "-s",
+                   default: "./migrations/config/process_uploads.yml",
+                   banner: "SETTINGS_FILE",
+                   desc: "Upload settings file"
+
+      def initialize(*args)
+        super
+
+        # disable logging for EXIFR which is used by ImageOptim
+        EXIFR.logger = Logger.new(nil)
+        @settings = Settings.from_file(options[:settings])
+      end
+
+      def self.exit_on_failure?
+        true
+      end
+
+      desc "execute [--settings=SETTINGS_FILE]", "Process uploads"
+      def execute
+        return run_fixer! if @settings[:fix_missing]
+
+        Uploader.run!(@settings)
+
+        run_optimizer! if @settings[:create_optimized_images]
+      end
+
+      desc "fix-missing [--settings=SETTINGS_FILE]", "Fix missing uploads"
+      def fix_missing
+        run_fixer!
+      end
+
+      desc "optimize [--settings=SETTINGS_FILE]", "Optimize uploads"
+      def optimize
+        run_optimizer!
+      end
+
+      private
+
+      def run_fixer!
+        Fixer.run!(@settings)
+      end
+
+      def run_optimizer!
+        Optimizer.run!(@settings)
+      end
+    end
+  end
+end
diff --git a/migrations/lib/uploads/fixer.rb b/migrations/lib/uploads/fixer.rb
new file mode 100644
index 00000000000..fce8ac36036
--- /dev/null
+++ b/migrations/lib/uploads/fixer.rb
@@ -0,0 +1,111 @@
+# frozen_string_literal: true
+
+require_relative "./base"
+
+module Migrations
+  module Uploads
+    class Fixer < Base
+      def initialize(settings)
+        @settings = settings
+
+        # the fixer re-checks rows that were already written to the output database
+        @source_db = create_connection(settings[:output_db_path])
+        @output_db = settings.output_db
+      end
+
+      def self.run!(settings)
+        puts "Fixing missing uploads..."
+
+        new(settings).run!
+      end
+
+      def run!
+        queue = SizedQueue.new(QUEUE_SIZE)
+        consumer_threads = []
+
+        max_count =
+          @source_db.get_first_value("SELECT COUNT(*) FROM uploads WHERE upload IS NOT NULL")
+
+        producer_thread =
+          Thread.new do
+            query(
+              "SELECT id, upload FROM uploads WHERE upload IS NOT NULL ORDER BY rowid DESC",
+              @source_db,
+            ).tap do |result_set|
+              result_set.each { |row| queue << row }
+              result_set.close
+            end
+          end
+
+        status_queue = SizedQueue.new(QUEUE_SIZE)
+        status_thread =
+          Thread.new do
+            error_count = 0
+            current_count = 0
+            missing_count = 0
+
+            while !(result = status_queue.pop).nil?
+              current_count += 1
+
+              case result[:status]
+              when :ok
+                # ignore
+              when :error
+                error_count += 1
+                puts "Error in #{result[:id]}"
+              when :missing
+                missing_count += 1
+                puts "Missing #{result[:id]}"
+
+                @output_db.execute("DELETE FROM uploads WHERE id = ?", result[:id])
+                Upload.delete_by(id: result[:upload_id])
+              end
+
+              error_count_text = error_count > 0 ? "#{error_count} errors".red : "0 errors"
+
+              print "\r%7d / %7d (%s, %s missing)" %
+                      [current_count, max_count, error_count_text, missing_count]
+            end
+          end
+
+        store = Discourse.store
+
+        (Etc.nprocessors * @settings[:thread_count_factor] * 2).to_i.times do |index|
+          consumer_threads << Thread.new do
+            Thread.current.name = "worker-#{index}"
+            fake_upload = OpenStruct.new(url: "")
+            while (row = queue.pop)
+              begin
+                upload = JSON.parse(row["upload"])
+                fake_upload.url = upload["url"]
+                path = add_multisite_prefix(store.get_path_for_upload(fake_upload))
+
+                file_exists =
+                  if store.external?
+                    store.object_from_path(path).exists?
+                  else
+                    File.exist?(File.join(store.public_dir, path))
+                  end
+
+                if file_exists
+                  status_queue << { id: row["id"], upload_id: upload["id"], status: :ok }
+                else
+                  status_queue << { id: row["id"], upload_id: upload["id"], status: :missing }
+                end
+              rescue StandardError => e
+                puts e.message
+                status_queue << { id: row["id"], upload_id: upload["id"], status: :error }
+              end
+            end
+          end
+        end
+
+        producer_thread.join
+        queue.close
+        consumer_threads.each(&:join)
+        status_queue.close
+        status_thread.join
+      end
+    end
+  end
+end
diff --git a/migrations/lib/uploads/optimizer.rb b/migrations/lib/uploads/optimizer.rb
new file mode 100644
index 00000000000..7b25db44b76
--- /dev/null
+++ b/migrations/lib/uploads/optimizer.rb
@@ -0,0 +1,237 @@
+# frozen_string_literal: true
+
+require_relative "./base"
+
+module Migrations
+  module Uploads
+    class Optimizer < Base
+      def initialize(settings)
+        @settings = settings
+
+        @source_db = create_connection(@settings[:source_db_path])
+        @output_db = settings.output_db
+      end
+
+      def self.run!(settings)
+        puts "Creating optimized images..."
+
+        new(settings).run!
+      end
+
+      def run!
+        init_threads = []
+        optimized_upload_ids = Set.new
+        post_upload_ids = Set.new
+        avatar_upload_ids = Set.new
+        max_count = 0
+
+        # allow more than one thread to optimize images at the same time
+        OptimizedImage.lock_per_machine = false
+
+        init_threads << Thread.new do
+          query("SELECT id FROM optimized_images", @output_db).tap do |result_set|
+            result_set.each { |row| optimized_upload_ids << row["id"] }
+            result_set.close
+          end
+        end
+
+        init_threads << Thread.new do
+          sql = <<~SQL
+            SELECT upload_ids
+            FROM posts
+            WHERE upload_ids IS NOT NULL
+          SQL
+          query(sql, @source_db).tap do |result_set|
+            result_set.each do |row|
+              JSON.parse(row["upload_ids"]).each { |id| post_upload_ids << id }
+            end
+            result_set.close
+          end
+        end
+
+        init_threads << Thread.new do
+          sql = <<~SQL
+            SELECT avatar_upload_id
+            FROM users
+            WHERE avatar_upload_id IS NOT NULL
+          SQL
+          query(sql, @source_db).tap do |result_set|
+            result_set.each { |row| avatar_upload_ids << row["avatar_upload_id"] }
+            result_set.close
+          end
+        end
+
+        init_threads << Thread.new do
+          max_count =
+            @output_db.get_first_value("SELECT COUNT(*) FROM uploads WHERE upload IS NOT NULL")
+        end
+
+        init_threads.each(&:join)
+
+        status_queue = SizedQueue.new(QUEUE_SIZE)
+        status_thread =
+          Thread.new do
+            error_count = 0
+            current_count = 0
+            skipped_count = 0
+
+            while !(params = status_queue.pop).nil?
+              current_count += 1
+
+              case params.delete(:status)
+              when :ok
+                @output_db.execute(<<~SQL, params)
+                  INSERT INTO optimized_images (id, optimized_images)
+                  VALUES (:id, :optimized_images)
+                SQL
+              when :error
+                error_count += 1
+              when :skipped
+                skipped_count += 1
+              end
+
+              error_count_text = error_count > 0 ? "#{error_count} errors".red : "0 errors"
+
+              print "\r%7d / %7d (%s, %d skipped)" %
+                      [current_count, max_count, error_count_text, skipped_count]
+            end
+          end
+
+        queue = SizedQueue.new(QUEUE_SIZE)
+        consumer_threads = []
+
+        producer_thread =
+          Thread.new do
+            sql = <<~SQL
+              SELECT id AS upload_id, upload ->> 'sha1' AS upload_sha1, markdown
+              FROM uploads
+              WHERE upload IS NOT NULL
+              ORDER BY rowid
+            SQL
+
+            query(sql, @output_db).tap do |result_set|
+              result_set.each do |row|
+                upload_id = row["upload_id"]
+
+                if optimized_upload_ids.include?(upload_id) || !row["markdown"].start_with?("![")
+                  status_queue << { id: row["upload_id"], status: :skipped }
+                  next
+                end
+
+                if post_upload_ids.include?(upload_id)
+                  row["type"] = "post"
+                  queue << row
+                elsif avatar_upload_ids.include?(upload_id)
+                  row["type"] = "avatar"
+                  queue << row
+                else
+                  status_queue << { id: row["upload_id"], status: :skipped }
+                end
+              end
+              result_set.close
+            end
+          end
+
+        avatar_sizes = Discourse.avatar_sizes
+        store = Discourse.store
+        remote_factor = store.external? ? 2 : 1
+
+        Jobs.run_immediately!
+
+        (Etc.nprocessors * @settings[:thread_count_factor] * remote_factor).to_i.times do |index|
+          consumer_threads << Thread.new do
+            Thread.current.name = "worker-#{index}"
+
+            post =
+              PostCreator.new(
+                Discourse.system_user,
+                raw: "Topic created by uploads_importer",
+                acting_user: Discourse.system_user,
+                skip_validations: true,
+                title: "Topic created by uploads_importer - #{SecureRandom.hex}",
+                archetype: Archetype.default,
+                category: Category.last.id,
+              ).create!
+
+            while (row = queue.pop)
+              retry_count = 0
+
+              loop do
+                upload = Upload.find_by(sha1: row["upload_sha1"])
+
+                optimized_images =
+                  begin
+                    case row["type"]
+                    when "post"
+                      post.update_columns(baked_at: nil, cooked: "", raw: row["markdown"])
+                      post.reload
+                      post.rebake!
+                      OptimizedImage.where(upload_id: upload.id).to_a
+                    when "avatar"
+                      avatar_sizes.map { |size| OptimizedImage.create_for(upload, size, size) }
+                    end
+                  rescue StandardError => e
+                    puts e.message
+                    puts e.backtrace
+                    nil
+                  end
+
+                begin
+                  if optimized_images.present?
+                    optimized_images.map! do |optimized_image|
+                      next unless optimized_image.present?
+                      optimized_image_path =
+                        add_multisite_prefix(store.get_path_for_optimized_image(optimized_image))
+
+                      file_exists =
+                        if store.external?
+                          store.object_from_path(optimized_image_path).exists?
+                        else
+                          File.exist?(File.join(store.public_dir, optimized_image_path))
+                        end
+
+                      unless file_exists
+                        optimized_image.destroy
+                        optimized_image = nil
+                      end
+
+                      optimized_image
+                    end
+                  end
+                rescue StandardError
+                  optimized_images = nil
+                end
+
+                optimized_images_okay =
+                  !optimized_images.nil? && optimized_images.all?(&:present?) &&
+                    optimized_images.all?(&:persisted?) &&
+                    optimized_images.all? { |o| o.errors.blank? }
+
+                if optimized_images_okay
+                  status_queue << {
+                    id: row["upload_id"],
+                    optimized_images: optimized_images.presence&.to_json,
+                    status: :ok,
+                  }
+                  break
+                elsif retry_count >= 3
+                  status_queue << { id: row["upload_id"], status: :error }
+                  break
+                end
+
+                retry_count += 1
+                sleep 0.25 * retry_count
+              end
+            end
+          end
+        end
+
+        producer_thread.join
+        queue.close
+        consumer_threads.each(&:join)
+        status_queue.close
+        status_thread.join
+      end
+    end
+  end
end
diff --git a/migrations/lib/uploads/settings.rb b/migrations/lib/uploads/settings.rb
new file mode 100644
index 00000000000..1e028a3c1f8
--- /dev/null
+++ b/migrations/lib/uploads/settings.rb
@@ -0,0 +1,114 @@
+# frozen_string_literal: true
+
+require "sqlite3"
+
+module Migrations
+  module Uploads
+    class Settings
+      attr_reader :output_db
+
+      def initialize(options)
+        options[:path_replacements] ||= []
+
+        @root_paths = options[:root_paths]
+        @output_db = create_connection(options[:output_db_path])
+        @options = options
+
+        initialize_output_db
+        configure_site_settings
+      end
+
+      def self.from_file(path)
+        new(YAML.load_file(path, symbolize_names: true))
+      end
+
+      # TODO: compare against dynamically defining getter methods for
+      # each top-level setting
+      def [](key)
+        @options[key]
+      end
+
+      private
+
+      # TODO: Use IntermediateDatabase instead
+      def create_connection(path)
+        sqlite = SQLite3::Database.new(path, results_as_hash: true)
+        sqlite.busy_timeout = 60_000 # 60 seconds
+        sqlite.journal_mode = "WAL"
+        sqlite.synchronous = "off"
+        sqlite
+      end
+
+      def initialize_output_db
+        @output_db.execute(<<~SQL)
+          CREATE TABLE IF NOT EXISTS uploads (
+            id TEXT PRIMARY KEY NOT NULL,
+            upload JSON_TEXT,
+            markdown TEXT,
+            skip_reason TEXT
+          )
+        SQL
+
+        @output_db.execute(<<~SQL)
+          CREATE TABLE IF NOT EXISTS optimized_images (
+            id TEXT PRIMARY KEY NOT NULL,
+            optimized_images JSON_TEXT
+          )
+        SQL
+      end
+
+      def configure_site_settings
+        settings = @options[:site_settings]
+
+        SiteSetting.clean_up_uploads = false
+        SiteSetting.authorized_extensions = settings[:authorized_extensions]
+        SiteSetting.max_attachment_size_kb = settings[:max_attachment_size_kb]
+        SiteSetting.max_image_size_kb = settings[:max_image_size_kb]
+
+        if settings[:multisite]
+          # rubocop:disable Discourse/NoDirectMultisiteManipulation
+          Rails.configuration.multisite = true
+          # rubocop:enable Discourse/NoDirectMultisiteManipulation
+
+          RailsMultisite::ConnectionManagement.class_eval do
+            def self.current_db_override=(value)
+              @current_db_override = value
+            end
+            def self.current_db
+              @current_db_override
+            end
+          end
+          RailsMultisite::ConnectionManagement.current_db_override = settings[:multisite_db_name]
+        end
+
+        if settings[:enable_s3_uploads]
+          SiteSetting.s3_access_key_id = settings[:s3_access_key_id]
+          SiteSetting.s3_secret_access_key = settings[:s3_secret_access_key]
+          SiteSetting.s3_upload_bucket = settings[:s3_upload_bucket]
+          SiteSetting.s3_region = settings[:s3_region]
+          SiteSetting.s3_cdn_url = settings[:s3_cdn_url]
+          SiteSetting.enable_s3_uploads = true
+
+          raise "Failed to enable S3 uploads" if SiteSetting.enable_s3_uploads != true
+
+          Tempfile.open("discourse-s3-test") do |tmpfile|
+            tmpfile.write("test")
+            tmpfile.rewind
+
+            upload =
+              UploadCreator.new(tmpfile, "discourse-s3-test.txt").create_for(
+                Discourse::SYSTEM_USER_ID,
+              )
+
+            unless upload.present? && upload.persisted? && upload.errors.blank? &&
+                     upload.url.start_with?("//")
+              raise "Failed to upload to S3"
+            end
+
+            upload.destroy
+          end
+        end
+      end
+    end
+  end
+end
diff --git a/migrations/lib/uploads/uploader.rb b/migrations/lib/uploads/uploader.rb
new file mode 100644
index 00000000000..b7db3f3dca4
--- /dev/null
+++ b/migrations/lib/uploads/uploader.rb
@@ -0,0 +1,234 @@
+# frozen_string_literal: true
+
+require_relative "./base"
+
+module Migrations
+  module Uploads
+    class Uploader < Base
+      def initialize(settings)
+        @settings = settings
+
+        @root_paths = settings[:root_paths]
+        @source_db = create_connection(@settings[:source_db_path])
+        @output_db = settings.output_db
+      end
+
+      def self.run!(settings)
+        puts "Uploading uploads..."
+
+        new(settings).run!
+      end
+
+      def run!
+        queue = SizedQueue.new(QUEUE_SIZE)
+        consumer_threads = []
+
+        if @settings[:delete_missing_uploads]
+          puts "Deleting missing uploads from output database..."
+          @output_db.execute(<<~SQL)
+            DELETE FROM uploads
+            WHERE upload IS NULL
+          SQL
+        end
+
+        output_existing_ids = Set.new
+        query("SELECT id FROM uploads", @output_db).tap do |result_set|
+          result_set.each { |row| output_existing_ids << row["id"] }
+          result_set.close
+        end
+
+        source_existing_ids = Set.new
+        query("SELECT id FROM uploads", @source_db).tap do |result_set|
+          result_set.each { |row| source_existing_ids << row["id"] }
+          result_set.close
+        end
+
+        if (surplus_upload_ids = output_existing_ids - source_existing_ids).any?
+          if @settings[:delete_surplus_uploads]
+            puts "Deleting #{surplus_upload_ids.size} uploads from output database..."
+
+            surplus_upload_ids.each_slice(TRANSACTION_SIZE) do |ids|
+              placeholders = (["?"] * ids.size).join(",")
+              @output_db.execute(<<~SQL, ids)
+                DELETE FROM uploads
+                WHERE id IN (#{placeholders})
+              SQL
+            end
+
+            output_existing_ids -= surplus_upload_ids
+          else
+            puts "Found #{surplus_upload_ids.size} surplus uploads in output database. " \
+                   "Run with `delete_surplus_uploads: true` to delete them."
+          end
+
+          surplus_upload_ids = nil
+        end
+
+        max_count = (source_existing_ids - output_existing_ids).size
+        source_existing_ids = nil
+        puts "Found #{output_existing_ids.size} existing uploads. #{max_count} are missing."
+
+        producer_thread =
+          Thread.new do
+            query("SELECT * FROM uploads", @source_db).tap do |result_set|
+              result_set.each { |row| queue << row unless output_existing_ids.include?(row["id"]) }
+              result_set.close
+            end
+          end
+
+        status_queue = SizedQueue.new(QUEUE_SIZE)
+        status_thread =
+          Thread.new do
+            error_count = 0
+            skipped_count = 0
+            current_count = 0
+
+            while !(params = status_queue.pop).nil?
+              begin
+                if params.delete(:skipped) == true
+                  skipped_count += 1
+                elsif (error_message = params.delete(:error)) || params[:upload].nil?
+                  error_count += 1
+                  puts "", "Failed to create upload: #{params[:id]} (#{error_message})", ""
+                end
+
+                @output_db.execute(<<~SQL, params)
+                  INSERT INTO uploads (id, upload, markdown, skip_reason)
+                  VALUES (:id, :upload, :markdown, :skip_reason)
+                SQL
+              rescue StandardError => e
+                puts "", "Failed to insert upload: #{params[:id]} (#{e.message})", ""
+                error_count += 1
+              end
+
+              current_count += 1
+              error_count_text = error_count > 0 ? "#{error_count} errors".red : "0 errors"
+
+              print "\r%7d / %7d (%s, %d skipped)" %
+                      [current_count, max_count, error_count_text, skipped_count]
+            end
+          end
+
+        (Etc.nprocessors * @settings[:thread_count_factor]).to_i.times do |index|
+          consumer_threads << Thread.new do
+            Thread.current.name = "worker-#{index}"
+
+            store = Discourse.store
+
+            while (row = queue.pop)
+              begin
+                data_file = nil
+                path = nil
+
+                if row["data"].present?
+                  data_file = Tempfile.new("discourse-upload", binmode: true)
+                  data_file.write(row["data"])
+                  data_file.rewind
+                  path = data_file.path
+                else
+                  relative_path = row["relative_path"]
+                  file_exists = false
+
+                  @root_paths.each do |root_path|
+                    path = File.join(root_path, relative_path, row["filename"])
+                    break if (file_exists = File.exist?(path))
+
+                    @settings[:path_replacements].each do |from, to|
+                      path = File.join(root_path, relative_path.sub(from, to), row["filename"])
+                      break if (file_exists = File.exist?(path))
+                    end
+                  end
+
+                  if !file_exists
+                    status_queue << {
+                      id: row["id"],
+                      upload: nil,
+                      skipped: true,
+                      skip_reason: "file not found",
+                    }
+                    next
+                  end
+                end
+
+                retry_count = 0
+
+                loop do
+                  error_message = nil
+                  upload =
+                    copy_to_tempfile(path) do |file|
+                      begin
+                        UploadCreator.new(
+                          file,
+                          row["display_filename"] || row["filename"],
+                          type: row["type"],
+                        ).create_for(Discourse::SYSTEM_USER_ID)
+                      rescue StandardError => e
+                        error_message = e.message
+                        nil
+                      end
+                    end
+
+                  if (upload_okay = upload.present? && upload.persisted? && upload.errors.blank?)
+                    upload_path = add_multisite_prefix(store.get_path_for_upload(upload))
+
+                    file_exists =
+                      if store.external?
+                        store.object_from_path(upload_path).exists?
+                      else
+                        File.exist?(File.join(store.public_dir, upload_path))
+                      end
+
+                    unless file_exists
+                      upload.destroy
+                      upload = nil
+                      upload_okay = false
+                    end
+                  end
+
+                  if upload_okay
+                    status_queue << {
+                      id: row["id"],
+                      upload: upload.attributes.to_json,
+                      markdown: UploadMarkdown.new(upload).to_markdown,
+                      skip_reason: nil,
+                    }
+                    break
+                  elsif retry_count >= 3
+                    error_message ||= upload&.errors&.full_messages&.join(", ") || "unknown error"
+                    status_queue << {
+                      id: row["id"],
+                      upload: nil,
+                      markdown: nil,
+                      error: "too many retries: #{error_message}",
+                      skip_reason: "too many retries",
+                    }
+                    break
+                  end
+
+                  retry_count += 1
+                  sleep 0.25 * retry_count
+                end
+              rescue StandardError => e
+                status_queue << {
+                  id: row["id"],
+                  upload: nil,
+                  markdown: nil,
+                  error: e.message,
+                  skip_reason: "error",
+                }
+              ensure
+                data_file&.close!
+              end
+            end
+          end
+        end
+
+        producer_thread.join
+        queue.close
+        consumer_threads.each(&:join)
+        status_queue.close
+        status_thread.join
+      end
+    end
+  end
+end
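Note on configuration: after this change only `process_uploads.yml.sample` stays tracked; the real `migrations/config/process_uploads.yml` is git-ignored. A minimal sketch of a local settings file, assembled from the keys that `Settings`, `Uploader`, `Fixer` and `Optimizer` read above; all paths and values below are illustrative placeholders, not defaults shipped with the code:

    # migrations/config/process_uploads.yml (hypothetical example)
    source_db_path: /path/to/intermediate.db     # read by Uploader and Optimizer
    output_db_path: /path/to/uploads.db          # owned by Settings#output_db

    root_paths:                                  # searched in order for upload files
      - /path/to/site/uploads
    path_replacements: []                        # optional [from, to] substitutions

    thread_count_factor: 1                       # worker threads = CPU cores * factor

    fix_missing: false                           # run Fixer instead of Uploader
    create_optimized_images: false               # run Optimizer after uploading
    delete_missing_uploads: false
    delete_surplus_uploads: false

    site_settings:
      authorized_extensions: "*"
      max_attachment_size_kb: 102400
      max_image_size_kb: 102400
      multisite: false
      # multisite_db_name: default
      enable_s3_uploads: false
      # s3_access_key_id: ...
      # s3_secret_access_key: ...
      # s3_upload_bucket: ...
      # s3_region: ...
      # s3_cdn_url: ...

With the Thor-based CLI, the individual stages can also be invoked directly, e.g. `./migrations/bin/process_uploads fix-missing --settings=...` or `./migrations/bin/process_uploads optimize --settings=...`; the default `execute` task keeps the behavior of the old script.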