Fixes for vBulletin bulk importer (#17618)
* Allow taking table prefix from env var * FIX: remove unused column references The columns `filedata` and `extension` are not present in a v4.2.4 database, and they aren't used in the method anyways. * FIX: report progress for tables without imported_id * FIX: effectively check for AR validation errors NOTE: other migration scripts also have this problem; see /t/58202 * FIX: properly count Posts when importing attachments * FIX: improve logging * Remove leftover comment * FIX: show progress when exporting Permalink file * PERF: stream Permalink file The current way results in tons of memory usage; write once per line instead * Document fixes needed * WIP - deduplicate category names * Ignore non alphanumeric chars for grouping * FIX: properly deduplicate user emails by merging accounts * FIX: don't merge empty UserEmails * Improve logging * Merge users AFTER fixing primary key sequences * Parallelize user merging * Save duplicated users structure for debugging purposes * Add progress logging for the (multiple hour) user merging step
This commit is contained in:
parent
a3abbe07db
commit
bfecbde837
2
Gemfile
2
Gemfile
|
@ -254,6 +254,8 @@ if ENV["IMPORT"] == "1"
|
|||
gem 'reverse_markdown'
|
||||
gem 'tiny_tds'
|
||||
gem 'csv'
|
||||
|
||||
gem 'parallel', require: false
|
||||
end
|
||||
|
||||
gem 'webpush', require: false
|
||||
|
|
|
@ -99,6 +99,7 @@ class BulkImport::Base
|
|||
load_indexes
|
||||
execute
|
||||
fix_primary_keys
|
||||
execute_after
|
||||
puts "Done! Now run the 'import:ensure_consistency' rake task."
|
||||
end
|
||||
|
||||
|
@ -227,6 +228,9 @@ class BulkImport::Base
|
|||
raise NotImplementedError
|
||||
end
|
||||
|
||||
def execute_after
|
||||
end
|
||||
|
||||
def fix_primary_keys
|
||||
puts "Updating primary key sequences..."
|
||||
@raw_connection.exec("SELECT setval('#{Group.sequence_name}', #{@last_group_id})") if @last_group_id > 0
|
||||
|
@ -713,6 +717,7 @@ class BulkImport::Base
|
|||
imported_ids = []
|
||||
process_method_name = "process_#{name}"
|
||||
sql = "COPY #{name.pluralize} (#{columns.map { |c| "\"#{c}\"" }.join(",")}) FROM STDIN"
|
||||
rows_created = 0
|
||||
|
||||
@raw_connection.copy_data(sql, @encoder) do
|
||||
rows.each do |row|
|
||||
|
@ -722,7 +727,8 @@ class BulkImport::Base
|
|||
imported_ids << mapped[:imported_id] unless mapped[:imported_id].nil?
|
||||
imported_ids |= mapped[:imported_ids] unless mapped[:imported_ids].nil?
|
||||
@raw_connection.put_copy_data columns.map { |c| processed[c] } unless processed[:skip]
|
||||
print "\r%7d - %6d/sec" % [imported_ids.size, imported_ids.size.to_f / (Time.now - start)] if imported_ids.size % 5000 == 0
|
||||
rows_created += 1
|
||||
print "\r%7d - %6d/sec" % [rows_created, rows_created.to_f / (Time.now - start)] if rows_created % 100 == 0
|
||||
rescue => e
|
||||
puts "\n"
|
||||
puts "ERROR: #{e.message}"
|
||||
|
@ -731,10 +737,7 @@ class BulkImport::Base
|
|||
end
|
||||
end
|
||||
|
||||
if imported_ids.size > 0
|
||||
print "\r%7d - %6d/sec" % [imported_ids.size, imported_ids.size.to_f / (Time.now - start)]
|
||||
puts
|
||||
end
|
||||
print "\r%7d - %6d/sec\n" % [rows_created, rows_created.to_f / (Time.now - start)] if rows_created > 0
|
||||
|
||||
id_mapping_method_name = "#{name}_id_from_imported_id".freeze
|
||||
return unless respond_to?(id_mapping_method_name)
|
||||
|
@ -745,6 +748,7 @@ class BulkImport::Base
|
|||
}
|
||||
end
|
||||
rescue => e
|
||||
# FIXME: errors catched here stop the rest of the COPY
|
||||
puts e.message
|
||||
puts e.backtrace.join("\n")
|
||||
end
|
||||
|
|
|
@ -4,10 +4,11 @@ require_relative "base"
|
|||
require "set"
|
||||
require "mysql2"
|
||||
require "htmlentities"
|
||||
require "parallel"
|
||||
|
||||
class BulkImport::VBulletin < BulkImport::Base
|
||||
|
||||
TABLE_PREFIX = "vb_"
|
||||
TABLE_PREFIX ||= ENV['TABLE_PREFIX'] || "vb_"
|
||||
SUSPENDED_TILL ||= Date.new(3000, 1, 1)
|
||||
ATTACHMENT_DIR ||= ENV['ATTACHMENT_DIR'] || '/shared/import/data/attachments'
|
||||
AVATAR_DIR ||= ENV['AVATAR_DIR'] || '/shared/import/data/customavatars'
|
||||
|
@ -43,6 +44,8 @@ class BulkImport::VBulletin < BulkImport::Base
|
|||
AND `COLUMN_NAME` LIKE 'post_thanks_%'
|
||||
SQL
|
||||
).to_a.count > 0
|
||||
|
||||
@user_ids_by_email = {}
|
||||
end
|
||||
|
||||
def execute
|
||||
|
@ -82,8 +85,17 @@ class BulkImport::VBulletin < BulkImport::Base
|
|||
import_signatures
|
||||
end
|
||||
|
||||
def execute_after
|
||||
max_age = SiteSetting.delete_user_max_post_age
|
||||
SiteSetting.delete_user_max_post_age = 50 * 365
|
||||
|
||||
merge_duplicated_users
|
||||
|
||||
SiteSetting.delete_user_max_post_age = max_age
|
||||
end
|
||||
|
||||
def import_groups
|
||||
puts "Importing groups..."
|
||||
puts '', "Importing groups..."
|
||||
|
||||
groups = mysql_stream <<-SQL
|
||||
SELECT usergroupid, title, description, usertitle
|
||||
|
@ -103,7 +115,7 @@ class BulkImport::VBulletin < BulkImport::Base
|
|||
end
|
||||
|
||||
def import_users
|
||||
puts "Importing users..."
|
||||
puts '', "Importing users..."
|
||||
|
||||
users = mysql_stream <<-SQL
|
||||
SELECT u.userid, username, email, joindate, birthday, ipaddress, u.usergroupid, bandate, liftdate
|
||||
|
@ -133,7 +145,7 @@ class BulkImport::VBulletin < BulkImport::Base
|
|||
end
|
||||
|
||||
def import_user_emails
|
||||
puts "Importing user emails..."
|
||||
puts '', "Importing user emails..."
|
||||
|
||||
users = mysql_stream <<-SQL
|
||||
SELECT u.userid, email, joindate
|
||||
|
@ -143,17 +155,31 @@ class BulkImport::VBulletin < BulkImport::Base
|
|||
SQL
|
||||
|
||||
create_user_emails(users) do |row|
|
||||
user_id, email = row[0 .. 1]
|
||||
|
||||
@user_ids_by_email[email.downcase] ||= []
|
||||
user_ids = @user_ids_by_email[email.downcase] << user_id
|
||||
|
||||
if user_ids.count > 1
|
||||
# fudge email to avoid conflicts; accounts from the 2nd and on will later be merged back into the first
|
||||
# NOTE: gsub! is used to avoid creating a new (frozen) string
|
||||
email.gsub!(/^/, SecureRandom.hex)
|
||||
end
|
||||
|
||||
{
|
||||
imported_id: row[0],
|
||||
imported_user_id: row[0],
|
||||
email: row[1],
|
||||
imported_id: user_id,
|
||||
imported_user_id: user_id,
|
||||
email: email,
|
||||
created_at: Time.zone.at(row[2])
|
||||
}
|
||||
end
|
||||
|
||||
# for debugging purposes; not used operationally
|
||||
save_duplicated_users
|
||||
end
|
||||
|
||||
def import_user_stats
|
||||
puts "Importing user stats..."
|
||||
puts '', "Importing user stats..."
|
||||
|
||||
users = mysql_stream <<-SQL
|
||||
SELECT u.userid, joindate, posts, COUNT(t.threadid) AS threads, p.dateline
|
||||
|
@ -186,7 +212,7 @@ class BulkImport::VBulletin < BulkImport::Base
|
|||
end
|
||||
|
||||
def import_group_users
|
||||
puts "Importing group users..."
|
||||
puts '', "Importing group users..."
|
||||
|
||||
group_users = mysql_stream <<-SQL
|
||||
SELECT usergroupid, userid
|
||||
|
@ -203,7 +229,7 @@ class BulkImport::VBulletin < BulkImport::Base
|
|||
end
|
||||
|
||||
def import_user_passwords
|
||||
puts "Importing user passwords..."
|
||||
puts '', "Importing user passwords..."
|
||||
|
||||
user_passwords = mysql_stream <<-SQL
|
||||
SELECT userid, password
|
||||
|
@ -221,7 +247,7 @@ class BulkImport::VBulletin < BulkImport::Base
|
|||
end
|
||||
|
||||
def import_user_salts
|
||||
puts "Importing user salts..."
|
||||
puts '', "Importing user salts..."
|
||||
|
||||
user_salts = mysql_stream <<-SQL
|
||||
SELECT userid, salt
|
||||
|
@ -240,7 +266,7 @@ class BulkImport::VBulletin < BulkImport::Base
|
|||
end
|
||||
|
||||
def import_user_profiles
|
||||
puts "Importing user profiles..."
|
||||
puts '', "Importing user profiles..."
|
||||
|
||||
user_profiles = mysql_stream <<-SQL
|
||||
SELECT userid, homepage, profilevisits
|
||||
|
@ -259,14 +285,32 @@ class BulkImport::VBulletin < BulkImport::Base
|
|||
end
|
||||
|
||||
def import_categories
|
||||
puts "Importing categories..."
|
||||
puts '', "Importing categories..."
|
||||
|
||||
categories = mysql_query(<<-SQL
|
||||
SELECT forumid, parentid, title, description, displayorder
|
||||
FROM #{TABLE_PREFIX}forum
|
||||
WHERE forumid > #{@last_imported_category_id}
|
||||
ORDER BY forumid
|
||||
SQL
|
||||
select
|
||||
forumid,
|
||||
parentid,
|
||||
case
|
||||
when forumid in (
|
||||
select distinct forumid from (
|
||||
select forumid, title, count(title)
|
||||
from forum
|
||||
group by replace(replace(title, ':', ''), '&', '')
|
||||
having count(title) > 1
|
||||
) as duplicated_forum_ids
|
||||
)
|
||||
then
|
||||
-- deduplicate by fudging the title; categories will needed to be manually merged later
|
||||
concat(title, '_DUPLICATE_', forumid)
|
||||
else
|
||||
title
|
||||
end as title,
|
||||
description,
|
||||
displayorder
|
||||
from forum
|
||||
order by forumid
|
||||
SQL
|
||||
).to_a
|
||||
|
||||
return if categories.empty?
|
||||
|
@ -283,7 +327,7 @@ class BulkImport::VBulletin < BulkImport::Base
|
|||
end
|
||||
end
|
||||
|
||||
puts "Importing parent categories..."
|
||||
puts '', "Importing parent categories..."
|
||||
create_categories(parent_categories) do |row|
|
||||
{
|
||||
imported_id: row[0],
|
||||
|
@ -293,7 +337,7 @@ class BulkImport::VBulletin < BulkImport::Base
|
|||
}
|
||||
end
|
||||
|
||||
puts "Importing children categories..."
|
||||
puts '', "Importing children categories..."
|
||||
create_categories(children_categories) do |row|
|
||||
{
|
||||
imported_id: row[0],
|
||||
|
@ -306,7 +350,7 @@ class BulkImport::VBulletin < BulkImport::Base
|
|||
end
|
||||
|
||||
def import_topics
|
||||
puts "Importing topics..."
|
||||
puts '', "Importing topics..."
|
||||
|
||||
topics = mysql_stream <<-SQL
|
||||
SELECT threadid, title, forumid, postuserid, open, dateline, views, visible, sticky
|
||||
|
@ -337,7 +381,7 @@ class BulkImport::VBulletin < BulkImport::Base
|
|||
end
|
||||
|
||||
def import_posts
|
||||
puts "Importing posts..."
|
||||
puts '', "Importing posts..."
|
||||
|
||||
posts = mysql_stream <<-SQL
|
||||
SELECT postid, p.threadid, parentid, userid, p.dateline, p.visible, pagetext
|
||||
|
@ -371,7 +415,7 @@ class BulkImport::VBulletin < BulkImport::Base
|
|||
|
||||
def import_likes
|
||||
return unless @has_post_thanks
|
||||
puts "Importing likes..."
|
||||
puts '', "Importing likes..."
|
||||
|
||||
@imported_likes = Set.new
|
||||
@last_imported_post_id = 0
|
||||
|
@ -400,7 +444,7 @@ class BulkImport::VBulletin < BulkImport::Base
|
|||
end
|
||||
|
||||
def import_private_topics
|
||||
puts "Importing private topics..."
|
||||
puts '', "Importing private topics..."
|
||||
|
||||
@imported_topics = {}
|
||||
|
||||
|
@ -429,7 +473,7 @@ class BulkImport::VBulletin < BulkImport::Base
|
|||
end
|
||||
|
||||
def import_topic_allowed_users
|
||||
puts "Importing topic allowed users..."
|
||||
puts '', "Importing topic allowed users..."
|
||||
|
||||
allowed_users = Set.new
|
||||
|
||||
|
@ -456,7 +500,7 @@ class BulkImport::VBulletin < BulkImport::Base
|
|||
end
|
||||
|
||||
def import_private_posts
|
||||
puts "Importing private posts..."
|
||||
puts '', "Importing private posts..."
|
||||
|
||||
posts = mysql_stream <<-SQL
|
||||
SELECT pmtextid, title, fromuserid, touserarray, dateline, message
|
||||
|
@ -485,35 +529,27 @@ class BulkImport::VBulletin < BulkImport::Base
|
|||
def create_permalink_file
|
||||
puts '', 'Creating Permalink File...', ''
|
||||
|
||||
id_mapping = []
|
||||
total = Topic.listable_topics.count
|
||||
start = Time.now
|
||||
|
||||
Topic.listable_topics.find_each do |topic|
|
||||
pcf = topic.first_post.custom_fields
|
||||
if pcf && pcf["import_id"]
|
||||
id = pcf["import_id"].split('-').last
|
||||
id_mapping.push("XXX#{id} YYY#{topic.id}")
|
||||
end
|
||||
end
|
||||
i = 0
|
||||
File.open(File.expand_path("../vb_map.csv", __FILE__), "w") do |f|
|
||||
Topic.listable_topics.find_each do |topic|
|
||||
i += 1
|
||||
pcf = topic.posts.includes(:_custom_fields).where(post_number: 1).first.custom_fields
|
||||
if pcf && pcf["import_id"]
|
||||
id = pcf["import_id"].split('-').last
|
||||
|
||||
# Category.find_each do |cat|
|
||||
# ccf = cat.custom_fields
|
||||
# if ccf && ccf["import_id"]
|
||||
# id = ccf["import_id"].to_i
|
||||
# id_mapping.push("/forumdisplay.php?#{id} http://forum.quartertothree.com#{cat.url}")
|
||||
# end
|
||||
# end
|
||||
|
||||
CSV.open(File.expand_path("../vb_map.csv", __FILE__), "w") do |csv|
|
||||
id_mapping.each do |value|
|
||||
csv << [value]
|
||||
f.print [ "XXX#{id} YYY#{topic.id}" ].to_csv
|
||||
print "\r%7d/%7d - %6d/sec" % [i, total, i.to_f / (Time.now - start)] if i % 5000 == 0
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# find the uploaded file information from the db
|
||||
def find_upload(post, attachment_id)
|
||||
sql = "SELECT a.attachmentid attachment_id, a.userid user_id, a.filename filename,
|
||||
a.filedata filedata, a.extension extension
|
||||
sql = "SELECT a.attachmentid attachment_id, a.userid user_id, a.filename filename
|
||||
FROM #{TABLE_PREFIX}attachment a
|
||||
WHERE a.attachmentid = #{attachment_id}"
|
||||
results = mysql_query(sql)
|
||||
|
@ -538,7 +574,7 @@ class BulkImport::VBulletin < BulkImport::Base
|
|||
|
||||
upload = create_upload(post.user.id, filename, real_filename)
|
||||
|
||||
if upload.nil? || !upload.valid?
|
||||
if upload.nil? || upload.errors.any?
|
||||
puts "Upload not valid :("
|
||||
puts upload.errors.inspect if upload
|
||||
return
|
||||
|
@ -556,15 +592,7 @@ class BulkImport::VBulletin < BulkImport::Base
|
|||
|
||||
RateLimiter.disable
|
||||
current_count = 0
|
||||
|
||||
total_count = mysql_query(<<-SQL
|
||||
SELECT COUNT(p.postid) count
|
||||
FROM #{TABLE_PREFIX}post p
|
||||
JOIN #{TABLE_PREFIX}thread t ON t.threadid = p.threadid
|
||||
WHERE t.firstpostid <> p.postid
|
||||
SQL
|
||||
).first[0].to_i
|
||||
|
||||
total_count = Post.count
|
||||
success_count = 0
|
||||
fail_count = 0
|
||||
|
||||
|
@ -677,6 +705,54 @@ class BulkImport::VBulletin < BulkImport::Base
|
|||
end
|
||||
end
|
||||
|
||||
def merge_duplicated_users
|
||||
count = 0
|
||||
total_count = 0
|
||||
|
||||
duplicated = {}
|
||||
@user_ids_by_email.
|
||||
select { |e, ids| ids.count > 1 }.
|
||||
each_with_index do |(email, ids), i|
|
||||
duplicated[email] = [ ids, i ]
|
||||
count += 1
|
||||
total_count += ids.count
|
||||
end
|
||||
|
||||
puts '', "Merging #{total_count} duplicated users across #{count} distinct emails..."
|
||||
|
||||
start = Time.now
|
||||
|
||||
Parallel.each duplicated do |email, (user_ids, i)|
|
||||
# nothing to do about these - they will remain a randomized hex string
|
||||
next unless email.presence
|
||||
|
||||
# queried one by one to ensure ordering
|
||||
first, *rest = user_ids.map do |id|
|
||||
UserCustomField.includes(:user).find_by!(name: 'import_id', value: id).user
|
||||
end
|
||||
|
||||
rest.each do |dup|
|
||||
UserMerger.new(dup, first).merge!
|
||||
first.reload
|
||||
printf '.'
|
||||
end
|
||||
|
||||
print "\n%6d/%6d - %6d/sec" % [i, count, i.to_f / (Time.now - start)] if i % 10 == 0
|
||||
end
|
||||
|
||||
puts
|
||||
end
|
||||
|
||||
def save_duplicated_users
|
||||
File.open('duplicated_users.json', 'w+') do |f|
|
||||
f.puts @user_ids_by_email.to_json
|
||||
end
|
||||
end
|
||||
|
||||
def read_duplicated_users
|
||||
@user_ids_by_email = JSON.parse File.read('duplicated_users.json')
|
||||
end
|
||||
|
||||
def extract_pm_title(title)
|
||||
normalize_text(title).scrub.gsub(/^Re\s*:\s*/i, "")
|
||||
end
|
||||
|
|
|
@ -590,15 +590,7 @@ class ImportScripts::VBulletin < ImportScripts::Base
|
|||
end
|
||||
|
||||
current_count = 0
|
||||
|
||||
total_count = mysql_query(<<-SQL
|
||||
SELECT COUNT(postid) count
|
||||
FROM #{TABLE_PREFIX}post p
|
||||
JOIN #{TABLE_PREFIX}thread t ON t.threadid = p.threadid
|
||||
WHERE t.firstpostid <> p.postid
|
||||
SQL
|
||||
).first["count"]
|
||||
|
||||
total_count = Post.count
|
||||
success_count = 0
|
||||
fail_count = 0
|
||||
|
||||
|
|
Loading…
Reference in New Issue