PERF: Preload S3 inventory data for multisite clusters
This commit is contained in:
parent
620c223d50
commit
16c65a94f7
|
@ -5,9 +5,38 @@ module Jobs
|
||||||
class EnsureS3UploadsExistence < ::Jobs::Scheduled
|
class EnsureS3UploadsExistence < ::Jobs::Scheduled
|
||||||
every 1.day
|
every 1.day
|
||||||
|
|
||||||
|
def perform(*args)
|
||||||
|
super
|
||||||
|
ensure
|
||||||
|
if @db_inventories
|
||||||
|
@db_inventories.values.each { |f| f.close; f.unlink }
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def prepare_for_all_sites
|
||||||
|
inventory = S3Inventory.new(s3_helper, :upload)
|
||||||
|
@db_inventories = inventory.prepare_for_all_sites
|
||||||
|
@inventory_date = inventory.inventory_date
|
||||||
|
end
|
||||||
|
|
||||||
def execute(args)
|
def execute(args)
|
||||||
return unless SiteSetting.enable_s3_inventory
|
return unless SiteSetting.enable_s3_inventory
|
||||||
Discourse.store.list_missing_uploads(skip_optimized: true)
|
require 's3_inventory'
|
||||||
|
|
||||||
|
if !@db_inventories && Rails.configuration.multisite && GlobalSetting.use_s3?
|
||||||
|
prepare_for_all_sites
|
||||||
|
end
|
||||||
|
|
||||||
|
if @db_inventories && preloaded_inventory_file = @db_inventories[RailsMultisite::ConnectionManagement.current_db]
|
||||||
|
S3Inventory.new(
|
||||||
|
s3_helper,
|
||||||
|
:upload,
|
||||||
|
preloaded_inventory_file: preloaded_inventory_file,
|
||||||
|
preloaded_inventory_date: @inventory_date
|
||||||
|
).backfill_etags_and_list_missing
|
||||||
|
else
|
||||||
|
S3Inventory.new(s3_helper, :upload).backfill_etags_and_list_missing
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -12,9 +12,15 @@ class S3Inventory
|
||||||
INVENTORY_PREFIX ||= "inventory"
|
INVENTORY_PREFIX ||= "inventory"
|
||||||
INVENTORY_VERSION ||= "1"
|
INVENTORY_VERSION ||= "1"
|
||||||
|
|
||||||
def initialize(s3_helper, type)
|
def initialize(s3_helper, type, preloaded_inventory_file: nil, preloaded_inventory_date: nil)
|
||||||
@s3_helper = s3_helper
|
@s3_helper = s3_helper
|
||||||
|
|
||||||
|
if preloaded_inventory_file && preloaded_inventory_date
|
||||||
|
# Data preloaded, so we don't need to fetch it again
|
||||||
|
@preloaded_inventory_file = preloaded_inventory_file
|
||||||
|
@inventory_date = preloaded_inventory_date
|
||||||
|
end
|
||||||
|
|
||||||
if type == :upload
|
if type == :upload
|
||||||
@type = "original"
|
@type = "original"
|
||||||
@model = Upload
|
@model = Upload
|
||||||
|
@ -25,34 +31,27 @@ class S3Inventory
|
||||||
end
|
end
|
||||||
|
|
||||||
def backfill_etags_and_list_missing
|
def backfill_etags_and_list_missing
|
||||||
if files.blank?
|
if !@preloaded_inventory_file && files.blank?
|
||||||
error("Failed to list inventory from S3")
|
error("Failed to list inventory from S3")
|
||||||
return
|
return
|
||||||
end
|
end
|
||||||
|
|
||||||
DistributedMutex.synchronize("s3_inventory_list_missing_#{type}", validity: 30.minutes) do
|
DistributedMutex.synchronize("s3_inventory_list_missing_#{type}", validity: 30.minutes) do
|
||||||
begin
|
begin
|
||||||
files.each do |file|
|
download_and_decompress_files if !@preloaded_inventory_file
|
||||||
next if File.exists?(file[:filename][0...-3])
|
|
||||||
|
|
||||||
download_inventory_file_to_tmp_directory(file)
|
|
||||||
decompress_inventory_file(file)
|
|
||||||
end
|
|
||||||
|
|
||||||
multisite_prefix = Discourse.store.upload_path
|
multisite_prefix = Discourse.store.upload_path
|
||||||
ActiveRecord::Base.transaction do
|
ActiveRecord::Base.transaction do
|
||||||
begin
|
begin
|
||||||
connection.exec("CREATE TEMP TABLE #{table_name}(url text UNIQUE, etag text, PRIMARY KEY(etag, url))")
|
connection.exec("CREATE TEMP TABLE #{table_name}(url text UNIQUE, etag text, PRIMARY KEY(etag, url))")
|
||||||
connection.copy_data("COPY #{table_name} FROM STDIN CSV") do
|
connection.copy_data("COPY #{table_name} FROM STDIN CSV") do
|
||||||
files.each do |file|
|
for_each_inventory_row do |row|
|
||||||
CSV.foreach(file[:filename][0...-3], headers: false) do |row|
|
|
||||||
key = row[CSV_KEY_INDEX]
|
key = row[CSV_KEY_INDEX]
|
||||||
next if Rails.configuration.multisite && key.exclude?(multisite_prefix)
|
next if Rails.configuration.multisite && key.exclude?(multisite_prefix)
|
||||||
url = File.join(Discourse.store.absolute_base_url, key)
|
url = File.join(Discourse.store.absolute_base_url, key)
|
||||||
connection.put_copy_data("#{url},#{row[CSV_ETAG_INDEX]}\n")
|
connection.put_copy_data("#{url},#{row[CSV_ETAG_INDEX]}\n")
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
|
||||||
|
|
||||||
# backfilling etags
|
# backfilling etags
|
||||||
connection.async_exec("UPDATE #{model.table_name}
|
connection.async_exec("UPDATE #{model.table_name}
|
||||||
|
@ -87,6 +86,16 @@ class S3Inventory
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def for_each_inventory_row
|
||||||
|
if @preloaded_inventory_file
|
||||||
|
CSV.foreach(@preloaded_inventory_file) { |row| yield(row) }
|
||||||
|
else
|
||||||
|
files.each do |file|
|
||||||
|
CSV.foreach(file[:filename][0...-3]) { |row| yield(row) }
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
def download_inventory_file_to_tmp_directory(file)
|
def download_inventory_file_to_tmp_directory(file)
|
||||||
return if File.exists?(file[:filename])
|
return if File.exists?(file[:filename])
|
||||||
|
|
||||||
|
@ -136,9 +145,36 @@ class S3Inventory
|
||||||
)
|
)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def prepare_for_all_sites
|
||||||
|
db_names = RailsMultisite::ConnectionManagement.all_dbs
|
||||||
|
db_files = {}
|
||||||
|
|
||||||
|
db_names.each do |db|
|
||||||
|
db_files[db] = Tempfile.new("#{db}-inventory.csv")
|
||||||
|
end
|
||||||
|
|
||||||
|
download_and_decompress_files
|
||||||
|
for_each_inventory_row do |row|
|
||||||
|
key = row[CSV_KEY_INDEX]
|
||||||
|
row_db = key.match(/uploads\/([^\/]+)\//)&.[](1)
|
||||||
|
if row_db && file = db_files[row_db]
|
||||||
|
file.write(row.to_csv)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
db_names.each do |db|
|
||||||
|
db_files[db].rewind
|
||||||
|
end
|
||||||
|
|
||||||
|
db_files
|
||||||
|
ensure
|
||||||
|
cleanup!
|
||||||
|
end
|
||||||
|
|
||||||
private
|
private
|
||||||
|
|
||||||
def cleanup!
|
def cleanup!
|
||||||
|
return if @preloaded_inventory_file
|
||||||
files.each do |file|
|
files.each do |file|
|
||||||
File.delete(file[:filename]) if File.exists?(file[:filename])
|
File.delete(file[:filename]) if File.exists?(file[:filename])
|
||||||
File.delete(file[:filename][0...-3]) if File.exists?(file[:filename][0...-3])
|
File.delete(file[:filename][0...-3]) if File.exists?(file[:filename][0...-3])
|
||||||
|
@ -154,6 +190,7 @@ class S3Inventory
|
||||||
end
|
end
|
||||||
|
|
||||||
def files
|
def files
|
||||||
|
return if @preloaded_inventory_file
|
||||||
@files ||= begin
|
@files ||= begin
|
||||||
symlink_file = unsorted_files.sort_by { |file| -file.last_modified.to_i }.first
|
symlink_file = unsorted_files.sort_by { |file| -file.last_modified.to_i }.first
|
||||||
return [] if symlink_file.blank?
|
return [] if symlink_file.blank?
|
||||||
|
@ -171,6 +208,15 @@ class S3Inventory
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def download_and_decompress_files
|
||||||
|
files.each do |file|
|
||||||
|
next if File.exists?(file[:filename][0...-3])
|
||||||
|
|
||||||
|
download_inventory_file_to_tmp_directory(file)
|
||||||
|
decompress_inventory_file(file)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
def tmp_directory
|
def tmp_directory
|
||||||
@tmp_directory ||= begin
|
@tmp_directory ||= begin
|
||||||
current_db = RailsMultisite::ConnectionManagement.current_db
|
current_db = RailsMultisite::ConnectionManagement.current_db
|
||||||
|
|
|
@ -62,6 +62,7 @@ describe "S3Inventory" do
|
||||||
freeze_time
|
freeze_time
|
||||||
|
|
||||||
CSV.foreach(csv_filename, headers: false) do |row|
|
CSV.foreach(csv_filename, headers: false) do |row|
|
||||||
|
next unless row[S3Inventory::CSV_KEY_INDEX].include?("default")
|
||||||
Fabricate(:upload, etag: row[S3Inventory::CSV_ETAG_INDEX], updated_at: 2.days.ago)
|
Fabricate(:upload, etag: row[S3Inventory::CSV_ETAG_INDEX], updated_at: 2.days.ago)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -82,8 +83,8 @@ describe "S3Inventory" do
|
||||||
|
|
||||||
it "should backfill etags to uploads table correctly" do
|
it "should backfill etags to uploads table correctly" do
|
||||||
files = [
|
files = [
|
||||||
["#{Discourse.store.absolute_base_url}/original/1X/0184537a4f419224404d013414e913a4f56018f2.jpg", "defcaac0b4aca535c284e95f30d608d0"],
|
["#{Discourse.store.absolute_base_url}/uploads/default/original/1X/0184537a4f419224404d013414e913a4f56018f2.jpg", "defcaac0b4aca535c284e95f30d608d0"],
|
||||||
["#{Discourse.store.absolute_base_url}/original/1X/0789fbf5490babc68326b9cec90eeb0d6590db05.png", "25c02eaceef4cb779fc17030d33f7f06"]
|
["#{Discourse.store.absolute_base_url}/uploads/default/original/1X/0789fbf5490babc68326b9cec90eeb0d6590db05.png", "25c02eaceef4cb779fc17030d33f7f06"]
|
||||||
]
|
]
|
||||||
files.each { |file| Fabricate(:upload, url: file[0]) }
|
files.each { |file| Fabricate(:upload, url: file[0]) }
|
||||||
|
|
||||||
|
@ -95,4 +96,45 @@ describe "S3Inventory" do
|
||||||
|
|
||||||
expect(Upload.by_users.order(:url).pluck(:url, :etag)).to eq(files)
|
expect(Upload.by_users.order(:url).pluck(:url, :etag)).to eq(files)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
it "should work when passed preloaded data" do
|
||||||
|
freeze_time
|
||||||
|
|
||||||
|
CSV.foreach(csv_filename, headers: false) do |row|
|
||||||
|
next unless row[S3Inventory::CSV_KEY_INDEX].include?("default")
|
||||||
|
Fabricate(:upload, etag: row[S3Inventory::CSV_ETAG_INDEX], updated_at: 2.days.ago)
|
||||||
|
end
|
||||||
|
|
||||||
|
upload = Fabricate(:upload, etag: "ETag", updated_at: 1.days.ago)
|
||||||
|
Fabricate(:upload, etag: "ETag2", updated_at: Time.now)
|
||||||
|
no_etag = Fabricate(:upload, updated_at: 2.days.ago)
|
||||||
|
|
||||||
|
output = capture_stdout do
|
||||||
|
File.open(csv_filename) do |f|
|
||||||
|
preloaded_inventory = S3Inventory.new(
|
||||||
|
helper,
|
||||||
|
:upload,
|
||||||
|
preloaded_inventory_file: f,
|
||||||
|
preloaded_inventory_date: Time.now
|
||||||
|
)
|
||||||
|
preloaded_inventory.backfill_etags_and_list_missing
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
expect(output).to eq("#{upload.url}\n#{no_etag.url}\n2 of 5 uploads are missing\n")
|
||||||
|
expect(Discourse.stats.get("missing_s3_uploads")).to eq(2)
|
||||||
|
end
|
||||||
|
|
||||||
|
it "can create per-site files", type: :multisite do
|
||||||
|
freeze_time
|
||||||
|
|
||||||
|
inventory.stubs(:files).returns([{ key: "Key", filename: "#{csv_filename}.gz" }])
|
||||||
|
|
||||||
|
files = inventory.prepare_for_all_sites
|
||||||
|
db1 = files["default"].read
|
||||||
|
db2 = files["second"].read
|
||||||
|
expect(db1.lines.count).to eq(3)
|
||||||
|
expect(db2.lines.count).to eq(1)
|
||||||
|
files.values.each { |f| f.close; f.unlink }
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
"abc","original/1X/0184537a4f419224404d013414e913a4f56018f2.jpg","defcaac0b4aca535c284e95f30d608d0"
|
"abc","uploads/default/original/1X/0184537a4f419224404d013414e913a4f56018f2.jpg","defcaac0b4aca535c284e95f30d608d0"
|
||||||
"abc","original/1X/050afc0ab01debe8cf48fd2ce50fbbf5eb072815.jpg","0cdc623af39cde0adb382670a6dc702a"
|
"abc","uploads/default/original/1X/050afc0ab01debe8cf48fd2ce50fbbf5eb072815.jpg","0cdc623af39cde0adb382670a6dc702a"
|
||||||
"abc","original/1X/0789fbf5490babc68326b9cec90eeb0d6590db05.png","25c02eaceef4cb779fc17030d33f7f06"
|
"abc","uploads/default/original/1X/0789fbf5490babc68326b9cec90eeb0d6590db05.png","25c02eaceef4cb779fc17030d33f7f06"
|
||||||
|
"abc","uploads/second/original/1X/f789fbf5490babc68326b9cec90eeb0d6590db03.png","15c02eaceef4cb779fc17030d33f7f04"
|
||||||
|
|
|
Loading…
Reference in New Issue