diff --git a/Gemfile b/Gemfile index 906b0f3aa23..a3222a8b26c 100644 --- a/Gemfile +++ b/Gemfile @@ -190,4 +190,5 @@ gem 'sassc', require: false if ENV["IMPORT"] == "1" gem 'mysql2' gem 'redcarpet' + gem 'sqlite3', '~> 1.3.13' end diff --git a/lib/email/receiver.rb b/lib/email/receiver.rb index eba4409dd09..75fb3e9bfc7 100644 --- a/lib/email/receiver.rb +++ b/lib/email/receiver.rb @@ -28,6 +28,9 @@ module Email class InvalidPostAction < ProcessingError; end attr_reader :incoming_email + attr_reader :raw_email + attr_reader :mail + attr_reader :message_id def initialize(mail_string) raise EmptyEmailError if mail_string.blank? @@ -241,7 +244,7 @@ module Email def try_to_encode(string, encoding) encoded = string.encode("UTF-8", encoding) - encoded.present? && encoded.valid_encoding? ? encoded : nil + !encoded.nil? && encoded.valid_encoding? ? encoded : nil rescue Encoding::InvalidByteSequenceError, Encoding::UndefinedConversionError, Encoding::ConverterNotFoundError @@ -622,10 +625,7 @@ module Email # only add elided part in messages if options[:elided].present? && (SiteSetting.always_show_trimmed_content || is_private_message) - options[:raw] << "\n\n" << "
" << "\n" - options[:raw] << "···" << "\n" - options[:raw] << options[:elided] << "\n" - options[:raw] << "
" << "\n" + options[:raw] << Email::Receiver.elided_html(options[:elided]) end user = options.delete(:user) @@ -643,6 +643,14 @@ module Email result.post end + def self.elided_html(elided) + html = "\n\n" << "
" << "\n" + html << "···" << "\n" + html << elided << "\n" + html << "
" << "\n" + html + end + def add_other_addresses(topic, sender) %i(to cc bcc).each do |d| if @mail[d] && @mail[d].address_list && @mail[d].address_list.addresses diff --git a/script/import_scripts/mbox-experimental.rb b/script/import_scripts/mbox-experimental.rb new file mode 100644 index 00000000000..065b0111bd8 --- /dev/null +++ b/script/import_scripts/mbox-experimental.rb @@ -0,0 +1,17 @@ +if ARGV.length != 1 || !File.exists?(ARGV[0]) + STDERR.puts '', 'Usage of mbox importer:', 'bundle exec ruby mbox-experimental.rb ' + STDERR.puts '', "Use the settings file from #{File.expand_path('mbox/settings.yml', File.dirname(__FILE__))} as an example." + exit 1 +end + +module ImportScripts + module Mbox + require_relative 'mbox/support/settings' + + @settings = Settings.load(ARGV[0]) + + require_relative 'mbox/importer' + Importer.new(@settings).perform + end +end + diff --git a/script/import_scripts/mbox/importer.rb b/script/import_scripts/mbox/importer.rb new file mode 100644 index 00000000000..762c162e823 --- /dev/null +++ b/script/import_scripts/mbox/importer.rb @@ -0,0 +1,161 @@ +require_relative '../base' +require_relative 'support/database' +require_relative 'support/indexer' +require_relative 'support/settings' + +module ImportScripts::Mbox + class Importer < ImportScripts::Base + # @param settings [ImportScripts::Mbox::Settings] + def initialize(settings) + @settings = settings + super() + + @database = Database.new(@settings.data_dir, @settings.batch_size) + end + + def change_site_settings + super + + SiteSetting.enable_staged_users = true + end + + protected + + def execute + index_messages + import_categories + import_users + import_posts + end + + def index_messages + puts '', 'creating index' + indexer = Indexer.new(@database, @settings) + indexer.execute + end + + def import_categories + puts '', 'creating categories' + rows = @database.fetch_categories + + create_categories(rows) do |row| + { + id: row['name'], + name: row['name'] + } + end + end + + def import_users + puts '', 'creating users' + total_count = @database.count_users + last_email = '' + + batches do |offset| + rows, last_email = @database.fetch_users(last_email) + break if rows.empty? + + next if all_records_exist?(:users, rows.map { |row| row['email'] }) + + create_users(rows, total: total_count, offset: offset) do |row| + { + id: row['email'], + email: row['email'], + name: row['name'], + trust_level: @settings.trust_level, + staged: true, + created_at: to_time(row['date_of_first_message']) + } + end + end + end + + def batches + super(@settings.batch_size) + end + + def import_posts + puts '', 'creating topics and posts' + total_count = @database.count_messages + last_row_id = 0 + + batches do |offset| + rows, last_row_id = @database.fetch_messages(last_row_id) + break if rows.empty? + + next if all_records_exist?(:posts, rows.map { |row| row['msg_id'] }) + + create_posts(rows, total: total_count, offset: offset) do |row| + if row['in_reply_to'].blank? + map_first_post(row) + else + map_reply(row) + end + end + end + end + + def map_post(row) + user_id = user_id_from_imported_user_id(row['from_email']) || Discourse::SYSTEM_USER_ID + body = row['body'] || '' + body << map_attachments(row['raw_message'], user_id) if row['attachment_count'].positive? + body << Email::Receiver.elided_html(row['elided']) if row['elided'].present? + + { + id: row['msg_id'], + user_id: user_id, + created_at: to_time(row['email_date']), + raw: body, + raw_email: row['raw_message'], + via_email: true, + # cook_method: Post.cook_methods[:email] # this is slowing down the import by factor 4 + } + end + + def map_first_post(row) + mapped = map_post(row) + mapped[:category] = category_id_from_imported_category_id(row['category']) + mapped[:title] = row['subject'].strip[0...255] + mapped + end + + def map_reply(row) + parent = @lookup.topic_lookup_from_imported_post_id(row['in_reply_to']) + + if parent.blank? + puts "Parent message #{row['in_reply_to']} doesn't exist. Skipping #{row['msg_id']}: #{row['subject'][0..40]}" + return nil + end + + mapped = map_post(row) + mapped[:topic_id] = parent[:topic_id] + mapped + end + + def map_attachments(raw_message, user_id) + receiver = Email::Receiver.new(raw_message) + attachment_markdown = '' + + receiver.attachments.each do |attachment| + tmp = Tempfile.new(['discourse-email-attachment', File.extname(attachment.filename)]) + + begin + File.open(tmp.path, 'w+b') { |f| f.write attachment.body.decoded } + upload = UploadCreator.new(tmp, attachment.filename).create_for(user_id) + + if upload && upload.errors.empty? + attachment_markdown << "\n\n#{receiver.attachment_markdown(upload)}\n\n" + end + ensure + tmp.try(:close!) + end + end + + attachment_markdown + end + + def to_time(datetime) + Time.zone.at(DateTime.iso8601(datetime)) if datetime + end + end +end diff --git a/script/import_scripts/mbox/settings.yml b/script/import_scripts/mbox/settings.yml new file mode 100644 index 00000000000..0a193aa1ed0 --- /dev/null +++ b/script/import_scripts/mbox/settings.yml @@ -0,0 +1,9 @@ +# PostgreSQL mailing lists +#data_dir: /data/import/postgres +#split_regex: "^From .*@postgresql.org.*" + +# ruby-talk mailing list +data_dir: /data/import/ruby-talk/news/gmane/comp/lang/ruby +split_regex: "" + +default_trust_level: 1 diff --git a/script/import_scripts/mbox/support/database.rb b/script/import_scripts/mbox/support/database.rb new file mode 100644 index 00000000000..396d23e5b23 --- /dev/null +++ b/script/import_scripts/mbox/support/database.rb @@ -0,0 +1,261 @@ +require 'sqlite3' + +module ImportScripts::Mbox + class Database + SCHEMA_VERSION = 1 + + def initialize(directory, batch_size) + @db = SQLite3::Database.new("#{directory}/index.db", results_as_hash: true) + @batch_size = batch_size + + configure_database + upgrade_schema_version + create_table_for_categories + create_table_for_imported_files + create_table_for_emails + create_table_for_replies + create_table_for_users + end + + def insert_category(category) + @db.execute(<<-SQL, category) + INSERT OR REPLACE INTO category (name, description) + VALUES (:name, :description) + SQL + end + + def insert_imported_file(imported_file) + @db.execute(<<-SQL, imported_file) + INSERT OR REPLACE INTO imported_file (category, filename, checksum) + VALUES (:category, :filename, :checksum) + SQL + end + + def insert_email(email) + @db.execute(<<-SQL, email) + INSERT OR REPLACE INTO email (msg_id, from_email, from_name, subject, + email_date, raw_message, body, elided, attachment_count, charset, + category, filename, first_line_number, last_line_number) + VALUES (:msg_id, :from_email, :from_name, :subject, + :email_date, :raw_message, :body, :elided, :attachment_count, :charset, + :category, :filename, :first_line_number, :last_line_number) + SQL + end + + def insert_replies(msg_id, reply_message_ids) + sql = <<-SQL + INSERT OR REPLACE INTO reply (msg_id, in_reply_to) + VALUES (:msg_id, :in_reply_to) + SQL + + @db.prepare(sql) do |stmt| + reply_message_ids.each do |in_reply_to| + stmt.execute(msg_id, in_reply_to) + end + end + end + + def update_in_reply_to_of_emails + @db.execute <<-SQL + UPDATE email + SET in_reply_to = ( + SELECT e.msg_id + FROM reply r + JOIN email e ON (r.in_reply_to = e.msg_id) + WHERE r.msg_id = email.msg_id + ORDER BY e.email_date DESC + LIMIT 1 + ) + SQL + end + + def sort_emails + @db.execute 'DELETE FROM email_order' + + @db.execute <<-SQL + WITH RECURSIVE + messages(msg_id, level, email_date) AS ( + SELECT msg_id, 0 AS level, email_date + FROM email + WHERE in_reply_to IS NULL + UNION ALL + SELECT e.msg_id, m.level + 1, e.email_date + FROM email e + JOIN messages m ON e.in_reply_to = m.msg_id + ORDER BY level, email_date, msg_id + ) + INSERT INTO email_order (msg_id) + SELECT msg_id + FROM messages + SQL + end + + def fill_users_from_emails + @db.execute 'DELETE FROM user' + + @db.execute <<-SQL + INSERT INTO user (email, name, date_of_first_message) + SELECT from_email, MIN(from_name) AS from_name, MIN(email_date) + FROM email + WHERE from_email IS NOT NULL + GROUP BY from_email + ORDER BY from_email + SQL + end + + def fetch_imported_files(category) + @db.execute(<<-SQL, category) + SELECT filename, checksum + FROM imported_file + WHERE category = :category + SQL + end + + def fetch_categories + @db.execute <<-SQL + SELECT name, description + FROM category + ORDER BY name + SQL + end + + def count_users + @db.get_first_value <<-SQL + SELECT COUNT(*) + FROM user + SQL + end + + def fetch_users(last_email) + rows = @db.execute(<<-SQL, last_email) + SELECT email, name, date_of_first_message + FROM user + WHERE email > :last_email + LIMIT #{@batch_size} + SQL + + add_last_column_value(rows, 'email') + end + + def count_messages + @db.get_first_value <<-SQL + SELECT COUNT(*) + FROM email + WHERE email_date IS NOT NULL + SQL + end + + def fetch_messages(last_row_id) + rows = @db.execute(<<-SQL, last_row_id) + SELECT o.ROWID, e.msg_id, from_email, subject, email_date, in_reply_to, + raw_message, body, elided, attachment_count, category + FROM email e + JOIN email_order o USING (msg_id) + WHERE email_date IS NOT NULL AND + o.ROWID > :last_row_id + ORDER BY o.ROWID + LIMIT #{@batch_size} + SQL + + add_last_column_value(rows, 'rowid') + end + + private + + def configure_database + @db.execute 'PRAGMA journal_mode = TRUNCATE' + end + + def upgrade_schema_version + # current_version = query("PRAGMA user_version").last[0] + @db.execute "PRAGMA user_version = #{SCHEMA_VERSION}" + end + + def create_table_for_categories + @db.execute <<-SQL + CREATE TABLE IF NOT EXISTS category ( + name TEXT NOT NULL PRIMARY KEY, + description TEXT + ) + SQL + end + + def create_table_for_imported_files + @db.execute <<-SQL + CREATE TABLE IF NOT EXISTS imported_file ( + category TEXT NOT NULL, + filename TEXT NOT NULL, + checksum TEXT NOT NULL, + PRIMARY KEY (category, filename), + FOREIGN KEY(category) REFERENCES category(name) + ) + SQL + end + + def create_table_for_emails + @db.execute <<-SQL + CREATE TABLE IF NOT EXISTS email ( + msg_id TEXT NOT NULL PRIMARY KEY, + from_email TEXT, + from_name TEXT, + subject TEXT, + in_reply_to TEXT, + email_date DATETIME, + raw_message TEXT, + body TEXT, + elided TEXT, + attachment_count INTEGER NOT NULL DEFAULT 0, + charset TEXT, + category TEXT NOT NULL, + filename TEXT NOT NULL, + first_line_number INTEGER, + last_line_number INTEGER, + FOREIGN KEY(category) REFERENCES category(name) + ) + SQL + + @db.execute 'CREATE INDEX IF NOT EXISTS email_by_from ON email (from_email)' + @db.execute 'CREATE INDEX IF NOT EXISTS email_by_in_reply_to ON email (in_reply_to)' + @db.execute 'CREATE INDEX IF NOT EXISTS email_by_date ON email (email_date)' + + @db.execute <<-SQL + CREATE TABLE IF NOT EXISTS email_order ( + msg_id TEXT NOT NULL PRIMARY KEY + ) + SQL + end + + def create_table_for_replies + @db.execute <<-SQL + CREATE TABLE IF NOT EXISTS reply ( + msg_id TEXT NOT NULL, + in_reply_to TEXT NOT NULL, + PRIMARY KEY (msg_id, in_reply_to), + FOREIGN KEY(msg_id) REFERENCES email(msg_id) + ) + SQL + + @db.execute 'CREATE INDEX IF NOT EXISTS reply_by_in_reply_to ON reply (in_reply_to)' + end + + def create_table_for_users + @db.execute <<-SQL + CREATE TABLE IF NOT EXISTS user ( + email TEXT NOT NULL PRIMARY KEY, + name TEXT, + date_of_first_message DATETIME NOT NULL + ) + SQL + end + + def add_last_column_value(rows, *last_columns) + return rows if last_columns.empty? + + result = [rows] + last_row = rows.last + + last_columns.each { |column| result.push(last_row ? last_row[column] : nil) } + result + end + end +end diff --git a/script/import_scripts/mbox/support/indexer.rb b/script/import_scripts/mbox/support/indexer.rb new file mode 100644 index 00000000000..4884365a062 --- /dev/null +++ b/script/import_scripts/mbox/support/indexer.rb @@ -0,0 +1,226 @@ +require_relative 'database' +require 'json' +require 'yaml' + +module ImportScripts::Mbox + class Indexer + # @param database [ImportScripts::Mbox::Database] + # @param settings [ImportScripts::Mbox::Settings] + def initialize(database, settings) + @database = database + @root_directory = settings.data_dir + @split_regex = settings.split_regex + end + + def execute + directories = Dir.glob(File.join(@root_directory, '*')) + directories.select! { |f| File.directory?(f) } + directories.sort! + + directories.each do |directory| + puts "indexing files in #{directory}" + category = index_category(directory) + index_emails(directory, category[:name]) + end + + puts '', 'indexing replies and users' + @database.update_in_reply_to_of_emails + @database.sort_emails + @database.fill_users_from_emails + end + + private + + METADATA_FILENAME = 'metadata.yml'.freeze + + def index_category(directory) + metadata_file = File.join(directory, METADATA_FILENAME) + + if File.exist?(metadata_file) + # workaround for YML files that contain classname in file header + yaml = File.read(metadata_file).sub(/^--- !.*$/, '---') + metadata = YAML.load(yaml) + else + metadata = {} + end + + category = { + name: metadata['name'].presence || File.basename(directory), + description: metadata['description'] + } + + @database.insert_category(category) + category + end + + def index_emails(directory, category_name) + all_messages(directory, category_name) do |receiver, filename, first_line_number, last_line_number| + msg_id = receiver.message_id + parsed_email = receiver.mail + from_email, from_display_name = receiver.parse_from_field(parsed_email) + body, elided = receiver.select_body + reply_message_ids = extract_reply_message_ids(parsed_email) + + email = { + msg_id: msg_id, + from_email: from_email, + from_name: from_display_name, + subject: extract_subject(receiver, category_name), + email_date: parsed_email.date&.to_s, + raw_message: receiver.raw_email, + body: body, + elided: elided, + attachment_count: receiver.attachments.count, + charset: parsed_email.charset&.downcase, + category: category_name, + filename: File.basename(filename), + first_line_number: first_line_number, + last_line_number: last_line_number + } + + @database.insert_email(email) + @database.insert_replies(msg_id, reply_message_ids) unless reply_message_ids.empty? + end + end + + def imported_file_checksums(category_name) + rows = @database.fetch_imported_files(category_name) + rows.each_with_object({}) do |row, hash| + hash[row['filename']] = row['checksum'] + end + end + + def all_messages(directory, category_name) + checksums = imported_file_checksums(category_name) + + Dir.foreach(directory) do |filename| + filename = File.join(directory, filename) + next if ignored_file?(filename, checksums) + + puts "indexing #{filename}" + + if @split_regex.present? + each_mail(filename) do |raw_message, first_line_number, last_line_number| + yield read_mail_from_string(raw_message), filename, first_line_number, last_line_number + end + else + yield read_mail_from_file(filename), filename + end + + mark_as_fully_indexed(category_name, filename) + end + end + + def mark_as_fully_indexed(category_name, filename) + imported_file = { + category: category_name, + filename: filename, + checksum: calc_checksum(filename) + } + + @database.insert_imported_file(imported_file) + end + + def each_mail(filename) + raw_message = '' + first_line_number = 1 + last_line_number = 0 + + each_line(filename) do |line| + line = line.scrub + + if line =~ @split_regex && last_line_number.positive? + yield raw_message, first_line_number, last_line_number + raw_message = '' + first_line_number = last_line_number + 1 + else + raw_message << line + end + + last_line_number += 1 + end + + yield raw_message, first_line_number, last_line_number if raw_message.present? + end + + def each_line(filename) + raw_file = File.open(filename, 'r') + text_file = filename.end_with?('.gz') ? Zlib::GzipReader.new(raw_file) : raw_file + + text_file.each_line do |line| + yield line + end + ensure + raw_file.close if raw_file + end + + def read_mail_from_file(filename) + raw_message = File.read(filename) + read_mail_from_string(raw_message) + end + + def read_mail_from_string(raw_message) + Email::Receiver.new(raw_message) + end + + def extract_reply_message_ids(mail) + message_ids = [mail.in_reply_to, Email::Receiver.extract_references(mail.references)] + message_ids.flatten! + message_ids.select!(&:present?) + message_ids.uniq! + message_ids.first(20) + end + + def extract_subject(receiver, list_name) + subject = receiver.subject + return nil if subject.blank? + + # TODO: make the list name (or maybe multiple names) configurable + # Strip mailing list name from subject + subject = subject.gsub(/\[#{Regexp.escape(list_name)}\]/, '').strip + + clean_subject(subject) + end + + # TODO: refactor and move prefixes to settings + def clean_subject(subject) + original_length = subject.length + + # Strip Reply prefix from title (Standard and localized) + subject = subject.gsub(/^Re: */i, '') + subject = subject.gsub(/^R: */i, '') #Italian + subject = subject.gsub(/^RIF: */i, '') #Italian + + # Strip Forward prefix from title (Standard and localized) + subject = subject.gsub(/^Fwd: */i, '') + subject = subject.gsub(/^I: */i, '') #Italian + + subject.strip + + # In case of mixed localized prefixes there could be many of them + # if the mail client didn't strip the localized ones + if original_length > subject.length + clean_subject(subject) + else + subject + end + end + + def ignored_file?(filename, checksums) + File.directory?(filename) || metadata_file?(filename) || fully_indexed?(filename, checksums) + end + + def metadata_file?(filename) + File.basename(filename) == METADATA_FILENAME + end + + def fully_indexed?(filename, checksums) + checksum = checksums[filename] + checksum.present? && calc_checksum(filename) == checksum + end + + def calc_checksum(filename) + Digest::SHA256.file(filename).hexdigest + end + end +end diff --git a/script/import_scripts/mbox/support/settings.rb b/script/import_scripts/mbox/support/settings.rb new file mode 100644 index 00000000000..79fb5ab56b6 --- /dev/null +++ b/script/import_scripts/mbox/support/settings.rb @@ -0,0 +1,22 @@ +require 'yaml' + +module ImportScripts::Mbox + class Settings + def self.load(filename) + yaml = YAML.load_file(filename) + Settings.new(yaml) + end + + attr_reader :data_dir + attr_reader :split_regex + attr_reader :batch_size + attr_reader :trust_level + + def initialize(yaml) + @data_dir = yaml['data_dir'] + @split_regex = Regexp.new(yaml['split_regex']) unless yaml['split_regex'].empty? + @batch_size = 1000 # no need to make this actually configurable at the moment + @trust_level = yaml['default_trust_level'] + end + end +end