FIX: remove the tmp inventory files after the s3 uploads check.

This commit is contained in:
Vinoth Kannan 2019-08-13 11:29:31 +05:30
parent 1358339bf9
commit 9919ee1900
2 changed files with 62 additions and 56 deletions

View File

@@ -31,50 +31,58 @@ class S3Inventory
end end
DistributedMutex.synchronize("s3_inventory_list_missing_#{type}") do DistributedMutex.synchronize("s3_inventory_list_missing_#{type}") do
download_inventory_files_to_tmp_directory begin
decompress_inventory_files files.each do |file|
next if File.exists?(file[:filename][0...-3])
multisite_prefix = "uploads/#{RailsMultisite::ConnectionManagement.current_db}/" download_inventory_file_to_tmp_directory(file)
ActiveRecord::Base.transaction do decompress_inventory_file(file)
begin end
connection.exec("CREATE TEMP TABLE #{table_name}(url text UNIQUE, etag text, PRIMARY KEY(etag, url))")
connection.copy_data("COPY #{table_name} FROM STDIN CSV") do multisite_prefix = "uploads/#{RailsMultisite::ConnectionManagement.current_db}/"
files.each do |file| ActiveRecord::Base.transaction do
CSV.foreach(file[:filename][0...-3], headers: false) do |row| begin
key = row[CSV_KEY_INDEX] connection.exec("CREATE TEMP TABLE #{table_name}(url text UNIQUE, etag text, PRIMARY KEY(etag, url))")
next if Rails.configuration.multisite && key.exclude?(multisite_prefix) connection.copy_data("COPY #{table_name} FROM STDIN CSV") do
url = File.join(Discourse.store.absolute_base_url, key) files.each do |file|
connection.put_copy_data("#{url},#{row[CSV_ETAG_INDEX]}\n") CSV.foreach(file[:filename][0...-3], headers: false) do |row|
key = row[CSV_KEY_INDEX]
next if Rails.configuration.multisite && key.exclude?(multisite_prefix)
url = File.join(Discourse.store.absolute_base_url, key)
connection.put_copy_data("#{url},#{row[CSV_ETAG_INDEX]}\n")
end
end end
end end
end
# backfilling etags # backfilling etags
connection.async_exec("UPDATE #{model.table_name} connection.async_exec("UPDATE #{model.table_name}
SET etag = #{table_name}.etag SET etag = #{table_name}.etag
FROM #{table_name} FROM #{table_name}
WHERE #{model.table_name}.etag IS NULL WHERE #{model.table_name}.etag IS NULL
AND #{model.table_name}.url = #{table_name}.url") AND #{model.table_name}.url = #{table_name}.url")
list_missing_post_uploads if type == "original" list_missing_post_uploads if type == "original"
uploads = (model == Upload) ? model.by_users.where("created_at < ?", inventory_date) : model uploads = (model == Upload) ? model.by_users.where("created_at < ?", inventory_date) : model
missing_uploads = uploads missing_uploads = uploads
.joins("LEFT JOIN #{table_name} ON #{table_name}.etag = #{model.table_name}.etag") .joins("LEFT JOIN #{table_name} ON #{table_name}.etag = #{model.table_name}.etag")
.where("#{table_name}.etag IS NULL AND #{model.table_name}.etag IS NOT NULL") .where("#{table_name}.etag IS NULL AND #{model.table_name}.etag IS NOT NULL")
if (missing_count = missing_uploads.count) > 0 if (missing_count = missing_uploads.count) > 0
missing_uploads.select(:id, :url).find_each do |upload| missing_uploads.select(:id, :url).find_each do |upload|
log upload.url log upload.url
end
log "#{missing_count} of #{uploads.count} #{model.name.underscore.pluralize} are missing"
end end
log "#{missing_count} of #{uploads.count} #{model.name.underscore.pluralize} are missing" Discourse.stats.set("missing_s3_#{model.table_name}", missing_count)
ensure
connection.exec("DROP TABLE #{table_name}") unless connection.nil?
end end
Discourse.stats.set("missing_s3_#{model.table_name}", missing_count)
ensure
connection.exec("DROP TABLE #{table_name}") unless connection.nil?
end end
ensure
cleanup!
end end
end end
end end
@@ -118,22 +126,18 @@ class S3Inventory
log "#{missing[:count]} post uploads are missing." log "#{missing[:count]} post uploads are missing."
end end
def download_inventory_files_to_tmp_directory def download_inventory_file_to_tmp_directory(file)
files.each do |file| return if File.exists?(file[:filename])
next if File.exists?(file[:filename])
log "Downloading inventory file '#{file[:key]}' to tmp directory..." log "Downloading inventory file '#{file[:key]}' to tmp directory..."
failure_message = "Failed to inventory file '#{file[:key]}' to tmp directory." failure_message = "Failed to inventory file '#{file[:key]}' to tmp directory."
@s3_helper.download_file(file[:key], file[:filename], failure_message) @s3_helper.download_file(file[:key], file[:filename], failure_message)
end
end end
def decompress_inventory_files def decompress_inventory_file(file)
files.each do |file| log "Decompressing inventory file '#{file[:filename]}', this may take a while..."
log "Decompressing inventory file '#{file[:filename]}', this may take a while..." Discourse::Utils.execute_command('gzip', '--decompress', file[:filename], failure_message: "Failed to decompress inventory file '#{file[:filename]}'.", chdir: tmp_directory)
Discourse::Utils.execute_command('gzip', '--decompress', file[:filename], failure_message: "Failed to decompress inventory file '#{file[:filename]}'.", chdir: tmp_directory)
end
end end
def update_bucket_policy def update_bucket_policy
@@ -173,6 +177,13 @@ class S3Inventory
private private
def cleanup!
files.each do |file|
File.delete(file[:filename]) if File.exists?(file[:filename])
File.delete(file[:filename][0...-3]) if File.exists?(file[:filename][0...-3])
end
end
def connection def connection
@connection ||= ActiveRecord::Base.connection.raw_connection @connection ||= ActiveRecord::Base.connection.raw_connection
end end
@@ -202,8 +213,7 @@ class S3Inventory
def tmp_directory def tmp_directory
@tmp_directory ||= begin @tmp_directory ||= begin
current_db = RailsMultisite::ConnectionManagement.current_db current_db = RailsMultisite::ConnectionManagement.current_db
timestamp = Time.now.strftime("%Y-%m-%d-%H%M%S") directory = File.join(Rails.root, "tmp", INVENTORY_PREFIX, current_db)
directory = File.join(Rails.root, "tmp", INVENTORY_PREFIX, current_db, timestamp)
FileUtils.mkdir_p(directory) FileUtils.mkdir_p(directory)
directory directory
end end

View File

@@ -48,6 +48,8 @@ describe "S3Inventory" do
next_marker: "eyJNYXJrZXIiOiBudWxsLCAiYm90b190cnVuY2F0ZV9hbW91bnQiOiAyfQ==" next_marker: "eyJNYXJrZXIiOiBudWxsLCAiYm90b190cnVuY2F0ZV9hbW91bnQiOiAyfQ=="
} }
}) })
inventory.stubs(:cleanup!)
end end
it "should raise error if an inventory file is not found" do it "should raise error if an inventory file is not found" do
@@ -67,9 +69,7 @@ describe "S3Inventory" do
Fabricate(:upload, etag: "ETag2", created_at: Time.now) Fabricate(:upload, etag: "ETag2", created_at: Time.now)
Fabricate(:upload, created_at: 2.days.ago) Fabricate(:upload, created_at: 2.days.ago)
inventory.expects(:download_inventory_files_to_tmp_directory) inventory.expects(:files).returns([{ key: "Key", filename: "#{csv_filename}.gz" }]).times(3)
inventory.expects(:decompress_inventory_files)
inventory.expects(:files).returns([{ key: "Key", filename: "#{csv_filename}.gz" }]).times(2)
inventory.expects(:inventory_date).returns(Time.now) inventory.expects(:inventory_date).returns(Time.now)
output = capture_stdout do output = capture_stdout do
@@ -87,9 +87,7 @@ describe "S3Inventory" do
] ]
files.each { |file| Fabricate(:upload, url: file[0]) } files.each { |file| Fabricate(:upload, url: file[0]) }
inventory.expects(:download_inventory_files_to_tmp_directory) inventory.expects(:files).returns([{ key: "Key", filename: "#{csv_filename}.gz" }]).times(3)
inventory.expects(:decompress_inventory_files)
inventory.expects(:files).returns([{ key: "Key", filename: "#{csv_filename}.gz" }]).times(2)
output = capture_stdout do output = capture_stdout do
expect { inventory.backfill_etags_and_list_missing }.to change { Upload.where(etag: nil).count }.by(-2) expect { inventory.backfill_etags_and_list_missing }.to change { Upload.where(etag: nil).count }.by(-2)
@@ -111,9 +109,7 @@ describe "S3Inventory" do
post.link_post_uploads post.link_post_uploads
upload.delete upload.delete
inventory.expects(:download_inventory_files_to_tmp_directory) inventory.expects(:files).returns([{ key: "Key", filename: "#{csv_filename}.gz" }]).times(3)
inventory.expects(:decompress_inventory_files)
inventory.expects(:files).returns([{ key: "Key", filename: "#{csv_filename}.gz" }]).times(2)
output = capture_stdout do output = capture_stdout do
inventory.backfill_etags_and_list_missing inventory.backfill_etags_and_list_missing