From d725b3ca9e6c4dee1313f733716f11374a8396b0 Mon Sep 17 00:00:00 2001
From: Gerhard Schlager
Date: Sun, 10 Dec 2023 22:38:15 +0100
Subject: [PATCH] DEV: Add script for preprocessing uploads as part of a migration

This script preprocesses all uploads within an intermediate DB (the
output of converters) and uploads those files to S3. It does the same
for optimized images. This speeds up migrations that have to run
multiple times, because the files only need to be preprocessed and
uploaded once.

This script is very hacky and mostly undocumented for now. That will
change in the future.
---
 app/models/optimized_image.rb           |   4 +-
 script/bulk_import/uploads_importer.rb  | 666 ++++++++++++++++++++++++
 script/bulk_import/uploads_importer.yml |  43 ++
 3 files changed, 712 insertions(+), 1 deletion(-)
 create mode 100644 script/bulk_import/uploads_importer.rb
 create mode 100644 script/bulk_import/uploads_importer.yml

diff --git a/app/models/optimized_image.rb b/app/models/optimized_image.rb
index ec1827e36c6..2041a1669b8 100644
--- a/app/models/optimized_image.rb
+++ b/app/models/optimized_image.rb
@@ -192,7 +192,9 @@ class OptimizedImage < ActiveRecord::Base
       extension = File.extname(opts[:filename] || ext_path || path)[1..-1]
     end
 
-    raise Discourse::InvalidAccess if !extension || !extension.match?(IM_DECODERS)
+    if !extension || !extension.match?(IM_DECODERS)
+      raise Discourse::InvalidAccess.new("Unsupported extension: #{extension}")
+    end
 
     "#{extension}:#{path}"
   end
diff --git a/script/bulk_import/uploads_importer.rb b/script/bulk_import/uploads_importer.rb
new file mode 100644
index 00000000000..99e1cd42cca
--- /dev/null
+++ b/script/bulk_import/uploads_importer.rb
@@ -0,0 +1,666 @@
# frozen_string_literal: true

puts "Loading application..."
require_relative "../../config/environment"

require "etc"
require "sqlite3"
require "colored2"

# Hack so that OptimizedImage.lock believes that it's running in a Sidekiq job
module Sidekiq
  def self.server?
    true
  end
end

module BulkImport
  class UploadsImporter
    TRANSACTION_SIZE = 1000
    QUEUE_SIZE = 1000

    def initialize(settings_path)
      @settings = YAML.load_file(settings_path, symbolize_names: true)
      @settings[:path_replacements] ||= []

      @root_paths = @settings[:root_paths]
      @output_db = create_connection(@settings[:output_db_path])

      initialize_output_db
      configure_site_settings
    end

    def run
      # disable logging for EXIFR which is used by ImageOptim
      EXIFR.logger = Logger.new(nil)

      if @settings[:fix_missing]
        @source_db = create_connection(@settings[:output_db_path])

        puts "Fixing missing uploads..."
        fix_missing
      else
        @source_db = create_connection(@settings[:source_db_path])

        puts "Uploading uploads..."
        upload_files

        puts "", "Creating optimized images..."
        create_optimized_images if @settings[:create_optimized_images]
      end
      puts ""
    ensure
      close
    end

    def upload_files
      queue = SizedQueue.new(QUEUE_SIZE)
      consumer_threads = []
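
      # Overview (a sketch of the flow, not additional behavior): a single producer
      # thread streams upload rows out of the source DB, a pool of worker threads
      # turns each row into a Discourse upload in the store, and a single status
      # thread serializes all results back into the output DB.
      #
      #   producer --> queue --> workers --> status_queue --> status thread --> output DB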

      if @settings[:delete_missing_uploads]
        puts "Deleting missing uploads from output database..."

        @output_db.execute(<<~SQL)
          DELETE FROM uploads
          WHERE upload IS NULL
        SQL
      end

      output_existing_ids = Set.new
      query("SELECT id FROM uploads", @output_db).tap do |result_set|
        result_set.each { |row| output_existing_ids << row["id"] }
        result_set.close
      end

      source_existing_ids = Set.new
      query("SELECT id FROM uploads", @source_db).tap do |result_set|
        result_set.each { |row| source_existing_ids << row["id"] }
        result_set.close
      end

      if (surplus_upload_ids = output_existing_ids - source_existing_ids).any?
        if @settings[:delete_surplus_uploads]
          puts "Deleting #{surplus_upload_ids.size} uploads from output database..."

          surplus_upload_ids.each_slice(TRANSACTION_SIZE) do |ids|
            placeholders = (["?"] * ids.size).join(",")
            @output_db.execute(<<~SQL, ids)
              DELETE FROM uploads
              WHERE id IN (#{placeholders})
            SQL
          end

          output_existing_ids -= surplus_upload_ids
        else
          puts "Found #{surplus_upload_ids.size} surplus uploads in output database. " \
                 "Run with `delete_surplus_uploads: true` to delete them."
        end

        surplus_upload_ids = nil
      end

      max_count = (source_existing_ids - output_existing_ids).size
      source_existing_ids = nil
      puts "Found #{output_existing_ids.size} existing uploads. #{max_count} are missing."

      producer_thread =
        Thread.new do
          query("SELECT * FROM uploads", @source_db).tap do |result_set|
            result_set.each { |row| queue << row unless output_existing_ids.include?(row["id"]) }
            result_set.close
          end
        end

      status_queue = SizedQueue.new(QUEUE_SIZE)
      status_thread =
        Thread.new do
          error_count = 0
          skipped_count = 0
          current_count = 0

          while !(params = status_queue.pop).nil?
            begin
              if params.delete(:skipped) == true
                skipped_count += 1
              elsif (error_message = params.delete(:error)) || params[:upload].nil?
                error_count += 1
                puts "", "Failed to create upload: #{params[:id]} (#{error_message})", ""
              end

              @output_db.execute(<<~SQL, params)
                INSERT INTO uploads (id, upload, markdown, skip_reason)
                VALUES (:id, :upload, :markdown, :skip_reason)
              SQL
            rescue StandardError => e
              puts "", "Failed to insert upload: #{params[:id]} (#{e.message})", ""
              error_count += 1
            end

            current_count += 1
            error_count_text = error_count > 0 ? "#{error_count} errors".red : "0 errors"

            print "\r%7d / %7d (%s, %d skipped)" %
                    [current_count, max_count, error_count_text, skipped_count]
          end
        end

      (Etc.nprocessors * @settings[:thread_count_factor]).to_i.times do |index|
        consumer_threads << Thread.new do
          Thread.current.name = "worker-#{index}"

          store = Discourse.store

          while (row = queue.pop)
            begin
              data_file = nil
              path = nil
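
              # Each row either embeds the raw file contents in its "data" column
              # (written to a tempfile below) or references a file on disk through
              # "relative_path" and "filename", which is searched for under every
              # configured root path, optionally applying path_replacements.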
              if row["data"].present?
                data_file = Tempfile.new("discourse-upload", binmode: true)
                data_file.write(row["data"])
                data_file.rewind
                path = data_file.path
              else
                relative_path = row["relative_path"]
                file_exists = false

                @root_paths.each do |root_path|
                  path = File.join(root_path, relative_path, row["filename"])
                  break if (file_exists = File.exist?(path))

                  @settings[:path_replacements].each do |from, to|
                    path = File.join(root_path, relative_path.sub(from, to), row["filename"])
                    break if (file_exists = File.exist?(path))
                  end

                  # stop searching further root paths once a path replacement found the file
                  break if file_exists
                end

                if !file_exists
                  status_queue << {
                    id: row["id"],
                    upload: nil,
                    skipped: true,
                    skip_reason: "file not found",
                  }
                  next
                end
              end

              retry_count = 0

              loop do
                error_message = nil
                upload =
                  copy_to_tempfile(path) do |file|
                    begin
                      UploadCreator.new(file, row["filename"], type: row["type"]).create_for(
                        Discourse::SYSTEM_USER_ID,
                      )
                    rescue StandardError => e
                      error_message = e.message
                      nil
                    end
                  end

                if (upload_okay = upload.present? && upload.persisted? && upload.errors.blank?)
                  upload_path = store.get_path_for_upload(upload)

                  file_exists =
                    if store.external?
                      store.object_from_path(upload_path).exists?
                    else
                      File.exist?(File.join(store.public_dir, upload_path))
                    end

                  unless file_exists
                    upload.destroy
                    upload = nil
                    upload_okay = false
                  end
                end

                if upload_okay
                  status_queue << {
                    id: row["id"],
                    upload: upload.attributes.to_json,
                    markdown: UploadMarkdown.new(upload).to_markdown,
                    skip_reason: nil,
                  }
                  break
                elsif retry_count >= 3
                  error_message ||= upload&.errors&.full_messages&.join(", ") || "unknown error"
                  status_queue << {
                    id: row["id"],
                    upload: nil,
                    markdown: nil,
                    error: "too many retries: #{error_message}",
                    skip_reason: "too many retries",
                  }
                  break
                end

                retry_count += 1
                sleep 0.25 * retry_count
              end
            rescue StandardError => e
              status_queue << {
                id: row["id"],
                upload: nil,
                markdown: nil,
                error: e.message,
                skip_reason: "error",
              }
            ensure
              data_file&.close!
            end
          end
        end
      end

      producer_thread.join
      queue.close
      consumer_threads.each(&:join)
      status_queue.close
      status_thread.join
    end

    # Verifies that the files of previously imported uploads still exist in the store.
    # Note that in this mode #run points @source_db at the *output* DB, so the rows
    # checked here are the ones created by earlier runs of upload_files.
    def fix_missing
      queue = SizedQueue.new(QUEUE_SIZE)
      consumer_threads = []

      max_count =
        @source_db.get_first_value("SELECT COUNT(*) FROM uploads WHERE upload IS NOT NULL")

      producer_thread =
        Thread.new do
          query(
            "SELECT id, upload FROM uploads WHERE upload IS NOT NULL ORDER BY rowid DESC",
            @source_db,
          ).tap do |result_set|
            result_set.each { |row| queue << row }
            result_set.close
          end
        end

      status_queue = SizedQueue.new(QUEUE_SIZE)
      status_thread =
        Thread.new do
          error_count = 0
          current_count = 0
          missing_count = 0

          while !(result = status_queue.pop).nil?
            current_count += 1

            case result[:status]
            when :ok
              # ignore
            when :error
              error_count += 1
              puts "Error in #{result[:id]}"
            when :missing
              missing_count += 1
              puts "Missing #{result[:id]}"

              # delete the row and the Upload record so that a later regular run
              # can recreate both
              @output_db.execute("DELETE FROM uploads WHERE id = ?", result[:id])
              Upload.delete_by(id: result[:upload_id])
            end

            error_count_text = error_count > 0 ? "#{error_count} errors".red : "0 errors"

            print "\r%7d / %7d (%s, %d missing)" %
                    [current_count, max_count, error_count_text, missing_count]
          end
        end

      store = Discourse.store

      (Etc.nprocessors * @settings[:thread_count_factor] * 2).to_i.times do |index|
        consumer_threads << Thread.new do
          Thread.current.name = "worker-#{index}"
          fake_upload = OpenStruct.new(url: "")

          while (row = queue.pop)
            begin
              upload = JSON.parse(row["upload"])
              fake_upload.url = upload["url"]
              path = store.get_path_for_upload(fake_upload)

              file_exists =
                if store.external?
                  store.object_from_path(path).exists?
                else
                  File.exist?(File.join(store.public_dir, path))
                end

              if file_exists
                status_queue << { id: row["id"], upload_id: upload["id"], status: :ok }
              else
                status_queue << { id: row["id"], upload_id: upload["id"], status: :missing }
              end
            rescue StandardError => e
              puts e.message
              status_queue << { id: row["id"], upload_id: upload["id"], status: :error }
            end
          end
        end
      end

      producer_thread.join
      queue.close
      consumer_threads.each(&:join)
      status_queue.close
      status_thread.join
    end

    # Creates optimized images for uploads referenced by posts and avatars. Post
    # uploads are optimized by rebaking a scratch post that contains the upload's
    # markdown; avatars via OptimizedImage.create_for in all configured sizes.
    def create_optimized_images
      init_threads = []
      optimized_upload_ids = Set.new
      post_upload_ids = Set.new
      avatar_upload_ids = Set.new
      max_count = 0

      init_threads << Thread.new do
        query("SELECT id FROM optimized_images", @output_db).tap do |result_set|
          result_set.each { |row| optimized_upload_ids << row["id"] }
          result_set.close
        end
      end

      init_threads << Thread.new do
        sql = <<~SQL
          SELECT upload_ids
          FROM posts
          WHERE upload_ids IS NOT NULL
        SQL
        query(sql, @source_db).tap do |result_set|
          result_set.each do |row|
            JSON.parse(row["upload_ids"]).each { |id| post_upload_ids << id }
          end
          result_set.close
        end
      end

      init_threads << Thread.new do
        sql = <<~SQL
          SELECT avatar_upload_id
          FROM users
          WHERE avatar_upload_id IS NOT NULL
        SQL
        query(sql, @source_db).tap do |result_set|
          result_set.each { |row| avatar_upload_ids << row["avatar_upload_id"] }
          result_set.close
        end
      end

      init_threads << Thread.new do
        max_count =
          @output_db.get_first_value("SELECT COUNT(*) FROM uploads WHERE upload IS NOT NULL")
      end

      init_threads.each(&:join)

      status_queue = SizedQueue.new(QUEUE_SIZE)
      status_thread =
        Thread.new do
          error_count = 0
          current_count = 0
          skipped_count = 0

          while !(params = status_queue.pop).nil?
            current_count += 1

            case params.delete(:status)
            when :ok
              @output_db.execute(<<~SQL, params)
                INSERT INTO optimized_images (id, optimized_images)
                VALUES (:id, :optimized_images)
              SQL
            when :error
              error_count += 1
            when :skipped
              skipped_count += 1
            end

            error_count_text = error_count > 0 ? "#{error_count} errors".red : "0 errors"

            print "\r%7d / %7d (%s, %d skipped)" %
                    [current_count, max_count, error_count_text, skipped_count]
          end
        end

      queue = SizedQueue.new(QUEUE_SIZE)
      consumer_threads = []

      producer_thread =
        Thread.new do
          sql = <<~SQL
            SELECT id AS upload_id, upload ->> 'sha1' AS upload_sha1, markdown
            FROM uploads
            WHERE upload IS NOT NULL
            ORDER BY rowid
          SQL

          query(sql, @output_db).tap do |result_set|
            result_set.each do |row|
              upload_id = row["upload_id"]

              if optimized_upload_ids.include?(upload_id) || !row["markdown"].start_with?("![")
                status_queue << { id: row["upload_id"], status: :skipped }
                next
              end

              if post_upload_ids.include?(upload_id)
                row["type"] = "post"
                queue << row
              elsif avatar_upload_ids.include?(upload_id)
                row["type"] = "avatar"
                queue << row
              else
                status_queue << { id: row["upload_id"], status: :skipped }
              end
            end
            result_set.close
          end
        end

      avatar_sizes = Discourse.avatar_sizes
      store = Discourse.store
      remote_factor = store.external? ? 2 : 1

      Jobs.run_immediately!

      (Etc.nprocessors * @settings[:thread_count_factor] * remote_factor).to_i.times do |index|
        consumer_threads << Thread.new do
          Thread.current.name = "worker-#{index}"

          post =
            PostCreator.new(
              Discourse.system_user,
              raw: "Topic created by uploads_importer",
              acting_user: Discourse.system_user,
              skip_validations: true,
              title: "Topic created by uploads_importer - #{SecureRandom.hex}",
              archetype: Archetype.default,
              category: Category.last.id,
            ).create!

          while (row = queue.pop)
            retry_count = 0

            loop do
              upload = Upload.find_by(sha1: row["upload_sha1"])

              optimized_images =
                begin
                  case row["type"]
                  when "post"
                    post.update_columns(baked_at: nil, cooked: "", raw: row["markdown"])
                    post.reload
                    post.rebake!
                    OptimizedImage.where(upload_id: upload.id).to_a
                  when "avatar"
                    avatar_sizes.map { |size| OptimizedImage.create_for(upload, size, size) }
                  end
                rescue StandardError => e
                  puts e.message
                  puts e.backtrace
                  nil
                end

              begin
                if optimized_images.present?
                  optimized_images.map! do |optimized_image|
                    next unless optimized_image.present?
                    optimized_image_path = store.get_path_for_optimized_image(optimized_image)

                    file_exists =
                      if store.external?
                        store.object_from_path(optimized_image_path).exists?
                      else
                        File.exist?(File.join(store.public_dir, optimized_image_path))
                      end

                    unless file_exists
                      optimized_image.destroy
                      optimized_image = nil
                    end

                    optimized_image
                  end
                end
              rescue StandardError
                optimized_images = nil
              end

              # All-or-nothing check: the batch only counts if every optimized image
              # was created, persisted without errors, and actually exists in the
              # store; otherwise the loop retries with a growing backoff.
              optimized_images_okay =
                !optimized_images.nil? && optimized_images.all?(&:present?) &&
                  optimized_images.all?(&:persisted?) &&
                  optimized_images.all? { |o| o.errors.blank? }

              if optimized_images_okay
                status_queue << {
                  id: row["upload_id"],
                  optimized_images: optimized_images.presence&.to_json,
                  status: :ok,
                }
                break
              elsif retry_count >= 3
                status_queue << { id: row["upload_id"], status: :error }
                break
              end

              retry_count += 1
              sleep 0.25 * retry_count
            end
          end
        end
      end

      producer_thread.join
      queue.close
      consumer_threads.each(&:join)
      status_queue.close
      status_thread.join
    end

    private

    def create_connection(path)
      sqlite = SQLite3::Database.new(path, results_as_hash: true)
      sqlite.busy_timeout = 60_000 # 60 seconds
      sqlite.journal_mode = "WAL"
      sqlite.synchronous = "off"
      sqlite
    end

    def query(sql, db)
      db.prepare(sql).execute
    end

    def initialize_output_db
      @statement_counter = 0

      @output_db.execute(<<~SQL)
        CREATE TABLE IF NOT EXISTS uploads (
          id TEXT PRIMARY KEY NOT NULL,
          upload JSON_TEXT,
          markdown TEXT,
          skip_reason TEXT
        )
      SQL

      @output_db.execute(<<~SQL)
        CREATE TABLE IF NOT EXISTS optimized_images (
          id TEXT PRIMARY KEY NOT NULL,
          optimized_images JSON_TEXT
        )
      SQL
    end

    def insert(sql, bind_vars = [])
      @output_db.transaction if @statement_counter == 0
      @output_db.execute(sql, bind_vars)

      if (@statement_counter += 1) > TRANSACTION_SIZE
        @output_db.commit
        @statement_counter = 0
      end
    end

    def close
      @source_db.close if @source_db

      if @output_db
        @output_db.commit if @output_db.transaction_active?
        @output_db.close
      end
    end

    def copy_to_tempfile(source_path)
      extension = File.extname(source_path)

      Tempfile.open(["discourse-upload", extension]) do |tmpfile|
        File.open(source_path, "rb") { |source_stream| IO.copy_stream(source_stream, tmpfile) }
        tmpfile.rewind
        yield(tmpfile)
      end
    end

    def configure_site_settings
      settings = @settings[:site_settings]

      SiteSetting.clean_up_uploads = false
      SiteSetting.authorized_extensions = settings[:authorized_extensions]
      SiteSetting.max_attachment_size_kb = settings[:max_attachment_size_kb]
      SiteSetting.max_image_size_kb = settings[:max_image_size_kb]

      if settings[:enable_s3_uploads]
        SiteSetting.s3_access_key_id = settings[:s3_access_key_id]
        SiteSetting.s3_secret_access_key = settings[:s3_secret_access_key]
        SiteSetting.s3_upload_bucket = settings[:s3_upload_bucket]
        SiteSetting.s3_region = settings[:s3_region]
        SiteSetting.s3_cdn_url = settings[:s3_cdn_url]
        SiteSetting.enable_s3_uploads = true

        raise "Failed to enable S3 uploads" if SiteSetting.enable_s3_uploads != true
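
        # Smoke test: round-trip a tiny file through the store to verify the S3
        # credentials before starting the actual import.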
        Tempfile.open("discourse-s3-test") do |tmpfile|
          tmpfile.write("test")
          tmpfile.rewind

          upload =
            UploadCreator.new(tmpfile, "discourse-s3-test.txt").create_for(
              Discourse::SYSTEM_USER_ID,
            )

          unless upload.present? && upload.persisted? && upload.errors.blank? &&
                   upload.url.start_with?("//")
            raise "Failed to upload to S3"
          end

          upload.destroy
        end
      end
    end
  end
end

# bundle exec ruby script/bulk_import/uploads_importer.rb /path/to/uploads_importer.yml
BulkImport::UploadsImporter.new(ARGV.first).run
diff --git a/script/bulk_import/uploads_importer.yml b/script/bulk_import/uploads_importer.yml
new file mode 100644
index 00000000000..18070ede8ed
--- /dev/null
+++ b/script/bulk_import/uploads_importer.yml
@@ -0,0 +1,43 @@
source_db_path: "/path/to/your/db.sqlite3"
output_db_path: "/path/to/your/uploads.sqlite3"

root_paths:
  - "/path/to/your/files"
  - "/path/to/more/files"

# The number of threads used for processing uploads is calculated as:
#   thread_count = [number of cores] * [thread_count_factor]
# The thread count is doubled when uploads are stored on S3 to compensate for the higher latency.
thread_count_factor: 1.5

# Delete uploads from the output database that are not found in the source database.
delete_surplus_uploads: false

# Delete uploads from the output database that do not have a Discourse upload record.
delete_missing_uploads: false

# Check if files are missing from the upload store and update the database accordingly.
# Set to false and re-run the script afterwards if you want to create new uploads for missing files.
fix_missing: false

# Create optimized images for post uploads and avatars.
create_optimized_images: false

site_settings:
  authorized_extensions: "*"
  max_attachment_size_kb: 102_400
  max_image_size_kb: 102_400

  enable_s3_uploads: true
  s3_upload_bucket: "your-bucket-name"
  s3_region: "your-region"
  s3_access_key_id: "your-access-key-id"
  s3_secret_access_key: "your-secret-access-key"
  s3_cdn_url: "https://your-cdn-url.com"

# Sometimes a file can be found at one of many locations. Here's a list of transformations that
# can be applied to the path to try to find the file. The first transformation that results in a
# file being found will be used.
path_replacements:
#  - ["/foo/", "/bar"]
#  - ["/foo/", "/bar/baz/"]
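#
# For example (hypothetical paths): if the source DB stores relative paths under
# "attachments/" but the files were moved to "archive/attachments/" on disk, a
# replacement like the following lets the importer find them:
#  - ["attachments/", "archive/attachments/"]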