DEV: First pass at modularizing uploads processing
This change mostly splits the existing implementation into separate classes, with a lot of duplication left in place for now. The idea is primarily to try out a potential structure for making these tasks granular and composable.
This commit is contained in:
parent
12ee333f3f
commit
b5c89fb114
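For orientation, the extracted pieces are meant to compose like this. This is only a sketch mirroring what `Migrations::Uploads::CLI#execute` (below) does; it is not part of the commit:

settings = Migrations::Uploads::Settings.from_file("migrations/config/process_uploads.yml")

if settings[:fix_missing]
  # verify previously uploaded files and purge records whose files are gone
  Migrations::Uploads::Fixer.run!(settings)
else
  Migrations::Uploads::Uploader.run!(settings)
  Migrations::Uploads::Optimizer.run!(settings) if settings[:create_optimized_images]
end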
migrations/.gitignore
@@ -2,3 +2,5 @@
 tmp/*
 Gemfile.lock
+
+/config/process_uploads.yml
migrations/bin/process_uploads
@@ -1,688 +1,7 @@
 #!/usr/bin/env ruby
 # frozen_string_literal: true

-require "etc"
 require_relative "../lib/migrations"
+require_relative "../lib/uploads/cli"

-module Migrations
-  load_rails_environment
-
-  load_gemfiles("common")
-  configure_zeitwerk("lib/common")
-
-  class ProcessUploads
-    TRANSACTION_SIZE = 1000
-    QUEUE_SIZE = 1000
-
-    def initialize(settings_path)
-      @settings = YAML.load_file(settings_path, symbolize_names: true)
-      @settings[:path_replacements] ||= []
-
-      @root_paths = @settings[:root_paths]
-      @output_db = create_connection(@settings[:output_db_path])
-
-      initialize_output_db
-      configure_site_settings
-    end
-
-    def run
-      # disable logging for EXIFR which is used by ImageOptim
-      EXIFR.logger = Logger.new(nil)
-
-      if @settings[:fix_missing]
-        @source_db = create_connection(@settings[:output_db_path])
-
-        puts "Fixing missing uploads..."
-        fix_missing
-      else
-        @source_db = create_connection(@settings[:source_db_path])
-
-        puts "Uploading uploads..."
-        upload_files
-
-        puts "", "Creating optimized images..."
-        create_optimized_images if @settings[:create_optimized_images]
-      end
-      puts ""
-    ensure
-      close
-    end
-
-    def upload_files
-      queue = SizedQueue.new(QUEUE_SIZE)
-      consumer_threads = []
-
-      if @settings[:delete_missing_uploads]
-        puts "Deleting missing uploads from output database..."
-        @output_db.execute(<<~SQL)
-          DELETE FROM uploads
-          WHERE upload IS NULL
-        SQL
-      end
-
-      output_existing_ids = Set.new
-      query("SELECT id FROM uploads", @output_db).tap do |result_set|
-        result_set.each { |row| output_existing_ids << row["id"] }
-        result_set.close
-      end
-
-      source_existing_ids = Set.new
-      query("SELECT id FROM uploads", @source_db).tap do |result_set|
-        result_set.each { |row| source_existing_ids << row["id"] }
-        result_set.close
-      end
-
-      if (surplus_upload_ids = output_existing_ids - source_existing_ids).any?
-        if @settings[:delete_surplus_uploads]
-          puts "Deleting #{surplus_upload_ids.size} uploads from output database..."
-
-          surplus_upload_ids.each_slice(TRANSACTION_SIZE) do |ids|
-            placeholders = (["?"] * ids.size).join(",")
-            @output_db.execute(<<~SQL, ids)
-              DELETE FROM uploads
-              WHERE id IN (#{placeholders})
-            SQL
-          end
-
-          output_existing_ids -= surplus_upload_ids
-        else
-          puts "Found #{surplus_upload_ids.size} surplus uploads in output database. " \
-                 "Run with `delete_surplus_uploads: true` to delete them."
-        end
-
-        surplus_upload_ids = nil
-      end
-
-      max_count = (source_existing_ids - output_existing_ids).size
-      source_existing_ids = nil
-      puts "Found #{output_existing_ids.size} existing uploads. #{max_count} are missing."
-
-      producer_thread =
-        Thread.new do
-          query("SELECT * FROM uploads", @source_db).tap do |result_set|
-            result_set.each { |row| queue << row unless output_existing_ids.include?(row["id"]) }
-            result_set.close
-          end
-        end
-
-      status_queue = SizedQueue.new(QUEUE_SIZE)
-      status_thread =
-        Thread.new do
-          error_count = 0
-          skipped_count = 0
-          current_count = 0
-
-          while !(params = status_queue.pop).nil?
-            begin
-              if params.delete(:skipped) == true
-                skipped_count += 1
-              elsif (error_message = params.delete(:error)) || params[:upload].nil?
-                error_count += 1
-                puts "", "Failed to create upload: #{params[:id]} (#{error_message})", ""
-              end
-
-              @output_db.execute(<<~SQL, params)
-                INSERT INTO uploads (id, upload, markdown, skip_reason)
-                VALUES (:id, :upload, :markdown, :skip_reason)
-              SQL
-            rescue StandardError => e
-              puts "", "Failed to insert upload: #{params[:id]} (#{e.message})", ""
-              error_count += 1
-            end
-
-            current_count += 1
-            error_count_text = error_count > 0 ? "#{error_count} errors".red : "0 errors"
-
-            print "\r%7d / %7d (%s, %d skipped)" %
-                    [current_count, max_count, error_count_text, skipped_count]
-          end
-        end
-
-      (Etc.nprocessors * @settings[:thread_count_factor]).to_i.times do |index|
-        consumer_threads << Thread.new do
-          Thread.current.name = "worker-#{index}"
-
-          store = Discourse.store
-
-          while (row = queue.pop)
-            begin
-              data_file = nil
-              path = nil
-
-              if row["data"].present?
-                data_file = Tempfile.new("discourse-upload", binmode: true)
-                data_file.write(row["data"])
-                data_file.rewind
-                path = data_file.path
-              else
-                relative_path = row["relative_path"]
-                file_exists = false
-
-                @root_paths.each do |root_path|
-                  path = File.join(root_path, relative_path, row["filename"])
-                  break if (file_exists = File.exist?(path))
-
-                  @settings[:path_replacements].each do |from, to|
-                    path = File.join(root_path, relative_path.sub(from, to), row["filename"])
-                    break if (file_exists = File.exist?(path))
-                  end
-                end
-
-                if !file_exists
-                  status_queue << {
-                    id: row["id"],
-                    upload: nil,
-                    skipped: true,
-                    skip_reason: "file not found",
-                  }
-                  next
-                end
-              end
-
-              retry_count = 0
-
-              loop do
-                error_message = nil
-                upload =
-                  copy_to_tempfile(path) do |file|
-                    begin
-                      UploadCreator.new(
-                        file,
-                        row["display_filename"] || row["filename"],
-                        type: row["type"],
-                      ).create_for(Discourse::SYSTEM_USER_ID)
-                    rescue StandardError => e
-                      error_message = e.message
-                      nil
-                    end
-                  end
-
-                if (upload_okay = upload.present? && upload.persisted? && upload.errors.blank?)
-                  upload_path = add_multisite_prefix(store.get_path_for_upload(upload))
-
-                  file_exists =
-                    if store.external?
-                      store.object_from_path(upload_path).exists?
-                    else
-                      File.exist?(File.join(store.public_dir, upload_path))
-                    end
-
-                  unless file_exists
-                    upload.destroy
-                    upload = nil
-                    upload_okay = false
-                  end
-                end
-
-                if upload_okay
-                  status_queue << {
-                    id: row["id"],
-                    upload: upload.attributes.to_json,
-                    markdown: UploadMarkdown.new(upload).to_markdown,
-                    skip_reason: nil,
-                  }
-                  break
-                elsif retry_count >= 3
-                  error_message ||= upload&.errors&.full_messages&.join(", ") || "unknown error"
-                  status_queue << {
-                    id: row["id"],
-                    upload: nil,
-                    markdown: nil,
-                    error: "too many retries: #{error_message}",
-                    skip_reason: "too many retries",
-                  }
-                  break
-                end
-
-                retry_count += 1
-                sleep 0.25 * retry_count
-              end
-            rescue StandardError => e
-              status_queue << {
-                id: row["id"],
-                upload: nil,
-                markdown: nil,
-                error: e.message,
-                skip_reason: "error",
-              }
-            ensure
-              data_file&.close!
-            end
-          end
-        end
-      end
-
-      producer_thread.join
-      queue.close
-      consumer_threads.each(&:join)
-      status_queue.close
-      status_thread.join
-    end
-
-    def fix_missing
-      queue = SizedQueue.new(QUEUE_SIZE)
-      consumer_threads = []
-
-      max_count =
-        @source_db.get_first_value("SELECT COUNT(*) FROM uploads WHERE upload IS NOT NULL")
-
-      producer_thread =
-        Thread.new do
-          query(
-            "SELECT id, upload FROM uploads WHERE upload IS NOT NULL ORDER BY rowid DESC",
-            @source_db,
-          ).tap do |result_set|
-            result_set.each { |row| queue << row }
-            result_set.close
-          end
-        end
-
-      status_queue = SizedQueue.new(QUEUE_SIZE)
-      status_thread =
-        Thread.new do
-          error_count = 0
-          current_count = 0
-          missing_count = 0
-
-          while !(result = status_queue.pop).nil?
-            current_count += 1
-
-            case result[:status]
-            when :ok
-              # ignore
-            when :error
-              error_count += 1
-              puts "Error in #{result[:id]}"
-            when :missing
-              missing_count += 1
-              puts "Missing #{result[:id]}"
-
-              @output_db.execute("DELETE FROM uploads WHERE id = ?", result[:id])
-              Upload.delete_by(id: result[:upload_id])
-            end
-
-            error_count_text = error_count > 0 ? "#{error_count} errors".red : "0 errors"
-
-            print "\r%7d / %7d (%s, %s missing)" %
-                    [current_count, max_count, error_count_text, missing_count]
-          end
-        end
-
-      store = Discourse.store
-
-      (Etc.nprocessors * @settings[:thread_count_factor] * 2).to_i.times do |index|
-        consumer_threads << Thread.new do
-          Thread.current.name = "worker-#{index}"
-          fake_upload = OpenStruct.new(url: "")
-          while (row = queue.pop)
-            begin
-              upload = JSON.parse(row["upload"])
-              fake_upload.url = upload["url"]
-              path = add_multisite_prefix(store.get_path_for_upload(fake_upload))
-
-              file_exists =
-                if store.external?
-                  store.object_from_path(path).exists?
-                else
-                  File.exist?(File.join(store.public_dir, path))
-                end
-
-              if file_exists
-                status_queue << { id: row["id"], upload_id: upload["id"], status: :ok }
-              else
-                status_queue << { id: row["id"], upload_id: upload["id"], status: :missing }
-              end
-            rescue StandardError => e
-              puts e.message
-              status_queue << { id: row["id"], upload_id: upload["id"], status: :error }
-            end
-          end
-        end
-      end
-
-      producer_thread.join
-      queue.close
-      consumer_threads.each(&:join)
-      status_queue.close
-      status_thread.join
-    end
-
-    def create_optimized_images
-      init_threads = []
-      optimized_upload_ids = Set.new
-      post_upload_ids = Set.new
-      avatar_upload_ids = Set.new
-      max_count = 0
-
-      # allow more than 1 thread to optimize images at the same time
-      OptimizedImage.lock_per_machine = false
-
-      init_threads << Thread.new do
-        query("SELECT id FROM optimized_images", @output_db).tap do |result_set|
-          result_set.each { |row| optimized_upload_ids << row["id"] }
-          result_set.close
-        end
-      end
-
-      init_threads << Thread.new do
-        sql = <<~SQL
-          SELECT upload_ids
-          FROM posts
-          WHERE upload_ids IS NOT NULL
-        SQL
-        query(sql, @source_db).tap do |result_set|
-          result_set.each do |row|
-            JSON.parse(row["upload_ids"]).each { |id| post_upload_ids << id }
-          end
-          result_set.close
-        end
-      end
-
-      init_threads << Thread.new do
-        sql = <<~SQL
-          SELECT avatar_upload_id
-          FROM users
-          WHERE avatar_upload_id IS NOT NULL
-        SQL
-        query(sql, @source_db).tap do |result_set|
-          result_set.each { |row| avatar_upload_ids << row["avatar_upload_id"] }
-          result_set.close
-        end
-      end
-
-      init_threads << Thread.new do
-        max_count =
-          @output_db.get_first_value("SELECT COUNT(*) FROM uploads WHERE upload IS NOT NULL")
-      end
-
-      init_threads.each(&:join)
-
-      status_queue = SizedQueue.new(QUEUE_SIZE)
-      status_thread =
-        Thread.new do
-          error_count = 0
-          current_count = 0
-          skipped_count = 0
-
-          while !(params = status_queue.pop).nil?
-            current_count += 1
-
-            case params.delete(:status)
-            when :ok
-              @output_db.execute(<<~SQL, params)
-                INSERT INTO optimized_images (id, optimized_images)
-                VALUES (:id, :optimized_images)
-              SQL
-            when :error
-              error_count += 1
-            when :skipped
-              skipped_count += 1
-            end
-
-            error_count_text = error_count > 0 ? "#{error_count} errors".red : "0 errors"
-
-            print "\r%7d / %7d (%s, %d skipped)" %
-                    [current_count, max_count, error_count_text, skipped_count]
-          end
-        end
-
-      queue = SizedQueue.new(QUEUE_SIZE)
-      consumer_threads = []
-
-      producer_thread =
-        Thread.new do
-          sql = <<~SQL
-            SELECT id AS upload_id, upload ->> 'sha1' AS upload_sha1, markdown
-            FROM uploads
-            WHERE upload IS NOT NULL
-            ORDER BY rowid
-          SQL
-
-          query(sql, @output_db).tap do |result_set|
-            result_set.each do |row|
-              upload_id = row["upload_id"]
-
-              if optimized_upload_ids.include?(upload_id) || !row["markdown"].start_with?("![")
-                status_queue << { id: row["upload_id"], status: :skipped }
-                next
-              end
-
-              if post_upload_ids.include?(upload_id)
-                row["type"] = "post"
-                queue << row
-              elsif avatar_upload_ids.include?(upload_id)
-                row["type"] = "avatar"
-                queue << row
-              else
-                status_queue << { id: row["upload_id"], status: :skipped }
-              end
-            end
-            result_set.close
-          end
-        end
-
-      avatar_sizes = Discourse.avatar_sizes
-      store = Discourse.store
-      remote_factor = store.external? ? 2 : 1
-
-      Jobs.run_immediately!
-
-      (Etc.nprocessors * @settings[:thread_count_factor] * remote_factor).to_i.times do |index|
-        consumer_threads << Thread.new do
-          Thread.current.name = "worker-#{index}"
-
-          post =
-            PostCreator.new(
-              Discourse.system_user,
-              raw: "Topic created by uploads_importer",
-              acting_user: Discourse.system_user,
-              skip_validations: true,
-              title: "Topic created by uploads_importer - #{SecureRandom.hex}",
-              archetype: Archetype.default,
-              category: Category.last.id,
-            ).create!
-
-          while (row = queue.pop)
-            retry_count = 0
-
-            loop do
-              upload = Upload.find_by(sha1: row["upload_sha1"])
-
-              optimized_images =
-                begin
-                  case row["type"]
-                  when "post"
-                    post.update_columns(baked_at: nil, cooked: "", raw: row["markdown"])
-                    post.reload
-                    post.rebake!
-                    OptimizedImage.where(upload_id: upload.id).to_a
-                  when "avatar"
-                    avatar_sizes.map { |size| OptimizedImage.create_for(upload, size, size) }
-                  end
-                rescue StandardError => e
-                  puts e.message
-                  puts e.backtrace
-                  nil
-                end
-
-              begin
-                if optimized_images.present?
-                  optimized_images.map! do |optimized_image|
-                    next unless optimized_image.present?
-                    optimized_image_path =
-                      add_multisite_prefix(store.get_path_for_optimized_image(optimized_image))
-
-                    file_exists =
-                      if store.external?
-                        store.object_from_path(optimized_image_path).exists?
-                      else
-                        File.exist?(File.join(store.public_dir, optimized_image_path))
-                      end
-
-                    unless file_exists
-                      optimized_image.destroy
-                      optimized_image = nil
-                    end
-
-                    optimized_image
-                  end
-                end
-              rescue StandardError
-                optimized_images = nil
-              end
-
-              optimized_images_okay =
-                !optimized_images.nil? && optimized_images.all?(&:present?) &&
-                  optimized_images.all?(&:persisted?) &&
-                  optimized_images.all? { |o| o.errors.blank? }
-
-              if optimized_images_okay
-                status_queue << {
-                  id: row["upload_id"],
-                  optimized_images: optimized_images.presence&.to_json,
-                  status: :ok,
-                }
-                break
-              elsif retry_count >= 3
-                status_queue << { id: row["upload_id"], status: :error }
-                break
-              end
-
-              retry_count += 1
-              sleep 0.25 * retry_count
-            end
-          end
-        end
-      end
-
-      producer_thread.join
-      queue.close
-      consumer_threads.each(&:join)
-      status_queue.close
-      status_thread.join
-    end
-
-    private
-
-    def create_connection(path)
-      ::Migrations::IntermediateDatabase.create_connection(path: path)
-    end
-
-    def query(sql, db)
-      db.prepare(sql).execute
-    end
-
-    def initialize_output_db
-      @statement_counter = 0
-
-      @output_db.execute(<<~SQL)
-        CREATE TABLE IF NOT EXISTS uploads (
-          id TEXT PRIMARY KEY NOT NULL,
-          upload JSON_TEXT,
-          markdown TEXT,
-          skip_reason TEXT
-        )
-      SQL
-
-      @output_db.execute(<<~SQL)
-        CREATE TABLE IF NOT EXISTS optimized_images (
-          id TEXT PRIMARY KEY NOT NULL,
-          optimized_images JSON_TEXT
-        )
-      SQL
-    end
-
-    def insert(sql, bind_vars = [])
-      @output_db.transaction if @statement_counter == 0
-      @output_db.execute(sql, bind_vars)
-
-      if (@statement_counter += 1) > TRANSACTION_SIZE
-        @output_db.commit
-        @statement_counter = 0
-      end
-    end
-
-    def close
-      @source_db.close if @source_db
-
-      if @output_db
-        @output_db.commit if @output_db.transaction_active?
-        @output_db.close
-      end
-    end
-
-    def copy_to_tempfile(source_path)
-      extension = File.extname(source_path)
-
-      Tempfile.open(["discourse-upload", extension]) do |tmpfile|
-        File.open(source_path, "rb") { |source_stream| IO.copy_stream(source_stream, tmpfile) }
-        tmpfile.rewind
-        yield(tmpfile)
-      end
-    end
-
-    def configure_site_settings
-      settings = @settings[:site_settings]
-
-      SiteSetting.clean_up_uploads = false
-      SiteSetting.authorized_extensions = settings[:authorized_extensions]
-      SiteSetting.max_attachment_size_kb = settings[:max_attachment_size_kb]
-      SiteSetting.max_image_size_kb = settings[:max_image_size_kb]
-
-      if settings[:multisite]
-        # rubocop:disable Discourse/NoDirectMultisiteManipulation
-        Rails.configuration.multisite = true
-        # rubocop:enable Discourse/NoDirectMultisiteManipulation
-
-        RailsMultisite::ConnectionManagement.class_eval do
-          def self.current_db_override=(value)
-            @current_db_override = value
-          end
-
-          def self.current_db
-            @current_db_override
-          end
-        end
-        RailsMultisite::ConnectionManagement.current_db_override = settings[:multisite_db_name]
-      end
-
-      if settings[:enable_s3_uploads]
-        SiteSetting.s3_access_key_id = settings[:s3_access_key_id]
-        SiteSetting.s3_secret_access_key = settings[:s3_secret_access_key]
-        SiteSetting.s3_upload_bucket = settings[:s3_upload_bucket]
-        SiteSetting.s3_region = settings[:s3_region]
-        SiteSetting.s3_cdn_url = settings[:s3_cdn_url]
-        SiteSetting.enable_s3_uploads = true
-
-        raise "Failed to enable S3 uploads" if SiteSetting.enable_s3_uploads != true
-
-        Tempfile.open("discourse-s3-test") do |tmpfile|
-          tmpfile.write("test")
-          tmpfile.rewind
-
-          upload =
-            UploadCreator.new(tmpfile, "discourse-s3-test.txt").create_for(
-              Discourse::SYSTEM_USER_ID,
-            )
-
-          unless upload.present? && upload.persisted? && upload.errors.blank? &&
-                   upload.url.start_with?("//")
-            raise "Failed to upload to S3"
-          end
-
-          upload.destroy
-        end
-      end
-    end
-
-    def add_multisite_prefix(path)
-      if Rails.configuration.multisite
-        File.join("uploads", RailsMultisite::ConnectionManagement.current_db, path)
-      else
-        path
-      end
-    end
-  end
-end
-
--# ./migrations/bin/process_uploads migrations/config/process_uploads.yml
--Migrations::ProcessUploads.new(ARGV.first).run
+# ./migrations/bin/process_uploads [--settings=migrations/config/process_uploads.yml]
+Migrations::Uploads::CLI.start(ARGV)
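The monolithic script above repeats one concurrency pattern three times (upload_files, fix_missing, create_optimized_images), and the extracted classes below keep it: one producer streams rows into a bounded queue, several workers drain it, and a single status thread owns all output-database writes and the progress line. A distilled, runnable sketch; rows, process and record are illustrative stand-ins, not part of the codebase:

require "etc"

QUEUE_SIZE = 1000

# stand-ins for streaming rows from SQLite, doing per-row work,
# and persisting a result
rows = (1..100).map { |i| { "id" => i } }
process = ->(row) { { id: row["id"], status: :ok } }
record = ->(result) { print "\r#{result[:id]}" }

work_queue = SizedQueue.new(QUEUE_SIZE)
status_queue = SizedQueue.new(QUEUE_SIZE)

# one producer feeds the bounded queue...
producer = Thread.new { rows.each { |row| work_queue << row } }

# ...several workers drain it...
workers =
  Etc.nprocessors.times.map do
    Thread.new do
      while (row = work_queue.pop) # pop returns nil once the queue is closed and empty
        status_queue << process.call(row)
      end
    end
  end

# ...and a single status thread serializes writes, so SQLite
# never sees concurrent writers
status = Thread.new do
  while (result = status_queue.pop)
    record.call(result)
  end
end

producer.join
work_queue.close
workers.each(&:join)
status_queue.close
status.join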
migrations/lib/uploads/base.rb
@@ -0,0 +1,26 @@
# frozen_string_literal: true

require "etc"
require "sqlite3"

module Migrations
  module Uploads
    class Base
      TRANSACTION_SIZE = 1000
      QUEUE_SIZE = 1000

      # TODO: Use IntermediateDatabase instead
      def create_connection(path)
        sqlite = SQLite3::Database.new(path, results_as_hash: true)
        sqlite.busy_timeout = 60_000 # 60 seconds
        sqlite.journal_mode = "WAL"
        sqlite.synchronous = "off"
        sqlite
      end

      def query(sql, db)
        db.prepare(sql).execute
      end

      # Carried over from the old ProcessUploads script: Fixer, Uploader and
      # Optimizer all call this, but no extracted file defined it.
      def add_multisite_prefix(path)
        if Rails.configuration.multisite
          File.join("uploads", RailsMultisite::ConnectionManagement.current_db, path)
        else
          path
        end
      end
    end
  end
end
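Each task below layers the same contract on Base: a `run!` class method that prints the banner line, an initializer that wires up the source and output connections, and an instance `run!` that drives the pipeline. Schematic only; SomeTask is hypothetical and the body is elided:

module Migrations
  module Uploads
    class SomeTask < Base
      def self.run!(settings)
        puts "Doing the thing..."
        new(settings).run!
      end

      def initialize(settings)
        @settings = settings
        @source_db = create_connection(settings[:source_db_path]) # helper from Base
        @output_db = settings.output_db
      end

      def run!
        query("SELECT id FROM uploads", @source_db) # helper from Base
        # ...producer/consumer pipeline as in the files below...
      end
    end
  end
end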
migrations/lib/uploads/cli.rb
@@ -0,0 +1,67 @@
# frozen_string_literal: true

require_relative "../migrations"
require_relative "./settings"
require_relative "./fixer"
require_relative "./uploader"
require_relative "./optimizer"

module Migrations
  load_rails_environment

  load_gemfiles("common")
  configure_zeitwerk("lib/common")

  module Uploads
    class CLI < Thor
      default_task :execute

      class_option :settings,
                   type: :string,
                   aliases: "-s",
                   default: "./migrations/config/process_uploads.yml",
                   banner: "SETTINGS_FILE",
                   desc: "Upload settings file"

      def initialize(*args)
        super

        # disable logging for EXIFR, which is used by ImageOptim
        EXIFR.logger = Logger.new(nil)
        @settings = Settings.from_file(options[:settings])
      end

      def self.exit_on_failure?
        true
      end

      desc "execute [--settings=SETTINGS_FILE]", "Process uploads"
      def execute
        return run_fixer! if @settings[:fix_missing]

        Uploader.run!(@settings)

        run_optimizer! if @settings[:create_optimized_images]
      end

      desc "fix-missing [--settings=SETTINGS_FILE]", "Fix missing uploads"
      def fix_missing
        run_fixer!
      end

      desc "optimize [--settings=SETTINGS_FILE]", "Optimize uploads"
      def optimize
        run_optimizer! # was `run_optimize!`, which is undefined
      end

      private

      def run_fixer!
        Fixer.run!(@settings)
      end

      def run_optimizer!
        Optimizer.run!(@settings)
      end
    end
  end
end
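With Thor, the public methods become subcommands and `default_task` makes a bare invocation run `execute`. Assuming standard Thor dispatch (dashes in command names map to underscores), usage looks like:

# Invocation sketch, via the binstub above:
#
#   ./migrations/bin/process_uploads                                 # default task: execute
#   ./migrations/bin/process_uploads fix-missing -s path/to/settings.yml
#   ./migrations/bin/process_uploads optimize
#
# Equivalent programmatic calls:
Migrations::Uploads::CLI.start([])                                   # execute
Migrations::Uploads::CLI.start(%w[fix-missing --settings=./my.yml])
Migrations::Uploads::CLI.start(%w[optimize])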
migrations/lib/uploads/fixer.rb
@@ -0,0 +1,110 @@
# frozen_string_literal: true

require_relative "./base"

module Migrations
  module Uploads
    class Fixer < Base
      def initialize(settings)
        @settings = settings

        # in fix mode, both reads and deletes target the output database
        @source_db = create_connection(settings[:output_db_path])
        @output_db = settings.output_db
      end

      def self.run!(settings)
        puts "Fixing missing uploads..."

        new(settings).run!
      end

      def run!
        queue = SizedQueue.new(QUEUE_SIZE)
        consumer_threads = []

        max_count =
          @source_db.get_first_value("SELECT COUNT(*) FROM uploads WHERE upload IS NOT NULL")

        producer_thread =
          Thread.new do
            query(
              "SELECT id, upload FROM uploads WHERE upload IS NOT NULL ORDER BY rowid DESC",
              @source_db,
            ).tap do |result_set|
              result_set.each { |row| queue << row }
              result_set.close
            end
          end

        status_queue = SizedQueue.new(QUEUE_SIZE)
        status_thread =
          Thread.new do
            error_count = 0
            current_count = 0
            missing_count = 0

            while !(result = status_queue.pop).nil?
              current_count += 1

              case result[:status]
              when :ok
                # ignore
              when :error
                error_count += 1
                puts "Error in #{result[:id]}"
              when :missing
                missing_count += 1
                puts "Missing #{result[:id]}"

                @output_db.execute("DELETE FROM uploads WHERE id = ?", result[:id])
                Upload.delete_by(id: result[:upload_id])
              end

              error_count_text = error_count > 0 ? "#{error_count} errors".red : "0 errors"

              print "\r%7d / %7d (%s, %s missing)" %
                      [current_count, max_count, error_count_text, missing_count]
            end
          end

        store = Discourse.store

        (Etc.nprocessors * @settings[:thread_count_factor] * 2).to_i.times do |index|
          consumer_threads << Thread.new do
            Thread.current.name = "worker-#{index}"
            fake_upload = OpenStruct.new(url: "")
            while (row = queue.pop)
              begin
                upload = JSON.parse(row["upload"])
                fake_upload.url = upload["url"]
                path = add_multisite_prefix(store.get_path_for_upload(fake_upload))

                file_exists =
                  if store.external?
                    store.object_from_path(path).exists?
                  else
                    File.exist?(File.join(store.public_dir, path))
                  end

                if file_exists
                  status_queue << { id: row["id"], upload_id: upload["id"], status: :ok }
                else
                  status_queue << { id: row["id"], upload_id: upload["id"], status: :missing }
                end
              rescue StandardError => e
                puts e.message
                status_queue << { id: row["id"], upload_id: upload["id"], status: :error }
              end
            end
          end
        end

        producer_thread.join
        queue.close
        consumer_threads.each(&:join)
        status_queue.close
        status_thread.join
      end
    end
  end
end
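The `upload` column the fixer reads holds the serialized `Upload#attributes` JSON written by the uploader (`upload.attributes.to_json`). Only `url` and `id` are consumed here, which is why an OpenStruct with a single `url` field is enough to feed `store.get_path_for_upload` (as the committed code itself assumes). A sketch with made-up values:

require "json"
require "ostruct"

# illustrative row contents, not real data
row_upload = JSON.parse(<<~JSON)
  {"id": 42, "url": "/uploads/default/original/1X/abc123.png", "sha1": "abc123"}
JSON

fake_upload = OpenStruct.new(url: row_upload["url"])
# store.get_path_for_upload(fake_upload) only needs #url to derive the path;
# row_upload["id"] is what Upload.delete_by(id: ...) receives on :missing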
migrations/lib/uploads/optimizer.rb
@@ -0,0 +1,237 @@
# frozen_string_literal: true

require_relative "./base"

module Migrations
  module Uploads
    class Optimizer < Base
      def initialize(settings)
        @settings = settings

        @source_db = create_connection(@settings[:source_db_path])
        @output_db = settings.output_db
      end

      def self.run!(settings)
        puts "Creating optimized images..."

        new(settings).run!
      end

      def run!
        init_threads = []
        optimized_upload_ids = Set.new
        post_upload_ids = Set.new
        avatar_upload_ids = Set.new
        max_count = 0

        # allow more than 1 thread to optimize images at the same time
        OptimizedImage.lock_per_machine = false

        init_threads << Thread.new do
          query("SELECT id FROM optimized_images", @output_db).tap do |result_set|
            result_set.each { |row| optimized_upload_ids << row["id"] }
            result_set.close
          end
        end

        init_threads << Thread.new do
          sql = <<~SQL
            SELECT upload_ids
            FROM posts
            WHERE upload_ids IS NOT NULL
          SQL
          query(sql, @source_db).tap do |result_set|
            result_set.each do |row|
              JSON.parse(row["upload_ids"]).each { |id| post_upload_ids << id }
            end
            result_set.close
          end
        end

        init_threads << Thread.new do
          sql = <<~SQL
            SELECT avatar_upload_id
            FROM users
            WHERE avatar_upload_id IS NOT NULL
          SQL
          query(sql, @source_db).tap do |result_set|
            result_set.each { |row| avatar_upload_ids << row["avatar_upload_id"] }
            result_set.close
          end
        end

        init_threads << Thread.new do
          max_count =
            @output_db.get_first_value("SELECT COUNT(*) FROM uploads WHERE upload IS NOT NULL")
        end

        init_threads.each(&:join)

        status_queue = SizedQueue.new(QUEUE_SIZE)
        status_thread =
          Thread.new do
            error_count = 0
            current_count = 0
            skipped_count = 0

            while !(params = status_queue.pop).nil?
              current_count += 1

              case params.delete(:status)
              when :ok
                @output_db.execute(<<~SQL, params)
                  INSERT INTO optimized_images (id, optimized_images)
                  VALUES (:id, :optimized_images)
                SQL
              when :error
                error_count += 1
              when :skipped
                skipped_count += 1
              end

              error_count_text = error_count > 0 ? "#{error_count} errors".red : "0 errors"

              print "\r%7d / %7d (%s, %d skipped)" %
                      [current_count, max_count, error_count_text, skipped_count]
            end
          end

        queue = SizedQueue.new(QUEUE_SIZE)
        consumer_threads = []

        producer_thread =
          Thread.new do
            sql = <<~SQL
              SELECT id AS upload_id, upload ->> 'sha1' AS upload_sha1, markdown
              FROM uploads
              WHERE upload IS NOT NULL
              ORDER BY rowid
            SQL

            query(sql, @output_db).tap do |result_set|
              result_set.each do |row|
                upload_id = row["upload_id"]

                if optimized_upload_ids.include?(upload_id) || !row["markdown"].start_with?("![")
                  status_queue << { id: row["upload_id"], status: :skipped }
                  next
                end

                if post_upload_ids.include?(upload_id)
                  row["type"] = "post"
                  queue << row
                elsif avatar_upload_ids.include?(upload_id)
                  row["type"] = "avatar"
                  queue << row
                else
                  status_queue << { id: row["upload_id"], status: :skipped }
                end
              end
              result_set.close
            end
          end

        avatar_sizes = Discourse.avatar_sizes
        store = Discourse.store
        remote_factor = store.external? ? 2 : 1

        Jobs.run_immediately!

        (Etc.nprocessors * @settings[:thread_count_factor] * remote_factor).to_i.times do |index|
          consumer_threads << Thread.new do
            Thread.current.name = "worker-#{index}"

            post =
              PostCreator.new(
                Discourse.system_user,
                raw: "Topic created by uploads_importer",
                acting_user: Discourse.system_user,
                skip_validations: true,
                title: "Topic created by uploads_importer - #{SecureRandom.hex}",
                archetype: Archetype.default,
                category: Category.last.id,
              ).create!

            while (row = queue.pop)
              retry_count = 0

              loop do
                upload = Upload.find_by(sha1: row["upload_sha1"])

                optimized_images =
                  begin
                    case row["type"]
                    when "post"
                      post.update_columns(baked_at: nil, cooked: "", raw: row["markdown"])
                      post.reload
                      post.rebake!
                      OptimizedImage.where(upload_id: upload.id).to_a
                    when "avatar"
                      avatar_sizes.map { |size| OptimizedImage.create_for(upload, size, size) }
                    end
                  rescue StandardError => e
                    puts e.message
                    puts e.backtrace # was `e.stacktrace`, which doesn't exist
                    nil
                  end

                begin
                  if optimized_images.present?
                    optimized_images.map! do |optimized_image|
                      next unless optimized_image.present?
                      optimized_image_path =
                        add_multisite_prefix(store.get_path_for_optimized_image(optimized_image))

                      file_exists =
                        if store.external?
                          store.object_from_path(optimized_image_path).exists?
                        else
                          File.exist?(File.join(store.public_dir, optimized_image_path))
                        end

                      unless file_exists
                        optimized_image.destroy
                        optimized_image = nil
                      end

                      optimized_image
                    end
                  end
                rescue StandardError
                  optimized_images = nil
                end

                optimized_images_okay =
                  !optimized_images.nil? && optimized_images.all?(&:present?) &&
                    optimized_images.all?(&:persisted?) &&
                    optimized_images.all? { |o| o.errors.blank? }

                if optimized_images_okay
                  status_queue << {
                    id: row["upload_id"],
                    optimized_images: optimized_images.presence&.to_json,
                    status: :ok,
                  }
                  break
                elsif retry_count >= 3
                  status_queue << { id: row["upload_id"], status: :error }
                  break
                end

                retry_count += 1
                sleep 0.25 * retry_count
              end
            end
          end
        end

        producer_thread.join
        queue.close
        consumer_threads.each(&:join)
        status_queue.close
        status_thread.join
      end
    end
  end
end
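The producer query leans on SQLite's JSON `->>` operator to pull `sha1` straight out of the stored upload JSON, which requires SQLite 3.38 or newer. A standalone sketch with throwaway data:

require "sqlite3"

db = SQLite3::Database.new(":memory:", results_as_hash: true)
db.execute("CREATE TABLE uploads (id TEXT PRIMARY KEY, upload TEXT, markdown TEXT)")
db.execute(
  "INSERT INTO uploads VALUES (?, ?, ?)",
  ["u1", %q({"sha1":"abc123","url":"/uploads/x.png"}), "![x](upload://x.png)"],
)

# ->> extracts a JSON field as text (SQLite 3.38+ operator syntax)
db.execute("SELECT id, upload ->> 'sha1' AS upload_sha1 FROM uploads") do |row|
  p row # => {"id"=>"u1", "upload_sha1"=>"abc123"}
end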
migrations/lib/uploads/settings.rb
@@ -0,0 +1,116 @@
# frozen_string_literal: true

require "sqlite3"

module Migrations
  module Uploads
    class Settings
      attr_reader :output_db

      def initialize(options)
        options[:path_replacements] ||= []

        @root_paths = options[:root_paths]
        @output_db = create_connection(options[:output_db_path])
        @options = options

        initialize_output_db
        configure_site_settings
      end

      def self.from_file(path)
        new(YAML.load_file(path, symbolize_names: true))
      end

      # TODO: compare against dynamically defining getter methods for
      # each top-level setting
      def [](key)
        @options[key]
      end

      private

      # TODO: Use IntermediateDatabase instead
      def create_connection(path)
        sqlite = SQLite3::Database.new(path, results_as_hash: true)
        sqlite.busy_timeout = 60_000 # 60 seconds
        sqlite.journal_mode = "WAL"
        sqlite.synchronous = "off"
        sqlite
      end

      def initialize_output_db
        @statement_counter = 0

        @output_db.execute(<<~SQL)
          CREATE TABLE IF NOT EXISTS uploads (
            id TEXT PRIMARY KEY NOT NULL,
            upload JSON_TEXT,
            markdown TEXT,
            skip_reason TEXT
          )
        SQL

        @output_db.execute(<<~SQL)
          CREATE TABLE IF NOT EXISTS optimized_images (
            id TEXT PRIMARY KEY NOT NULL,
            optimized_images JSON_TEXT
          )
        SQL
      end

      def configure_site_settings
        settings = @options[:site_settings]

        SiteSetting.clean_up_uploads = false
        SiteSetting.authorized_extensions = settings[:authorized_extensions]
        SiteSetting.max_attachment_size_kb = settings[:max_attachment_size_kb]
        SiteSetting.max_image_size_kb = settings[:max_image_size_kb]

        if settings[:multisite]
          # rubocop:disable Discourse/NoDirectMultisiteManipulation
          Rails.configuration.multisite = true
          # rubocop:enable Discourse/NoDirectMultisiteManipulation

          RailsMultisite::ConnectionManagement.class_eval do
            def self.current_db_override=(value)
              @current_db_override = value
            end

            def self.current_db
              @current_db_override
            end
          end
          RailsMultisite::ConnectionManagement.current_db_override = settings[:multisite_db_name]
        end

        if settings[:enable_s3_uploads]
          SiteSetting.s3_access_key_id = settings[:s3_access_key_id]
          SiteSetting.s3_secret_access_key = settings[:s3_secret_access_key]
          SiteSetting.s3_upload_bucket = settings[:s3_upload_bucket]
          SiteSetting.s3_region = settings[:s3_region]
          SiteSetting.s3_cdn_url = settings[:s3_cdn_url]
          SiteSetting.enable_s3_uploads = true

          raise "Failed to enable S3 uploads" if SiteSetting.enable_s3_uploads != true

          Tempfile.open("discourse-s3-test") do |tmpfile|
            tmpfile.write("test")
            tmpfile.rewind

            upload =
              UploadCreator.new(tmpfile, "discourse-s3-test.txt").create_for(
                Discourse::SYSTEM_USER_ID,
              )

            unless upload.present? && upload.persisted? && upload.errors.blank? &&
                     upload.url.start_with?("//")
              raise "Failed to upload to S3"
            end

            upload.destroy
          end
        end
      end
    end
  end
end
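Settings.from_file expects a YAML file with the keys referenced above; `/config/process_uploads.yml` is git-ignored by this commit, so no sample ships with it. A hypothetical example, restricted to keys the code actually reads, with placeholder values:

require "yaml"

example = YAML.safe_load(<<~YML, symbolize_names: true)
  source_db_path: /path/to/intermediate.db
  output_db_path: /path/to/uploads.db
  root_paths:
    - /path/to/exported/files
  path_replacements: []   # pairs of [from, to] applied to relative paths
  thread_count_factor: 1.5
  fix_missing: false
  create_optimized_images: true
  delete_missing_uploads: false
  delete_surplus_uploads: false
  site_settings:
    authorized_extensions: "*"
    max_attachment_size_kb: 102400
    max_image_size_kb: 102400
    multisite: false
    enable_s3_uploads: false   # when true, the s3_* keys are also required
YML

example[:root_paths] # => ["/path/to/exported/files"]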
migrations/lib/uploads/uploader.rb
@@ -0,0 +1,233 @@
# frozen_string_literal: true

require_relative "./base"

module Migrations
  module Uploads
    class Uploader < Base
      def initialize(settings)
        @settings = settings

        @root_paths = settings[:root_paths] # used by the candidate-path search in #run!
        @source_db = create_connection(@settings[:source_db_path])
        @output_db = settings.output_db
      end

      def self.run!(settings)
        puts "Uploading uploads..."

        new(settings).run!
      end

      def run!
        queue = SizedQueue.new(QUEUE_SIZE)
        consumer_threads = []

        if @settings[:delete_missing_uploads]
          puts "Deleting missing uploads from output database..."
          @output_db.execute(<<~SQL)
            DELETE FROM uploads
            WHERE upload IS NULL
          SQL
        end

        output_existing_ids = Set.new
        query("SELECT id FROM uploads", @output_db).tap do |result_set|
          result_set.each { |row| output_existing_ids << row["id"] }
          result_set.close
        end

        source_existing_ids = Set.new
        query("SELECT id FROM uploads", @source_db).tap do |result_set|
          result_set.each { |row| source_existing_ids << row["id"] }
          result_set.close
        end

        if (surplus_upload_ids = output_existing_ids - source_existing_ids).any?
          if @settings[:delete_surplus_uploads]
            puts "Deleting #{surplus_upload_ids.size} uploads from output database..."

            surplus_upload_ids.each_slice(TRANSACTION_SIZE) do |ids|
              placeholders = (["?"] * ids.size).join(",")
              @output_db.execute(<<~SQL, ids)
                DELETE FROM uploads
                WHERE id IN (#{placeholders})
              SQL
            end

            output_existing_ids -= surplus_upload_ids
          else
            puts "Found #{surplus_upload_ids.size} surplus uploads in output database. " \
                   "Run with `delete_surplus_uploads: true` to delete them."
          end

          surplus_upload_ids = nil
        end

        max_count = (source_existing_ids - output_existing_ids).size
        source_existing_ids = nil
        puts "Found #{output_existing_ids.size} existing uploads. #{max_count} are missing."

        producer_thread =
          Thread.new do
            query("SELECT * FROM uploads", @source_db).tap do |result_set|
              result_set.each { |row| queue << row unless output_existing_ids.include?(row["id"]) }
              result_set.close
            end
          end

        status_queue = SizedQueue.new(QUEUE_SIZE)
        status_thread =
          Thread.new do
            error_count = 0
            skipped_count = 0
            current_count = 0

            while !(params = status_queue.pop).nil?
              begin
                if params.delete(:skipped) == true
                  skipped_count += 1
                elsif (error_message = params.delete(:error)) || params[:upload].nil?
                  error_count += 1
                  puts "", "Failed to create upload: #{params[:id]} (#{error_message})", ""
                end

                @output_db.execute(<<~SQL, params)
                  INSERT INTO uploads (id, upload, markdown, skip_reason)
                  VALUES (:id, :upload, :markdown, :skip_reason)
                SQL
              rescue StandardError => e
                puts "", "Failed to insert upload: #{params[:id]} (#{e.message})", ""
                error_count += 1
              end

              current_count += 1
              error_count_text = error_count > 0 ? "#{error_count} errors".red : "0 errors"

              print "\r%7d / %7d (%s, %d skipped)" %
                      [current_count, max_count, error_count_text, skipped_count]
            end
          end

        (Etc.nprocessors * @settings[:thread_count_factor]).to_i.times do |index|
          consumer_threads << Thread.new do
            Thread.current.name = "worker-#{index}"

            store = Discourse.store

            while (row = queue.pop)
              begin
                data_file = nil
                path = nil

                if row["data"].present?
                  data_file = Tempfile.new("discourse-upload", binmode: true)
                  data_file.write(row["data"])
                  data_file.rewind
                  path = data_file.path
                else
                  relative_path = row["relative_path"]
                  file_exists = false

                  @root_paths.each do |root_path|
                    path = File.join(root_path, relative_path, row["filename"])
                    break if (file_exists = File.exist?(path))

                    @settings[:path_replacements].each do |from, to|
                      path = File.join(root_path, relative_path.sub(from, to), row["filename"])
                      break if (file_exists = File.exist?(path))
                    end
                  end

                  if !file_exists
                    status_queue << {
                      id: row["id"],
                      upload: nil,
                      skipped: true,
                      skip_reason: "file not found",
                    }
                    next
                  end
                end

                retry_count = 0

                loop do
                  error_message = nil
                  upload =
                    copy_to_tempfile(path) do |file|
                      begin
                        UploadCreator.new(
                          file,
                          row["display_filename"] || row["filename"],
                          type: row["type"],
                        ).create_for(Discourse::SYSTEM_USER_ID)
                      rescue StandardError => e
                        error_message = e.message
                        nil
                      end
                    end

                  if (upload_okay = upload.present? && upload.persisted? && upload.errors.blank?)
                    upload_path = add_multisite_prefix(store.get_path_for_upload(upload))

                    file_exists =
                      if store.external?
                        store.object_from_path(upload_path).exists?
                      else
                        File.exist?(File.join(store.public_dir, upload_path))
                      end

                    unless file_exists
                      upload.destroy
                      upload = nil
                      upload_okay = false
                    end
                  end

                  if upload_okay
                    status_queue << {
                      id: row["id"],
                      upload: upload.attributes.to_json,
                      markdown: UploadMarkdown.new(upload).to_markdown,
                      skip_reason: nil,
                    }
                    break
                  elsif retry_count >= 3
                    error_message ||= upload&.errors&.full_messages&.join(", ") || "unknown error"
                    status_queue << {
                      id: row["id"],
                      upload: nil,
                      markdown: nil,
                      error: "too many retries: #{error_message}",
                      skip_reason: "too many retries",
                    }
                    break
                  end

                  retry_count += 1
                  sleep 0.25 * retry_count
                end
              rescue StandardError => e
                status_queue << {
                  id: row["id"],
                  upload: nil,
                  markdown: nil,
                  error: e.message,
                  skip_reason: "error",
                }
              ensure
                data_file&.close!
              end
            end
          end
        end

        producer_thread.join
        queue.close
        consumer_threads.each(&:join)
        status_queue.close
        status_thread.join
      end

      private

      # Carried over from the old ProcessUploads script; #run! depends on it.
      def copy_to_tempfile(source_path)
        extension = File.extname(source_path)

        Tempfile.open(["discourse-upload", extension]) do |tmpfile|
          File.open(source_path, "rb") { |source_stream| IO.copy_stream(source_stream, tmpfile) }
          tmpfile.rewind
          yield(tmpfile)
        end
      end
    end
  end
end
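The candidate-path search in `#run!` tries the filename under every root path, then re-tries with each `path_replacements` pair applied to the relative path, stopping at the first file that exists. In isolation, with inputs invented for illustration:

root_paths = ["/backup/a", "/backup/b"]
path_replacements = [["/forum/", "/old_forum/"]]
relative_path = "/forum/uploads"
filename = "logo.png"

candidates =
  root_paths.flat_map do |root|
    [File.join(root, relative_path, filename)] +
      path_replacements.map { |from, to| File.join(root, relative_path.sub(from, to), filename) }
  end
# => ["/backup/a/forum/uploads/logo.png", "/backup/a/old_forum/uploads/logo.png",
#     "/backup/b/forum/uploads/logo.png", "/backup/b/old_forum/uploads/logo.png"]

path = candidates.find { |candidate| File.exist?(candidate) }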