discourse/script/bulk_import/uploads_importer.rb

663 lines
20 KiB
Ruby

# frozen_string_literal: true
puts "Loading application..."
require_relative "../../config/environment"
require "etc"
require "sqlite3"
require "colored2"
module BulkImport
class UploadsImporter
TRANSACTION_SIZE = 1000
QUEUE_SIZE = 1000
# Reads the importer settings, opens the output database and applies the
# configured site settings.
#
# @param settings_path [String] path to the YAML settings file; keys are
#   symbolized while loading
def initialize(settings_path)
  @settings = YAML.load_file(settings_path, symbolize_names: true)

  # `path_replacements` is optional; `upload_files` iterates over it, so
  # fall back to an empty list.
  @settings[:path_replacements] = [] unless @settings[:path_replacements]

  root_paths, output_db_path = @settings.values_at(:root_paths, :output_db_path)
  @root_paths = root_paths
  @output_db = create_connection(output_db_path)

  initialize_output_db
  configure_site_settings
end
# Runs the importer in one of two modes.
#
# * `fix_missing` set: the output database is also used as the source and
#   `fix_missing` is executed.
# * otherwise: the source database is opened, `upload_files` is executed and,
#   only when `create_optimized_images` is set, `create_optimized_images`.
#
# Database connections are always released through `close`, even when one of
# the steps raises.
def run
  # disable logging for EXIFR which is used by ImageOptim
  EXIFR.logger = Logger.new(nil)

  if @settings[:fix_missing]
    @source_db = create_connection(@settings[:output_db_path])

    puts "Fixing missing uploads..."
    fix_missing
  else
    @source_db = create_connection(@settings[:source_db_path])

    puts "Uploading uploads..."
    upload_files

    # Announce the step only when it actually runs.
    if @settings[:create_optimized_images]
      puts "", "Creating optimized images..."
      create_optimized_images
    end
  end

  puts ""
ensure
  close
