663 lines
20 KiB
Ruby
663 lines
20 KiB
Ruby
# frozen_string_literal: true
|
|
puts "Loading application..."
|
|
require_relative "../../config/environment"
|
|
|
|
require "etc"
|
|
require "sqlite3"
|
|
require "colored2"
|
|
|
|
module BulkImport
|
|
class UploadsImporter
|
|
TRANSACTION_SIZE = 1000
|
|
QUEUE_SIZE = 1000
|
|
|
|
def initialize(settings_path)
|
|
@settings = YAML.load_file(settings_path, symbolize_names: true)
|
|
@settings[:path_replacements] ||= []
|
|
|
|
@root_paths = @settings[:root_paths]
|
|
@output_db = create_connection(@settings[:output_db_path])
|
|
|
|
initialize_output_db
|
|
configure_site_settings
|
|
end
|
|
|
|
def run
|
|
# disable logging for EXIFR which is used by ImageOptim
|
|
EXIFR.logger = Logger.new(nil)
|
|
|
|
if @settings[:fix_missing]
|
|
@source_db = create_connection(@settings[:output_db_path])
|
|
|
|
puts "Fixing missing uploads..."
|
|
fix_missing
|
|
else
|
|
@source_db = create_connection(@settings[:source_db_path])
|
|
|
|
puts "Uploading uploads..."
|
|
upload_files
|
|
|
|
puts "", "Creating optimized images..."
|
|
create_optimized_images if @settings[:create_optimized_images]
|
|
end
|
|
puts ""
|
|
ensure
|
|
close
|
|
end
|
|
|
|
def upload_files
|
|
queue = SizedQueue.new(QUEUE_SIZE)
|
|
consumer_threads = []
|
|
|
|
if @settings[:delete_missing_uploads]
|
|
puts "Deleting missing uploads from output database..."
|
|
@output_db.execute(<<~SQL)
|
|
DELETE FROM uploads
|
|
WHERE upload IS NULL
|
|
SQL
|
|
end
|
|
|
|
output_existing_ids = Set.new
|
|
query("SELECT id FROM uploads", @output_db).tap do |result_set|
|
|
result_set.each { |row| output_existing_ids << row["id"] }
|
|
result_set.close
|
|
end
|
|
|
|
source_existing_ids = Set.new
|
|
query("SELECT id FROM uploads", @source_db).tap do |result_set|
|
|
result_set.each { |row| source_existing_ids << row["id"] }
|
|
result_set.close
|
|
end
|
|
|
|
if (surplus_upload_ids = output_existing_ids - source_existing_ids).any?
|
|
if @settings[:delete_surplus_uploads]
|
|
puts "Deleting #{surplus_upload_ids.size} uploads from output database..."
|
|
|
|
surplus_upload_ids.each_slice(TRANSACTION_SIZE) do |ids|
|
|
placeholders = (["?"] * ids.size).join(",")
|
|
@output_db.execute(<<~SQL, ids)
|
|
DELETE FROM uploads
|
|
WHERE id IN (#{placeholders})
|
|
SQL
|
|
end
|
|
|
|
output_existing_ids -= surplus_upload_ids
|
|
else
|
|
puts "Found #{surplus_upload_ids.size} surplus uploads in output database. " \
|
|
"Run with `delete_surplus_uploads: true` to delete them."
|
|
end
|
|
|
|
surplus_upload_ids = nil
|
|
end
|
|
|
|
max_count = (source_existing_ids - output_existing_ids).size
|
|
source_existing_ids = nil
|
|
puts "Found #{output_existing_ids.size} existing uploads. #{max_count} are missing."
|
|
|
|
producer_thread =
|
|
Thread.new do
|
|
query("SELECT * FROM uploads", @source_db).tap do |result_set|
|
|
result_set.each { |row| queue << row unless output_existing_ids.include?(row["id"]) }
|
|
result_set.close
|
|
end
|
|
end
|
|
|
|
status_queue = SizedQueue.new(QUEUE_SIZE)
|
|
status_thread =
|
|
Thread.new do
|
|
error_count = 0
|
|
skipped_count = 0
|
|
current_count = 0
|
|
|
|
while !(params = status_queue.pop).nil?
|
|
begin
|
|
if params.delete(:skipped) == true
|
|
skipped_count += 1
|
|
elsif (error_message = params.delete(:error)) || params[:upload].nil?
|
|
error_count += 1
|
|
puts "", "Failed to create upload: #{params[:id]} (#{error_message})", ""
|
|
end
|
|
|
|
@output_db.execute(<<~SQL, params)
|
|
INSERT INTO uploads (id, upload, markdown, skip_reason)
|
|
VALUES (:id, :upload, :markdown, :skip_reason)
|
|
SQL
|
|
rescue StandardError => e
|
|
puts "", "Failed to insert upload: #{params[:id]} (#{e.message}))", ""
|
|
error_count += 1
|
|
end
|
|
|
|
current_count += 1
|
|
error_count_text = error_count > 0 ? "#{error_count} errors".red : "0 errors"
|
|
|
|
print "\r%7d / %7d (%s, %d skipped)" %
|
|
[current_count, max_count, error_count_text, skipped_count]
|
|
end
|
|
end
|
|
|
|
(Etc.nprocessors * @settings[:thread_count_factor]).to_i.times do |index|
|
|
consumer_threads << Thread.new do
|
|
Thread.current.name = "worker-#{index}"
|
|
|
|
store = Discourse.store
|
|
|
|
while (row = queue.pop)
|
|
begin
|
|
data_file = nil
|
|
path = nil
|
|
|
|
if row["data"].present?
|
|
data_file = Tempfile.new("discourse-upload", binmode: true)
|
|
data_file.write(row["data"])
|
|
data_file.rewind
|
|
path = data_file.path
|
|
else
|
|
relative_path = row["relative_path"]
|
|
file_exists = false
|
|
|
|
@root_paths.each do |root_path|
|
|
path = File.join(root_path, relative_path, row["filename"])
|
|
break if (file_exists = File.exist?(path))
|
|
|
|
@settings[:path_replacements].each do |from, to|
|
|
path = File.join(root_path, relative_path.sub(from, to), row["filename"])
|
|
break if (file_exists = File.exist?(path))
|
|
end
|
|
end
|
|
|
|
if !file_exists
|
|
status_queue << {
|
|
id: row["id"],
|
|
upload: nil,
|
|
skipped: true,
|
|
skip_reason: "file not found",
|
|
}
|
|
next
|
|
end
|
|
end
|
|
|
|
retry_count = 0
|
|
|
|
loop do
|
|
error_message = nil
|
|
upload =
|
|
copy_to_tempfile(path) do |file|
|
|
begin
|
|
UploadCreator.new(file, row["filename"], type: row["type"]).create_for(
|
|
Discourse::SYSTEM_USER_ID,
|
|
)
|
|
rescue StandardError => e
|
|
error_message = e.message
|
|
nil
|
|
end
|
|
end
|
|
|
|
if (upload_okay = upload.present? && upload.persisted? && upload.errors.blank?)
|
|
upload_path = store.get_path_for_upload(upload)
|
|
|
|
file_exists =
|
|
if store.external?
|
|
store.object_from_path(upload_path).exists?
|
|
else
|
|
File.exist?(File.join(store.public_dir, upload_path))
|
|
end
|
|
|
|
unless file_exists
|
|
upload.destroy
|
|
upload = nil
|
|
upload_okay = false
|
|
end
|
|
end
|
|
|
|
if upload_okay
|
|
status_queue << {
|
|
id: row["id"],
|
|
upload: upload.attributes.to_json,
|
|
markdown: UploadMarkdown.new(upload).to_markdown,
|
|
skip_reason: nil,
|
|
}
|
|
break
|
|
elsif retry_count >= 3
|
|
error_message ||= upload&.errors&.full_messages&.join(", ") || "unknown error"
|
|
status_queue << {
|
|
id: row["id"],
|
|
upload: nil,
|
|
markdown: nil,
|
|
error: "too many retries: #{error_message}",
|
|
skip_reason: "too many retries",
|
|
}
|
|
break
|
|
end
|
|
|
|
retry_count += 1
|
|
sleep 0.25 * retry_count
|
|
end
|
|
rescue StandardError => e
|
|
status_queue << {
|
|
id: row["id"],
|
|
upload: nil,
|
|
markdown: nil,
|
|
error: e.message,
|
|
skip_reason: "error",
|
|
}
|
|
ensure
|
|
data_file&.close!
|
|
end
|
|
end
|
|
end
|
|
end
|
|
|
|
producer_thread.join
|
|
queue.close
|
|
consumer_threads.each(&:join)
|
|
status_queue.close
|
|
status_thread.join
|
|
end
|
|
|
|
def fix_missing
|
|
queue = SizedQueue.new(QUEUE_SIZE)
|
|
consumer_threads = []
|
|
|
|
max_count =
|
|
@source_db.get_first_value("SELECT COUNT(*) FROM uploads WHERE upload IS NOT NULL")
|
|
|
|
producer_thread =
|
|
Thread.new do
|
|
query(
|
|
"SELECT id, upload FROM uploads WHERE upload IS NOT NULL ORDER BY rowid DESC",
|
|
@source_db,
|
|
).tap do |result_set|
|
|
result_set.each { |row| queue << row }
|
|
result_set.close
|
|
end
|
|
end
|
|
|
|
status_queue = SizedQueue.new(QUEUE_SIZE)
|
|
status_thread =
|
|
Thread.new do
|
|
error_count = 0
|
|
current_count = 0
|
|
missing_count = 0
|
|
|
|
while !(result = status_queue.pop).nil?
|
|
current_count += 1
|
|
|
|
case result[:status]
|
|
when :ok
|
|
# ignore
|
|
when :error
|
|
error_count += 1
|
|
puts "Error in #{result[:id]}"
|
|
when :missing
|
|
missing_count += 1
|
|
puts "Missing #{result[:id]}"
|
|
|
|
@output_db.execute("DELETE FROM uploads WHERE id = ?", result[:id])
|
|
Upload.delete_by(id: result[:upload_id])
|
|
end
|
|
|
|
error_count_text = error_count > 0 ? "#{error_count} errors".red : "0 errors"
|
|
|
|
print "\r%7d / %7d (%s, %s missing)" %
|
|
[current_count, max_count, error_count_text, missing_count]
|
|
end
|
|
end
|
|
|
|
store = Discourse.store
|
|
|
|
(Etc.nprocessors * @settings[:thread_count_factor] * 2).to_i.times do |index|
|
|
consumer_threads << Thread.new do
|
|
Thread.current.name = "worker-#{index}"
|
|
fake_upload = OpenStruct.new(url: "")
|
|
while (row = queue.pop)
|
|
begin
|
|
upload = JSON.parse(row["upload"])
|
|
fake_upload.url = upload["url"]
|
|
path = store.get_path_for_upload(fake_upload)
|
|
|
|
file_exists =
|
|
if store.external?
|
|
store.object_from_path(path).exists?
|
|
else
|
|
File.exist?(File.join(store.public_dir, path))
|
|
end
|
|
|
|
if file_exists
|
|
status_queue << { id: row["id"], upload_id: upload["id"], status: :ok }
|
|
else
|
|
status_queue << { id: row["id"], upload_id: upload["id"], status: :missing }
|
|
end
|
|
rescue StandardError => e
|
|
puts e.message
|
|
status_queue << { id: row["id"], upload_id: upload["id"], status: :error }
|
|
end
|
|
end
|
|
end
|
|
end
|
|
|
|
producer_thread.join
|
|
queue.close
|
|
consumer_threads.each(&:join)
|
|
status_queue.close
|
|
status_thread.join
|
|
end
|
|
|
|
def create_optimized_images
|
|
init_threads = []
|
|
optimized_upload_ids = Set.new
|
|
post_upload_ids = Set.new
|
|
avatar_upload_ids = Set.new
|
|
max_count = 0
|
|
|
|
# allow more than 1 thread to optimized images at the same time
|
|
OptimizedImage.lock_per_machine = false
|
|
|
|
init_threads << Thread.new do
|
|
query("SELECT id FROM optimized_images", @output_db).tap do |result_set|
|
|
result_set.each { |row| optimized_upload_ids << row["id"] }
|
|
result_set.close
|
|
end
|
|
end
|
|
|
|
init_threads << Thread.new do
|
|
sql = <<~SQL
|
|
SELECT upload_ids
|
|
FROM posts
|
|
WHERE upload_ids IS NOT NULL
|
|
SQL
|
|
query(sql, @source_db).tap do |result_set|
|
|
result_set.each do |row|
|
|
JSON.parse(row["upload_ids"]).each { |id| post_upload_ids << id }
|
|
end
|
|
result_set.close
|
|
end
|
|
end
|
|
|
|
init_threads << Thread.new do
|
|
sql = <<~SQL
|
|
SELECT avatar_upload_id
|
|
FROM users
|
|
WHERE avatar_upload_id IS NOT NULL
|
|
SQL
|
|
query(sql, @source_db).tap do |result_set|
|
|
result_set.each { |row| avatar_upload_ids << row["avatar_upload_id"] }
|
|
result_set.close
|
|
end
|
|
end
|
|
|
|
init_threads << Thread.new do
|
|
max_count =
|
|
@output_db.get_first_value("SELECT COUNT(*) FROM uploads WHERE upload IS NOT NULL")
|
|
end
|
|
|
|
init_threads.each(&:join)
|
|
|
|
status_queue = SizedQueue.new(QUEUE_SIZE)
|
|
status_thread =
|
|
Thread.new do
|
|
error_count = 0
|
|
current_count = 0
|
|
skipped_count = 0
|
|
|
|
while !(params = status_queue.pop).nil?
|
|
current_count += 1
|
|
|
|
case params.delete(:status)
|
|
when :ok
|
|
@output_db.execute(<<~SQL, params)
|
|
INSERT INTO optimized_images (id, optimized_images)
|
|
VALUES (:id, :optimized_images)
|
|
SQL
|
|
when :error
|
|
error_count += 1
|
|
when :skipped
|
|
skipped_count += 1
|
|
end
|
|
|
|
error_count_text = error_count > 0 ? "#{error_count} errors".red : "0 errors"
|
|
|
|
print "\r%7d / %7d (%s, %d skipped)" %
|
|
[current_count, max_count, error_count_text, skipped_count]
|
|
end
|
|
end
|
|
|
|
queue = SizedQueue.new(QUEUE_SIZE)
|
|
consumer_threads = []
|
|
|
|
producer_thread =
|
|
Thread.new do
|
|
sql = <<~SQL
|
|
SELECT id AS upload_id, upload ->> 'sha1' AS upload_sha1, markdown
|
|
FROM uploads
|
|
WHERE upload IS NOT NULL
|
|
ORDER BY rowid
|
|
SQL
|
|
|
|
query(sql, @output_db).tap do |result_set|
|
|
result_set.each do |row|
|
|
upload_id = row["upload_id"]
|
|
|
|
if optimized_upload_ids.include?(upload_id) || !row["markdown"].start_with?("![")
|
|
status_queue << { id: row["upload_id"], status: :skipped }
|
|
next
|
|
end
|
|
|
|
if post_upload_ids.include?(upload_id)
|
|
row["type"] = "post"
|
|
queue << row
|
|
elsif avatar_upload_ids.include?(upload_id)
|
|
row["type"] = "avatar"
|
|
queue << row
|
|
else
|
|
status_queue << { id: row["upload_id"], status: :skipped }
|
|
end
|
|
end
|
|
result_set.close
|
|
end
|
|
end
|
|
|
|
avatar_sizes = Discourse.avatar_sizes
|
|
store = Discourse.store
|
|
remote_factor = store.external? ? 2 : 1
|
|
|
|
Jobs.run_immediately!
|
|
|
|
(Etc.nprocessors * @settings[:thread_count_factor] * remote_factor).to_i.times do |index|
|
|
consumer_threads << Thread.new do
|
|
Thread.current.name = "worker-#{index}"
|
|
|
|
post =
|
|
PostCreator.new(
|
|
Discourse.system_user,
|
|
raw: "Topic created by uploads_importer",
|
|
acting_user: Discourse.system_user,
|
|
skip_validations: true,
|
|
title: "Topic created by uploads_importer - #{SecureRandom.hex}",
|
|
archetype: Archetype.default,
|
|
category: Category.last.id,
|
|
).create!
|
|
|
|
while (row = queue.pop)
|
|
retry_count = 0
|
|
|
|
loop do
|
|
upload = Upload.find_by(sha1: row["upload_sha1"])
|
|
|
|
optimized_images =
|
|
begin
|
|
case row["type"]
|
|
when "post"
|
|
post.update_columns(baked_at: nil, cooked: "", raw: row["markdown"])
|
|
post.reload
|
|
post.rebake!
|
|
OptimizedImage.where(upload_id: upload.id).to_a
|
|
when "avatar"
|
|
avatar_sizes.map { |size| OptimizedImage.create_for(upload, size, size) }
|
|
end
|
|
rescue StandardError => e
|
|
puts e.message
|
|
puts e.stacktrace
|
|
nil
|
|
end
|
|
|
|
begin
|
|
if optimized_images.present?
|
|
optimized_images.map! do |optimized_image|
|
|
next unless optimized_image.present?
|
|
optimized_image_path = store.get_path_for_optimized_image(optimized_image)
|
|
|
|
file_exists =
|
|
if store.external?
|
|
store.object_from_path(optimized_image_path).exists?
|
|
else
|
|
File.exist?(File.join(store.public_dir, optimized_image_path))
|
|
end
|
|
|
|
unless file_exists
|
|
optimized_image.destroy
|
|
optimized_image = nil
|
|
end
|
|
|
|
optimized_image
|
|
end
|
|
end
|
|
rescue StandardError
|
|
optimized_images = nil
|
|
end
|
|
|
|
optimized_images_okay =
|
|
!optimized_images.nil? && optimized_images.all?(&:present?) &&
|
|
optimized_images.all?(&:persisted?) &&
|
|
optimized_images.all? { |o| o.errors.blank? }
|
|
|
|
if optimized_images_okay
|
|
status_queue << {
|
|
id: row["upload_id"],
|
|
optimized_images: optimized_images.presence&.to_json,
|
|
status: :ok,
|
|
}
|
|
break
|
|
elsif retry_count >= 3
|
|
status_queue << { id: row["upload_id"], status: :error }
|
|
break
|
|
end
|
|
|
|
retry_count += 1
|
|
sleep 0.25 * retry_count
|
|
end
|
|
end
|
|
end
|
|
end
|
|
|
|
producer_thread.join
|
|
queue.close
|
|
consumer_threads.each(&:join)
|
|
status_queue.close
|
|
status_thread.join
|
|
end
|
|
|
|
private
|
|
|
|
def create_connection(path)
|
|
sqlite = SQLite3::Database.new(path, results_as_hash: true)
|
|
sqlite.busy_timeout = 60_000 # 60 seconds
|
|
sqlite.journal_mode = "WAL"
|
|
sqlite.synchronous = "off"
|
|
sqlite
|
|
end
|
|
|
|
def query(sql, db)
|
|
db.prepare(sql).execute
|
|
end
|
|
|
|
def initialize_output_db
|
|
@statement_counter = 0
|
|
|
|
@output_db.execute(<<~SQL)
|
|
CREATE TABLE IF NOT EXISTS uploads (
|
|
id TEXT PRIMARY KEY NOT NULL,
|
|
upload JSON_TEXT,
|
|
markdown TEXT,
|
|
skip_reason TEXT
|
|
)
|
|
SQL
|
|
|
|
@output_db.execute(<<~SQL)
|
|
CREATE TABLE IF NOT EXISTS optimized_images (
|
|
id TEXT PRIMARY KEY NOT NULL,
|
|
optimized_images JSON_TEXT
|
|
)
|
|
SQL
|
|
end
|
|
|
|
def insert(sql, bind_vars = [])
|
|
@output_db.transaction if @statement_counter == 0
|
|
@output_db.execute(sql, bind_vars)
|
|
|
|
if (@statement_counter += 1) > TRANSACTION_SIZE
|
|
@output_db.commit
|
|
@statement_counter = 0
|
|
end
|
|
end
|
|
|
|
def close
|
|
@source_db.close if @source_db
|
|
|
|
if @output_db
|
|
@output_db.commit if @output_db.transaction_active?
|
|
@output_db.close
|
|
end
|
|
end
|
|
|
|
def copy_to_tempfile(source_path)
|
|
extension = File.extname(source_path)
|
|
|
|
Tempfile.open(["discourse-upload", extension]) do |tmpfile|
|
|
File.open(source_path, "rb") { |source_stream| IO.copy_stream(source_stream, tmpfile) }
|
|
tmpfile.rewind
|
|
yield(tmpfile)
|
|
end
|
|
end
|
|
|
|
def configure_site_settings
|
|
settings = @settings[:site_settings]
|
|
|
|
SiteSetting.clean_up_uploads = false
|
|
SiteSetting.authorized_extensions = settings[:authorized_extensions]
|
|
SiteSetting.max_attachment_size_kb = settings[:max_attachment_size_kb]
|
|
SiteSetting.max_image_size_kb = settings[:max_image_size_kb]
|
|
|
|
if settings[:enable_s3_uploads]
|
|
SiteSetting.s3_access_key_id = settings[:s3_access_key_id]
|
|
SiteSetting.s3_secret_access_key = settings[:s3_secret_access_key]
|
|
SiteSetting.s3_upload_bucket = settings[:s3_upload_bucket]
|
|
SiteSetting.s3_region = settings[:s3_region]
|
|
SiteSetting.s3_cdn_url = settings[:s3_cdn_url]
|
|
SiteSetting.enable_s3_uploads = true
|
|
|
|
raise "Failed to enable S3 uploads" if SiteSetting.enable_s3_uploads != true
|
|
|
|
Tempfile.open("discourse-s3-test") do |tmpfile|
|
|
tmpfile.write("test")
|
|
tmpfile.rewind
|
|
|
|
upload =
|
|
UploadCreator.new(tmpfile, "discourse-s3-test.txt").create_for(
|
|
Discourse::SYSTEM_USER_ID,
|
|
)
|
|
|
|
unless upload.present? && upload.persisted? && upload.errors.blank? &&
|
|
upload.url.start_with?("//")
|
|
raise "Failed to upload to S3"
|
|
end
|
|
|
|
upload.destroy
|
|
end
|
|
end
|
|
end
|
|
end
|
|
end
|
|
|
|
# bundle exec ruby script/bulk_import/uploads_importer.rb /path/to/uploads_importer.yml
|
|
BulkImport::UploadsImporter.new(ARGV.first).run
|