discourse/lib/jobs/importer.rb

289 lines
9.4 KiB
Ruby

require_dependency 'import/json_decoder'
require_dependency 'import/import'
require_dependency 'import/adapter/base'
(Dir.entries(File.join( Rails.root, 'lib', 'import', 'adapter' )) - ['.', '..', 'base.rb']).each do |f|
require_dependency "import/adapter/#{f}"
end
module Jobs
class Importer < Jobs::Base
sidekiq_options :retry => false
BACKUP_SCHEMA = 'backup'
def initialize
@index_definitions = {}
@format = :json
@warnings = []
end
def execute(args)
ordered_models_for_import.each { |model| model.primary_key } # a HACK to workaround cache problems
raise Import::ImportDisabledError unless SiteSetting.allow_import?
raise Import::ImportInProgressError if Import::is_import_running?
raise Export::ExportInProgressError if Export::is_export_running?
# Disable printing of NOTICE, DETAIL and other unimportant messages from postgresql
User.exec_sql("SET client_min_messages TO WARNING")
@format = args[:format] || :json
@archive_filename = args[:filename]
if args[:user_id]
# After the import is done, we'll need to reload the user record and make sure it's the same person
# before sending a notification
user = User.where(id: args[:user_id].to_i).first
@user_info = { user_id: user.id, email: user.email }
else
@user_info = nil
end
start_import
backup_tables
begin
load_data
create_indexes
extract_uploads
rescue
log "Performing a ROLLBACK because something went wrong!"
rollback
raise
end
ensure
finish_import
end
def ordered_models_for_import
Export.models_included_in_export
end
def start_import
if @format != :json
raise Import::FormatInvalidError
elsif @archive_filename.nil?
raise Import::FilenameMissingError
else
extract_files
@decoder = Import::JsonDecoder.new( File.join(tmp_directory, 'tables.json') )
Import.set_import_started
Discourse.enable_maintenance_mode
end
self
end
def tmp_directory
@tmp_directory ||= begin
f = File.join( Rails.root, 'tmp', Time.now.strftime('import%Y%m%d%H%M%S') )
Dir.mkdir(f) unless Dir[f].present?
f
end
end
def extract_files
FileUtils.cd( tmp_directory ) do
`tar xvzf #{@archive_filename} tables.json`
end
end
def backup_tables
log " Backing up tables"
ActiveRecord::Base.transaction do
create_backup_schema
ordered_models_for_import.each do |model|
backup_and_setup_table( model )
end
end
self
end
def create_backup_schema
User.exec_sql("DROP SCHEMA IF EXISTS #{BACKUP_SCHEMA} CASCADE")
User.exec_sql("CREATE SCHEMA #{BACKUP_SCHEMA}")
self
end
def backup_and_setup_table( model )
log " #{model.table_name}"
@index_definitions[model.table_name] = model.exec_sql("SELECT indexdef FROM pg_indexes WHERE tablename = '#{model.table_name}' and schemaname = 'public';").map { |x| x['indexdef'] }
model.exec_sql("ALTER TABLE #{model.table_name} SET SCHEMA #{BACKUP_SCHEMA}")
model.exec_sql("CREATE TABLE #{model.table_name} (LIKE #{BACKUP_SCHEMA}.#{model.table_name} INCLUDING DEFAULTS INCLUDING CONSTRAINTS INCLUDING COMMENTS INCLUDING STORAGE);")
self
end
def load_data
log " Importing data"
@decoder.start(
callbacks: {
schema_info: method(:set_schema_info),
table_data: method(:load_table)
}
)
self
end
def set_schema_info(arg)
if arg[:source] and arg[:source].downcase == 'discourse'
if arg[:version] and arg[:version] <= Export.current_schema_version
@export_schema_version = arg[:version]
if arg[:table_count] == ordered_models_for_import.size
true
else
raise Import::WrongTableCountError.new("Expected to find #{ordered_models_for_import.size} tables, but export file has #{arg[:table_count]} tables!")
end
elsif arg[:version].nil?
raise ArgumentError.new("The schema version must be provided.")
else
raise Import::UnsupportedSchemaVersion.new("Export file is from a newer version of Discourse. Upgrade and run migrations to import this file.")
end
else
raise Import::UnsupportedExportSource
end
end
def load_table(table_name, fields_arg, row_data, row_count)
fields = fields_arg.dup
model = Export::models_included_in_export.find { |m| m.table_name == table_name }
if model
@adapters ||= Import.adapters_for_version( @export_schema_version )
log " #{table_name}: #{row_count} rows"
if @adapters[table_name]
@adapters[table_name].each do |adapter|
fields = adapter.apply_to_column_names(table_name, fields)
end
end
if fields.size > model.columns.size
raise Import::WrongFieldCountError.new("Table #{table_name} is expected to have #{model.columns.size} fields, but got #{fields.size}! Maybe your Discourse server is older than the server that this export file comes from?")
end
# If there are fewer fields in the data than the model has, then insert only those fields and
# hope that the table uses default values or allows null for the missing columns.
# If the table doesn't have defaults or is not nullable, then a migration adapter should have been created
# along with the migration.
column_info = model.columns
col_num = -1
rows = row_data.map do |row|
if @adapters[table_name]
@adapters[table_name].each do |adapter|
row = adapter.apply_to_row(table_name, row)
end
end
row
end.transpose.map do |col_values|
col_num += 1
case column_info[col_num].type
when :boolean
col_values.map { |v| v.nil? ? nil : (v == 'f' ? false : true) }
else
col_values
end
end.transpose
parameter_markers = fields.map {|x| "?"}.join(',')
sql_stmt = "INSERT INTO #{table_name} (#{fields.join(',')}) VALUES (#{parameter_markers})"
rows.each do |row|
User.exec_sql(sql_stmt, *row)
end
true
else
add_warning "Export file contained an unrecognized table named: #{table_name}! It was ignored."
end
end
def create_indexes
log " Creating indexes"
ordered_models_for_import.each do |model|
log " #{model.table_name}"
@index_definitions[model.table_name].each do |indexdef|
model.exec_sql( indexdef )
end
# The indexdef statements don't create the primary keys, so we need to find the primary key and do it ourselves.
pkey_index_def = @index_definitions[model.table_name].find { |ixdef| ixdef =~ / ([\S]{1,}_pkey) / }
if pkey_index_def and pkey_index_name = / ([\S]{1,}_pkey) /.match(pkey_index_def)[1]
model.exec_sql( "ALTER TABLE ONLY #{model.table_name} ADD PRIMARY KEY USING INDEX #{pkey_index_name}" )
end
if model.columns.map(&:name).include?('id')
max_id = model.exec_sql("SELECT MAX(id) AS max FROM #{model.table_name}")[0]['max'].to_i + 1
seq_name = "#{model.table_name}_id_seq"
model.exec_sql("CREATE SEQUENCE #{seq_name} START WITH #{max_id} INCREMENT BY 1 NO MINVALUE NO MAXVALUE CACHE 1")
model.exec_sql("ALTER SEQUENCE #{seq_name} OWNED BY #{model.table_name}.id")
model.exec_sql("ALTER TABLE #{model.table_name} ALTER COLUMN id SET DEFAULT nextval('#{seq_name}')")
end
end
self
end
def extract_uploads
if `tar tf #{@archive_filename} | grep "uploads/"`.present?
FileUtils.cd( File.join(Rails.root, 'public') ) do
`tar -xz --keep-newer-files -f #{@archive_filename} uploads/`
end
end
end
def rollback
ordered_models_for_import.each do |model|
log " #{model.table_name}"
model.exec_sql("DROP TABLE IF EXISTS #{model.table_name}") rescue nil
begin
model.exec_sql("ALTER TABLE #{BACKUP_SCHEMA}.#{model.table_name} SET SCHEMA public")
rescue => e
log " Failed to restore. #{e.class.name}: #{e.message}"
end
end
end
def finish_import
Import.set_import_is_not_running
Discourse.disable_maintenance_mode
FileUtils.rm_rf(tmp_directory) if Dir[tmp_directory].present?
if @warnings.size > 0
log "WARNINGS:"
@warnings.each do |message|
log " #{message}"
end
end
# send_notification
end
def send_notification
# Doesn't work. "WARNING: Can't mass-assign protected attributes: created_at"
# Still a problem with the activerecord schema_cache I think.
# if @user_info and @user_info[:user_id]
# user = User.where(id: @user_info[:user_id]).first
# if user and user.email == @user_info[:email]
# SystemMessage.new(user).create('import_succeeded')
# end
# end
true
end
def add_warning(message)
@warnings << message
end
def log(*args)
puts args
args.each do |arg|
Rails.logger.info "#{Time.now.to_formatted_s(:db)}: [IMPORTER] #{arg}"
end
true
end
end
end