end
# Creates uploads for every row of the source database's `uploads` table that
# is not yet present in the output database.
#
# Steps:
# 1. Optionally (`delete_missing_uploads`) removes output rows without an upload.
# 2. Compares the ids of both databases; output rows that no longer exist in
#    the source are deleted when `delete_surplus_uploads` is set, otherwise
#    only reported.
# 3. A producer thread queues the source rows that still need processing.
# 4. Worker threads (`Etc.nprocessors * thread_count_factor`) locate or
#    materialize the file, create the upload via `UploadCreator` and verify
#    that the stored file exists. A row is attempted up to four times before
#    it is recorded as failed.
# 5. A single status thread writes every result into the output database and
#    prints the progress line.
def upload_files
  queue = SizedQueue.new(QUEUE_SIZE)
  consumer_threads = []

  if @settings[:delete_missing_uploads]
    puts "Deleting missing uploads from output database..."

    @output_db.execute(<<~SQL)
      DELETE FROM uploads
      WHERE upload IS NULL
    SQL
  end

  output_existing_ids = Set.new
  query("SELECT id FROM uploads", @output_db).tap do |result_set|
    result_set.each { |row| output_existing_ids << row["id"] }
    result_set.close
  end

  source_existing_ids = Set.new
  query("SELECT id FROM uploads", @source_db).tap do |result_set|
    result_set.each { |row| source_existing_ids << row["id"] }
    result_set.close
  end

  if (surplus_upload_ids = output_existing_ids - source_existing_ids).any?
    if @settings[:delete_surplus_uploads]
      puts "Deleting #{surplus_upload_ids.size} uploads from output database..."

      surplus_upload_ids.each_slice(TRANSACTION_SIZE) do |ids|
        placeholders = (["?"] * ids.size).join(",")
        @output_db.execute(<<~SQL, ids)
          DELETE FROM uploads
          WHERE id IN (#{placeholders})
        SQL
      end

      output_existing_ids -= surplus_upload_ids
    else
      puts "Found #{surplus_upload_ids.size} surplus uploads in output database. " \
             "Run with `delete_surplus_uploads: true` to delete them."
    end

    # drop the reference, the set is not needed anymore
    surplus_upload_ids = nil
  end

  max_count = (source_existing_ids - output_existing_ids).size
  source_existing_ids = nil

  puts "Found #{output_existing_ids.size} existing uploads. #{max_count} are missing."

  producer_thread =
    Thread.new do
      query("SELECT * FROM uploads", @source_db).tap do |result_set|
        result_set.each { |row| queue << row unless output_existing_ids.include?(row["id"]) }
        result_set.close
      end
    end

  status_queue = SizedQueue.new(QUEUE_SIZE)
  status_thread =
    Thread.new do
      error_count = 0
      skipped_count = 0
      current_count = 0

      # `pop` returns nil once the queue is closed and drained
      while !(params = status_queue.pop).nil?
        begin
          # `:skipped` and `:error` are control keys; they are removed so that
          # only the bind parameters of the INSERT remain in `params`
          if params.delete(:skipped) == true
            skipped_count += 1
          elsif (error_message = params.delete(:error)) || params[:upload].nil?
            error_count += 1
            puts "", "Failed to create upload: #{params[:id]} (#{error_message})", ""
          end

          @output_db.execute(<<~SQL, params)
            INSERT INTO uploads (id, upload, markdown, skip_reason)
            VALUES (:id, :upload, :markdown, :skip_reason)
          SQL
        rescue StandardError => e
          puts "", "Failed to insert upload: #{params[:id]} (#{e.message})", ""
          error_count += 1
        end

        current_count += 1

        error_count_text = error_count > 0 ? "#{error_count} errors".red : "0 errors"
        print "\r%7d / %7d (%s, %d skipped)" %
                [current_count, max_count, error_count_text, skipped_count]
      end
    end

  (Etc.nprocessors * @settings[:thread_count_factor]).to_i.times do |index|
    consumer_threads << Thread.new do
      Thread.current.name = "worker-#{index}"

      store = Discourse.store

      while (row = queue.pop)
        begin
          data_file = nil
          path = nil

          if row["data"].present?
            # the file content is stored in the database row itself
            data_file = Tempfile.new("discourse-upload", binmode: true)
            data_file.write(row["data"])
            data_file.rewind
            path = data_file.path
          else
            relative_path = row["relative_path"]
            file_exists = false

            @root_paths.each do |root_path|
              path = File.join(root_path, relative_path, row["filename"])
              break if (file_exists = File.exist?(path))

              @settings[:path_replacements].each do |from, to|
                path = File.join(root_path, relative_path.sub(from, to), row["filename"])
                break if (file_exists = File.exist?(path))
              end

              # The inner `break` only leaves the replacement loop. Stop here
              # as well, otherwise the next root path would overwrite `path`
              # and `file_exists` of the file that was just found.
              break if file_exists
            end

            if !file_exists
              status_queue << {
                id: row["id"],
                upload: nil,
                skipped: true,
                skip_reason: "file not found",
              }
              next
            end
          end

          retry_count = 0

          loop do
            error_message = nil
            upload =
              copy_to_tempfile(path) do |file|
                begin
                  UploadCreator.new(file, row["filename"], type: row["type"]).create_for(
                    Discourse::SYSTEM_USER_ID,
                  )
                rescue StandardError => e
                  error_message = e.message
                  nil
                end
              end

            if (upload_okay = upload.present? && upload.persisted? && upload.errors.blank?)
              # a persisted record is not enough, the stored file must exist too
              upload_path = store.get_path_for_upload(upload)

              file_exists =
                if store.external?
                  store.object_from_path(upload_path).exists?
                else
                  File.exist?(File.join(store.public_dir, upload_path))
                end

              unless file_exists
                upload.destroy
                upload = nil
                upload_okay = false
              end
            end

            if upload_okay
              status_queue << {
                id: row["id"],
                upload: upload.attributes.to_json,
                markdown: UploadMarkdown.new(upload).to_markdown,
                skip_reason: nil,
              }
              break
            elsif retry_count >= 3
              error_message ||= upload&.errors&.full_messages&.join(", ") || "unknown error"
              status_queue << {
                id: row["id"],
                upload: nil,
                markdown: nil,
                error: "too many retries: #{error_message}",
                skip_reason: "too many retries",
              }
              break
            end

            retry_count += 1
            # back off a little longer after every failed attempt
            sleep 0.25 * retry_count
          end
        rescue StandardError => e
          status_queue << {
            id: row["id"],
            upload: nil,
            markdown: nil,
            error: e.message,
            skip_reason: "error",
          }
        ensure
          data_file&.close!
        end
      end
    end
  end

  # Shut down in pipeline order: producer, workers, then the status writer.
  producer_thread.join
  queue.close
  consumer_threads.each(&:join)
  status_queue.close
  status_thread.join
end
# Verifies that the file of every upload recorded in the source database
# (`@source_db`) still exists in the store.
#
# A producer thread queues all rows with a non-NULL `upload`, worker threads
# (`Etc.nprocessors * thread_count_factor * 2`) check the file's existence and
# a status thread processes the results: for a missing file the row is deleted
# from the output database and the `Upload` record is removed. Errors are only
# counted and reported.
def fix_missing
  queue = SizedQueue.new(QUEUE_SIZE)
  consumer_threads = []

  max_count =
    @source_db.get_first_value("SELECT COUNT(*) FROM uploads WHERE upload IS NOT NULL")

  producer_thread =
    Thread.new do
      query(
        "SELECT id, upload FROM uploads WHERE upload IS NOT NULL ORDER BY rowid DESC",
        @source_db,
      ).tap do |result_set|
        result_set.each { |row| queue << row }
        result_set.close
      end
    end

  status_queue = SizedQueue.new(QUEUE_SIZE)
  status_thread =
    Thread.new do
      error_count = 0
      current_count = 0
      missing_count = 0

      # `pop` returns nil once the queue is closed and drained
      while !(result = status_queue.pop).nil?
        current_count += 1

        case result[:status]
        when :ok
          # ignore
        when :error
          error_count += 1
          puts "Error in #{result[:id]}"
        when :missing
          missing_count += 1
          puts "Missing #{result[:id]}"

          @output_db.execute("DELETE FROM uploads WHERE id = ?", result[:id])
          Upload.delete_by(id: result[:upload_id])
        end

        error_count_text = error_count > 0 ? "#{error_count} errors".red : "0 errors"
        print "\r%7d / %7d (%s, %s missing)" %
                [current_count, max_count, error_count_text, missing_count]
      end
    end

  store = Discourse.store

  (Etc.nprocessors * @settings[:thread_count_factor] * 2).to_i.times do |index|
    consumer_threads << Thread.new do
      Thread.current.name = "worker-#{index}"
      # only the URL is needed to resolve the path, so one stand-in object is
      # reused for all rows of this worker
      fake_upload = OpenStruct.new(url: "")

      while (row = queue.pop)
        # Reset for every row. The rescue below must not touch the parsed
        # upload: it is nil or stale when parsing itself failed.
        upload_id = nil

        begin
          upload = JSON.parse(row["upload"])
          upload_id = upload["id"]
          fake_upload.url = upload["url"]
          path = store.get_path_for_upload(fake_upload)

          file_exists =
            if store.external?
              store.object_from_path(path).exists?
            else
              File.exist?(File.join(store.public_dir, path))
            end

          if file_exists
            status_queue << { id: row["id"], upload_id: upload_id, status: :ok }
          else
            status_queue << { id: row["id"], upload_id: upload_id, status: :missing }
          end
        rescue StandardError => e
          puts e.message
          status_queue << { id: row["id"], upload_id: upload_id, status: :error }
        end
      end
    end
  end

  # Shut down in pipeline order: producer, workers, then the status writer.
  producer_thread.join
  queue.close
  consumer_threads.each(&:join)
  status_queue.close
  status_thread.join
end
# Creates optimized images for uploads that are referenced by posts or used as
# avatars and stores the result in the `optimized_images` table of the output
# database.
#
# Uploads are skipped when they already have an entry in `optimized_images`,
# when their markdown does not start with "![" or when they are referenced by
# neither a post nor a user avatar.
#
# For post uploads a scratch post (one per worker thread) is rebaked with the
# upload's markdown; for avatars `OptimizedImage.create_for` is called for each
# of `Discourse.avatar_sizes`. An upload is attempted up to four times before
# it is counted as an error.
def create_optimized_images
  init_threads = []
  optimized_upload_ids = Set.new
  post_upload_ids = Set.new
  avatar_upload_ids = Set.new
  max_count = 0

  # allow more than 1 thread to optimized images at the same time
  OptimizedImage.lock_per_machine = false

  # The lookups below are independent of each other, so they run in parallel.
  init_threads << Thread.new do
    query("SELECT id FROM optimized_images", @output_db).tap do |result_set|
      result_set.each { |row| optimized_upload_ids << row["id"] }
      result_set.close
    end
  end

  init_threads << Thread.new do
    sql = <<~SQL
      SELECT upload_ids
      FROM posts
      WHERE upload_ids IS NOT NULL
    SQL
    query(sql, @source_db).tap do |result_set|
      result_set.each do |row|
        JSON.parse(row["upload_ids"]).each { |id| post_upload_ids << id }
      end
      result_set.close
    end
  end

  init_threads << Thread.new do
    sql = <<~SQL
      SELECT avatar_upload_id
      FROM users
      WHERE avatar_upload_id IS NOT NULL
    SQL
    query(sql, @source_db).tap do |result_set|
      result_set.each { |row| avatar_upload_ids << row["avatar_upload_id"] }
      result_set.close
    end
  end

  init_threads << Thread.new do
    max_count =
      @output_db.get_first_value("SELECT COUNT(*) FROM uploads WHERE upload IS NOT NULL")
  end

  init_threads.each(&:join)

  status_queue = SizedQueue.new(QUEUE_SIZE)
  status_thread =
    Thread.new do
      error_count = 0
      current_count = 0
      skipped_count = 0

      # `pop` returns nil once the queue is closed and drained
      while !(params = status_queue.pop).nil?
        current_count += 1

        # `:status` is a control key; removing it leaves only the bind
        # parameters of the INSERT in `params`
        case params.delete(:status)
        when :ok
          @output_db.execute(<<~SQL, params)
            INSERT INTO optimized_images (id, optimized_images)
            VALUES (:id, :optimized_images)
          SQL
        when :error
          error_count += 1
        when :skipped
          skipped_count += 1
        end

        error_count_text = error_count > 0 ? "#{error_count} errors".red : "0 errors"
        print "\r%7d / %7d (%s, %d skipped)" %
                [current_count, max_count, error_count_text, skipped_count]
      end
    end

  queue = SizedQueue.new(QUEUE_SIZE)
  consumer_threads = []

  producer_thread =
    Thread.new do
      sql = <<~SQL
        SELECT id AS upload_id, upload ->> 'sha1' AS upload_sha1, markdown
        FROM uploads
        WHERE upload IS NOT NULL
        ORDER BY rowid
      SQL

      query(sql, @output_db).tap do |result_set|
        result_set.each do |row|
          upload_id = row["upload_id"]

          if optimized_upload_ids.include?(upload_id) || !row["markdown"].start_with?("![")
            status_queue << { id: row["upload_id"], status: :skipped }
            next
          end

          if post_upload_ids.include?(upload_id)
            row["type"] = "post"
            queue << row
          elsif avatar_upload_ids.include?(upload_id)
            row["type"] = "avatar"
            queue << row
          else
            status_queue << { id: row["upload_id"], status: :skipped }
          end
        end
        result_set.close
      end
    end

  avatar_sizes = Discourse.avatar_sizes
  store = Discourse.store
  remote_factor = store.external? ? 2 : 1

  Jobs.run_immediately!

  (Etc.nprocessors * @settings[:thread_count_factor] * remote_factor).to_i.times do |index|
    consumer_threads << Thread.new do
      Thread.current.name = "worker-#{index}"

      # scratch post of this worker; it is rebaked for every post upload
      post =
        PostCreator.new(
          Discourse.system_user,
          raw: "Topic created by uploads_importer",
          acting_user: Discourse.system_user,
          skip_validations: true,
          title: "Topic created by uploads_importer - #{SecureRandom.hex}",
          archetype: Archetype.default,
          category: Category.last.id,
        ).create!

      while (row = queue.pop)
        retry_count = 0

        loop do
          upload = Upload.find_by(sha1: row["upload_sha1"])

          optimized_images =
            begin
              case row["type"]
              when "post"
                post.update_columns(baked_at: nil, cooked: "", raw: row["markdown"])
                post.reload
                post.rebake!
                OptimizedImage.where(upload_id: upload.id).to_a
              when "avatar"
                avatar_sizes.map { |size| OptimizedImage.create_for(upload, size, size) }
              end
            rescue StandardError => e
              puts e.message
              # `Exception#backtrace` is the Ruby API; calling a non-existent
              # method here would raise inside the rescue and kill the worker
              puts e.backtrace
              nil
            end

          begin
            if optimized_images.present?
              # replace every image whose file is missing in the store by nil
              optimized_images.map! do |optimized_image|
                next unless optimized_image.present?
                optimized_image_path = store.get_path_for_optimized_image(optimized_image)

                file_exists =
                  if store.external?
                    store.object_from_path(optimized_image_path).exists?
                  else
                    File.exist?(File.join(store.public_dir, optimized_image_path))
                  end

                unless file_exists
                  optimized_image.destroy
                  optimized_image = nil
                end

                optimized_image
              end
            end
          rescue StandardError
            optimized_images = nil
          end

          optimized_images_okay =
            !optimized_images.nil? && optimized_images.all?(&:present?) &&
              optimized_images.all?(&:persisted?) &&
              optimized_images.all? { |o| o.errors.blank? }

          if optimized_images_okay
            status_queue << {
              id: row["upload_id"],
              optimized_images: optimized_images.presence&.to_json,
              status: :ok,
            }
            break
          elsif retry_count >= 3
            status_queue << { id: row["upload_id"], status: :error }
            break
          end

          retry_count += 1
          # back off a little longer after every failed attempt
          sleep 0.25 * retry_count
        end
      end
    end
  end

  # Shut down in pipeline order: producer, workers, then the status writer.
  producer_thread.join
  queue.close
  consumer_threads.each(&:join)
  status_queue.close
  status_thread.join
end
private
# Opens the SQLite database at +path+ with rows returned as hashes and applies
# the connection settings used by the importer.
#
# @param path [String] path of the database file
# @return [SQLite3::Database] the configured connection
def create_connection(path)
  SQLite3::Database
    .new(path, results_as_hash: true)
    .tap do |db|
      db.busy_timeout = 60_000 # 60 seconds
      db.journal_mode = "WAL"
      db.synchronous = "off"
    end
end
# Prepares +sql+ on +db+ and executes it.
#
# @param sql [String] the statement to run
# @param db [#prepare] the database connection
# @return the value returned by the prepared statement's `execute`; callers in
#   this file iterate over it and `close` it afterwards
def query(sql, db)
  statement = db.prepare(sql)
  statement.execute
end
# Resets the statement counter used by `insert` and creates the `uploads` and
# `optimized_images` tables in the output database unless they already exist.
def initialize_output_db
  @statement_counter = 0

  uploads_table = <<~SQL
    CREATE TABLE IF NOT EXISTS uploads (
    id TEXT PRIMARY KEY NOT NULL,
    upload JSON_TEXT,
    markdown TEXT,
    skip_reason TEXT
    )
  SQL

  optimized_images_table = <<~SQL
    CREATE TABLE IF NOT EXISTS optimized_images (
    id TEXT PRIMARY KEY NOT NULL,
    optimized_images JSON_TEXT
    )
  SQL

  @output_db.execute(uploads_table)
  @output_db.execute(optimized_images_table)
end
# Executes +sql+ on the output database and batches the statements into
# transactions of TRANSACTION_SIZE statements each.
#
# A transaction is opened before the first statement of a batch and committed
# right after the batch's last statement. A partially filled batch stays open;
# `close` commits it.
#
# @param sql [String] the statement to execute
# @param bind_vars [Array, Hash] bind parameters passed on to `execute`
def insert(sql, bind_vars = [])
  @output_db.transaction if @statement_counter.zero?
  @output_db.execute(sql, bind_vars)
  @statement_counter += 1

  # Commit once exactly TRANSACTION_SIZE statements were executed; comparing
  # with `>` would put TRANSACTION_SIZE + 1 statements into every batch.
  return if @statement_counter < TRANSACTION_SIZE

  @output_db.commit
  @statement_counter = 0
end
# Closes both database connections. A transaction that is still active on the
# output database is committed before that connection is closed.
def close
  @source_db&.close
  return unless @output_db

  @output_db.commit if @output_db.transaction_active?
  @output_db.close
end
# Copies the file at +source_path+ into a temporary file that keeps the
# original file extension, rewinds it and yields it to the block.
#
# @param source_path [String] path of the file to copy
# @yieldparam tmpfile [Tempfile] the rewound copy
# @return the value returned by the block
def copy_to_tempfile(source_path)
  suffix = File.extname(source_path)

  Tempfile.open(["discourse-upload", suffix]) do |copy|
    File.open(source_path, "rb") do |original|
      IO.copy_stream(original, copy)
    end

    copy.rewind
    yield(copy)
  end
end
# Applies the `site_settings` section of the importer settings to SiteSetting.
#
# Upload clean-up is always disabled. When `enable_s3_uploads` is set, the S3
# settings are applied as well and verified by uploading a small test file,
# which is destroyed again afterwards.
#
# @raise [RuntimeError] when S3 uploads cannot be enabled or when the test
#   upload fails
def configure_site_settings
  settings = @settings[:site_settings]

  SiteSetting.clean_up_uploads = false
  %i[authorized_extensions max_attachment_size_kb max_image_size_kb].each do |name|
    SiteSetting.public_send("#{name}=", settings[name])
  end

  return unless settings[:enable_s3_uploads]

  %i[
    s3_access_key_id
    s3_secret_access_key
    s3_upload_bucket
    s3_region
    s3_cdn_url
  ].each { |name| SiteSetting.public_send("#{name}=", settings[name]) }

  SiteSetting.enable_s3_uploads = true
  raise "Failed to enable S3 uploads" if SiteSetting.enable_s3_uploads != true

  Tempfile.open("discourse-s3-test") do |probe|
    probe.write("test")
    probe.rewind

    creator = UploadCreator.new(probe, "discourse-s3-test.txt")
    upload = creator.create_for(Discourse::SYSTEM_USER_ID)

    # the test upload must be persisted without errors and its URL must start
    # with "//"
    uploaded =
      upload.present? && upload.persisted? && upload.errors.blank? &&
        upload.url.start_with?("//")
    raise "Failed to upload to S3" unless uploaded

    upload.destroy
  end
end
end
end
# Usage:
#   bundle exec ruby script/bulk_import/uploads_importer.rb /path/to/uploads_importer.yml
#
# The first command-line argument is the path of the YAML settings file that
# is passed to the importer.
BulkImport::UploadsImporter.new(ARGV.first).run