discourse/script/bulk_import/generic_bulk.rb

368 lines
9.6 KiB
Ruby

# frozen_string_literal: true
require_relative "base"
require "sqlite3"
require "json"
class BulkImport::Generic < BulkImport::Base
AVATAR_DIRECTORY = ENV["AVATAR_DIRECTORY"]
UPLOAD_DIRECTORY = ENV["UPLOAD_DIRECTORY"]
def initialize(db_path)
super()
@db = create_connection(db_path)
end
def start
run # will call execute, and then "complete" the migration
# Now that the migration is complete, do some more work:
Discourse::Application.load_tasks
puts "running 'import:ensure_consistency' rake task."
Rake::Task["import:ensure_consistency"].invoke
end
def execute
import_categories
import_users
import_user_emails
import_single_sign_on_records
import_topics
import_posts
import_topic_allowed_users
import_likes
import_user_stats
import_tags
end
def import_categories
puts "Importing categories..."
categories = query(<<~SQL)
WITH RECURSIVE tree(id, parent_category_id, name, description, color, text_color, read_restricted, slug,
old_relative_url, existing_id, level, rowid) AS (
SELECT c.id, c.parent_category_id, c.name, c.description, c.color, c.text_color, c.read_restricted, c.slug,
c.old_relative_url, c.existing_id, 0 AS level, c.ROWID
FROM categories c
WHERE c.parent_category_id IS NULL
UNION
SELECT c.id, c.parent_category_id, c.name, c.description, c.color, c.text_color, c.read_restricted, c.slug,
c.old_relative_url, c.existing_id, tree.level + 1 AS level, c.ROWID
FROM categories c,
tree
WHERE c.parent_category_id = tree.id
)
SELECT *
FROM tree
ORDER BY level, rowid
SQL
create_categories(categories) do |row|
{
imported_id: row["id"],
existing_id: row["existing_id"],
name: row["name"],
description: row["description"],
parent_category_id:
row["parent_category_id"] ? category_id_from_imported_id(row["parent_category_id"]) : nil,
slug: row["slug"],
}
end
end
def import_users
puts "Importing users..."
users = query(<<~SQL)
SELECT ROWID, *
FROM users
ORDER BY ROWID
SQL
create_users(users) do |row|
sso_record = JSON.parse(row["sso_record"]) if row["sso_record"].present?
if row["suspension"].present?
suspension = JSON.parse(row["suspension"])
suspended_at = suspension["suspended_at"]
suspended_till = suspension["suspended_till"]
end
{
imported_id: row["id"],
username: row["username"],
name: row["name"],
email: row["email"],
external_id: sso_record&.fetch("external_id"),
created_at: to_datetime(row["created_at"]),
admin: row["admin"],
moderator: row["moderator"],
suspended_at: suspended_at,
suspended_till: suspended_till,
}
end
end
def import_user_emails
puts "", "Importing user emails..."
users = query(<<~SQL)
SELECT ROWID, id, email, created_at
FROM users
ORDER BY ROWID
SQL
create_user_emails(users) do |row|
{
# FIXME: using both "imported_id" and "imported_user_id" and should be replaced by just "imported_id"
imported_id: row["id"],
imported_user_id: row["id"],
email: row["email"],
created_at: to_datetime(row["created_at"]),
}
end
end
def import_single_sign_on_records
puts "", "Importing SSO records..."
users = query(<<~SQL)
SELECT ROWID, id, sso_record
FROM users
WHERE sso_record IS NOT NULL
ORDER BY ROWID
SQL
create_single_sign_on_records(users) do |row|
sso_record = JSON.parse(row["sso_record"], symbolize_names: true)
# FIXME: using both "imported_id" and "imported_user_id" and should be replaced by just "imported_id"
sso_record[:imported_id] = row["id"]
sso_record[:imported_user_id] = row["id"]
sso_record
end
end
def import_topics
puts "Importing topics..."
topics = query(<<~SQL)
SELECT ROWID, *
FROM topics
ORDER BY ROWID
SQL
create_topics(topics) do |row|
{
archetype: row["private_message"] ? Archetype.private_message : Archetype.default,
imported_id: row["id"],
title: row["title"],
user_id: user_id_from_imported_id(row["user_id"]),
created_at: to_datetime(row["created_at"]),
category_id: category_id_from_imported_id(row["category_id"]),
closed: to_boolean(row["closed"]),
views: row["views"],
}
end
end
def import_topic_allowed_users
puts "Importing topic_allowed_users..."
topics = query(<<~SQL)
SELECT ROWID, *
FROM topics
WHERE private_message IS NOT NULL
ORDER BY ROWID
SQL
added = 0
create_topic_allowed_users(topics) do |row|
next unless topic_id = topic_id_from_imported_id(row["id"])
imported_user_id = JSON.parse(row["private_message"])["user_ids"].first
user_id = user_id_from_imported_id(imported_user_id)
added += 1
{
# FIXME: missing imported_id
topic_id: topic_id,
user_id: user_id,
}
end
puts "", "Added #{added} topic_allowed_users records."
end
def import_posts
puts "Importing posts..."
posts = query(<<~SQL)
SELECT ROWID, *
FROM posts
ORDER BY topic_id, post_number
SQL
create_posts(posts) do |row|
next if row["raw"].blank?
next unless topic_id = topic_id_from_imported_id(row["topic_id"])
{
imported_id: row["id"],
topic_id: topic_id,
user_id: user_id_from_imported_id(row["user_id"]),
created_at: to_datetime(row["created_at"]),
raw: row["raw"],
like_count: row["like_count"],
}
end
end
def import_likes
puts "Importing likes..."
@imported_likes = Set.new
likes = query(<<~SQL)
SELECT ROWID, *
FROM likes
ORDER BY ROWID
SQL
create_post_actions(likes) do |row|
post_id = post_id_from_imported_id(row["post_id"])
user_id = user_id_from_imported_id(row["user_id"])
next if post_id.nil? || user_id.nil?
next if @imported_likes.add?([post_id, user_id]).nil?
{
# FIXME: missing imported_id
post_id: post_id_from_imported_id(row["post_id"]),
user_id: user_id_from_imported_id(row["user_id"]),
post_action_type_id: 2,
created_at: to_datetime(row["created_at"]),
}
end
end
def import_user_stats
puts "Importing user stats..."
users = query(<<~SQL)
WITH posts_counts AS (
SELECT COUNT(p.id) AS count, p.user_id
FROM posts p GROUP BY p.user_id
),
topic_counts AS (
SELECT COUNT(t.id) AS count, t.user_id
FROM topics t GROUP BY t.user_id
),
first_post AS (
SELECT MIN(p.created_at) AS created_at, p.user_id
FROM posts p GROUP BY p.user_id ORDER BY p.created_at ASC
)
SELECT u.id AS user_id, u.created_at, pc.count AS posts, tc.count AS topics, fp.created_at AS first_post
FROM users u
JOIN posts_counts pc ON u.id = pc.user_id
JOIN topic_counts tc ON u.id = tc.user_id
JOIN first_post fp ON u.id = fp.user_id
SQL
create_user_stats(users) do |row|
user = {
imported_id: row["user_id"],
imported_user_id: row["user_id"],
new_since: to_datetime(row["created_at"]),
post_count: row["posts"],
topic_count: row["topics"],
first_post_created_at: to_datetime(row["first_post"]),
}
likes_received = @db.execute(<<~SQL)
SELECT COUNT(l.id) AS likes_received
FROM likes l JOIN posts p ON l.post_id = p.id
WHERE p.user_id = #{row["user_id"]}
SQL
user[:likes_received] = row["likes_received"] if likes_received
likes_given = @db.execute(<<~SQL)
SELECT COUNT(l.id) AS likes_given
FROM likes l
WHERE l.user_id = #{row["user_id"]}
SQL
user[:likes_given] = row["likes_given"] if likes_given
user
end
end
def import_tags
puts "", "Importing tags..."
tags =
query("SELECT id as topic_id, tags FROM topics")
.map do |r|
next unless r["tags"]
[r["topic_id"], JSON.parse(r["tags"]).uniq]
end
.compact
tag_mapping = {}
tags
.map(&:last)
.flatten
.compact
.uniq
.each do |tag_name|
cleaned_tag_name = DiscourseTagging.clean_tag(tag_name)
tag = Tag.find_by_name(cleaned_tag_name) || Tag.create!(name: cleaned_tag_name)
tag_mapping[tag_name] = tag.id
end
tags_disaggregated =
tags
.map do |topic_id, tags_of_topic|
tags_of_topic.map { |t| { topic_id: topic_id, tag_id: tag_mapping.fetch(t) } }
end
.flatten
create_topic_tags(tags_disaggregated) do |row|
next unless topic_id = topic_id_from_imported_id(row[:topic_id])
{ topic_id: topic_id, tag_id: row[:tag_id] }
end
end
def create_connection(path)
sqlite = SQLite3::Database.new(path, results_as_hash: true)
sqlite.busy_timeout = 60_000 # 60 seconds
sqlite.auto_vacuum = "full"
sqlite.foreign_keys = true
sqlite.journal_mode = "wal"
sqlite.synchronous = "normal"
sqlite
end
def query(sql)
@db.prepare(sql).execute
end
def to_date(text)
text.present? ? Date.parse(text) : nil
end
def to_datetime(text)
text.present? ? DateTime.parse(text) : nil
end
def to_boolean(value)
value == 1
end
end
BulkImport::Generic.new(ARGV.first).start