module Import class ImportDisabledError < RuntimeError; end class FilenameMissingError < RuntimeError; end class Importer def initialize(user_id, filename, publish_to_message_bus = false) @user_id, @filename, @publish_to_message_bus = user_id, filename, publish_to_message_bus ensure_import_is_enabled ensure_no_operation_is_running ensure_we_have_a_user ensure_we_have_a_filename initialize_state end def run log "[STARTED]" log "'#{@user_info[:username]}' has started the restore!" mark_import_as_running listen_for_shutdown_signal enable_readonly_mode pause_sidekiq wait_for_sidekiq ensure_directory_exists(@tmp_directory) copy_archive_to_tmp_directory unzip_archive extract_metadata validate_metadata extract_dump restore_dump switch_schema! # TOFIX: MessageBus is busted... migrate_database reconnect_database extract_uploads notify_user rescue SystemExit log "Restore process was cancelled!" rollback rescue Exception => ex log "EXCEPTION: " + ex.message log ex.backtrace.join("\n") rollback else @success = true ensure clean_up @success ? log("[SUCCESS]") : log("[FAILED]") end protected def ensure_import_is_enabled raise Import::ImportDisabledError unless SiteSetting.allow_restore? end def ensure_no_operation_is_running raise BackupRestore::OperationRunningError if BackupRestore.is_operation_running? end def ensure_we_have_a_user user = User.where(id: @user_id).first raise Discourse::InvalidParameters.new(:user_id) unless user # keep some user data around to check them against the newly restored database @user_info = { id: user.id, username: user.username, email: user.email } end def ensure_we_have_a_filename raise Import::FilenameMissingError if @filename.nil? end def initialize_state @success = false @current_db = RailsMultisite::ConnectionManagement.current_db @current_version = BackupRestore.current_version @timestamp = Time.now.strftime("%Y-%m-%d-%H%M%S") @tmp_directory = File.join(Rails.root, "tmp", "restores", @current_db, @timestamp) @archive_filename = File.join(@tmp_directory, @filename) @tar_filename = @archive_filename[0...-3] @meta_filename = File.join(@tmp_directory, BackupRestore::METADATA_FILE) @dump_filename = File.join(@tmp_directory, BackupRestore::DUMP_FILE) end def listen_for_shutdown_signal Thread.new do while BackupRestore.is_operation_running? exit if BackupRestore.should_shutdown? sleep 0.1 end end end def mark_import_as_running log "Marking restore as running..." BackupRestore.mark_as_running! end def enable_readonly_mode log "Enabling readonly mode..." Discourse.enable_readonly_mode end def pause_sidekiq log "Pausing sidekiq..." Sidekiq.pause! end def wait_for_sidekiq log "Waiting for sidekiq to finish running jobs..." iterations = 0 workers = Sidekiq::Workers.new while (running = workers.size) > 0 log " Waiting for #{running} jobs..." sleep 5 iterations += 1 raise "Sidekiq did not finish running all the jobs in the allowed time!" if iterations >= 20 end end def copy_archive_to_tmp_directory log "Copying archive to tmp directory..." source = File.join(Backup.base_directory, @filename) `cp #{source} #{@archive_filename}` end def unzip_archive log "Unzipping archive..." FileUtils.cd(@tmp_directory) { `gzip --decompress #{@archive_filename}` } end def extract_metadata log "Extracting metadata file..." FileUtils.cd(@tmp_directory) { `tar --extract --file #{@tar_filename} #{BackupRestore::METADATA_FILE}` } @metadata = Oj.load_file(@meta_filename) end def validate_metadata log "Validating metadata..." log " Current version: #{@current_version}" log " Restored version: #{@metadata["version"]}" error = "You're trying to import a more recent version of the schema. You should migrate first!" raise error if @metadata["version"] > @current_version end def extract_dump log "Extracting dump file..." FileUtils.cd(@tmp_directory) { `tar --extract --file #{@tar_filename} #{BackupRestore::DUMP_FILE}` } end def restore_dump log "Restoring dump file... (can be quite long)" psql_command = build_psql_command log "Running: #{psql_command}" logs = Queue.new psql_running = true has_error = false Thread.new do while psql_running message = logs.pop.strip has_error ||= (message =~ /ERROR:/) log(message) unless message.blank? end end IO.popen("#{psql_command} 2>&1") do |pipe| begin while line = pipe.readline logs << line end rescue EOFError # finished reading... ensure psql_running = false logs << "" end end # psql does not return a valid exit code when an error happens raise "psql failed" if has_error end def build_psql_command db_conf = BackupRestore.database_configuration password_argument = "PGPASSWORD=#{db_conf.password}" if db_conf.password.present? host_argument = "--host=#{db_conf.host}" if db_conf.host.present? [ password_argument, # pass the password to psql "psql", # the psql command "--dbname='#{db_conf.database}'", # connect to database *dbname* "--file='#{@dump_filename}'", # read the dump "--single-transaction", # all or nothing (also runs COPY commands faster) host_argument, # the hostname to connect to "--username=#{db_conf.username}" # the username to connect as ].join(" ") end def switch_schema! log "Switching schemas..." sql = [ "BEGIN;", BackupRestore.move_tables_between_schemas_sql("public", "backup"), BackupRestore.move_tables_between_schemas_sql("restore", "public"), "COMMIT;" ].join("\n") User.exec_sql(sql) end def migrate_database log "Migrating the database..." Discourse::Application.load_tasks ENV["VERSION"] = @current_version.to_s Rake::Task["db:migrate:up"].invoke end def reconnect_database log "Reconnecting to the database..." ActiveRecord::Base.establish_connection end def extract_uploads log "Extracting uploads..." if `tar --list --file #{@tar_filename} | grep 'uploads/'`.present? FileUtils.cd(File.join(Rails.root, "public")) do `tar --extract --keep-newer-files --file #{@tar_filename} uploads/` end end end def notify_user if user = User.where(email: @user_info[:email]).first log "Notifying '#{user.username}' of the success of the restore..." # NOTE: will only notify if user != Discourse.site_contact_user SystemMessage.create(user, :import_succeeded) else log "Could not send notification to '#{@user_info[:username]}' (#{@user_info[:email]}), because the user does not exists..." end end def rollback log "Trying to rollback..." if BackupRestore.can_rollback? log "Rolling back..." BackupRestore.move_tables_between_schemas("backup", "public") else log "There was no need to rollback" end end def clean_up log "Cleaning stuff up..." remove_tmp_directory unpause_sidekiq disable_readonly_mode mark_import_as_not_running log "Finished!" end def remove_tmp_directory log "Removing tmp '#{@tmp_directory}' directory..." FileUtils.rm_rf(@tmp_directory) if Dir[@tmp_directory].present? rescue log "Something went wrong while removing the following tmp directory: #{@tmp_directory}" end def unpause_sidekiq log "Unpausing sidekiq..." Sidekiq.unpause! end def disable_readonly_mode log "Disabling readonly mode..." Discourse.disable_readonly_mode end def mark_import_as_not_running log "Marking restore as finished..." BackupRestore.mark_as_not_running! end def ensure_directory_exists(directory) log "Making sure #{directory} exists..." FileUtils.mkdir_p(directory) end def log(message) puts(message) rescue nil publish_log(message) rescue nil end def publish_log(message) return unless @publish_to_message_bus data = { timestamp: Time.now, operation: "restore", message: message } MessageBus.publish(BackupRestore::LOGS_CHANNEL, data, user_ids: [@user_id]) end end end