diff --git a/lib/s3_inventory.rb b/lib/s3_inventory.rb index 763f1d1e462..fab17564d8e 100644 --- a/lib/s3_inventory.rb +++ b/lib/s3_inventory.rb @@ -5,7 +5,7 @@ require "csv" class S3Inventory - attr_reader :inventory_id, :csv_filename, :model + attr_reader :inventory_id, :model, :last_modified CSV_KEY_INDEX ||= 1 CSV_ETAG_INDEX ||= 2 @@ -24,38 +24,29 @@ class S3Inventory end end - def file - @file ||= unsorted_files.sort_by { |file| -file.last_modified.to_i }.first - end - def list_missing - if file.blank? + if files.blank? error("Failed to list inventory from S3") return end DistributedMutex.synchronize("s3_inventory_list_missing_#{inventory_id}") do - current_db = RailsMultisite::ConnectionManagement.current_db - timestamp = Time.now.strftime("%Y-%m-%d-%H%M%S") - @tmp_directory = File.join(Rails.root, "tmp", INVENTORY_PREFIX, current_db, timestamp) - @archive_filename = File.join(@tmp_directory, File.basename(file.key)) - @csv_filename = @archive_filename[0...-3] - - FileUtils.mkdir_p(@tmp_directory) - download_inventory_file_to_tmp_directory - decompress_inventory_file + download_inventory_files_to_tmp_directory + decompress_inventory_files begin table_name = "#{inventory_id}_inventory" connection = ActiveRecord::Base.connection.raw_connection connection.exec("CREATE TEMP TABLE #{table_name}(key text UNIQUE, etag text PRIMARY KEY)") connection.copy_data("COPY #{table_name} FROM STDIN CSV") do - CSV.foreach(csv_filename, headers: false) do |row| - connection.put_copy_data("#{row[CSV_KEY_INDEX]},#{row[CSV_ETAG_INDEX]}\n") + files.each do |file| + CSV.foreach(file[:filename][0...-3], headers: false) do |row| + connection.put_copy_data("#{row[CSV_KEY_INDEX]},#{row[CSV_ETAG_INDEX]}\n") + end end end - uploads = model.where("created_at < ?", file.last_modified) + uploads = model.where("created_at < ?", last_modified) missing_uploads = uploads.joins("LEFT JOIN #{table_name} ON #{table_name}.etag = #{model.table_name}.etag").where("#{table_name}.etag is NULL") if (missing_count = missing_uploads.count) > 0 @@ -71,18 +62,21 @@ class S3Inventory end end - def download_inventory_file_to_tmp_directory - log "Downloading inventory file to tmp directory..." - failure_message = "Failed to inventory file to tmp directory." + def download_inventory_files_to_tmp_directory + files.each do |file| + log "Downloading inventory file '#{file[:key]}' to tmp directory..." + failure_message = "Failed to inventory file '#{file[:key]}' to tmp directory." - @s3_helper.download_file(file.key, @archive_filename, failure_message) + @s3_helper.download_file(file[:key], file[:filename], failure_message) + end end - def decompress_inventory_file - log "Decompressing inventory file, this may take a while..." - - FileUtils.cd(@tmp_directory) do - Discourse::Utils.execute_command('gzip', '--decompress', @archive_filename, failure_message: "Failed to decompress inventory file.") + def decompress_inventory_files + FileUtils.cd(tmp_directory) do + files.each do |file| + log "Decompressing inventory file '#{file[:filename]}', this may take a while..." + Discourse::Utils.execute_command('gzip', '--decompress', file[:filename], failure_message: "Failed to decompress inventory file '#{file[:filename]}'.") + end end end @@ -123,6 +117,34 @@ class S3Inventory private + def files + @files ||= begin + symlink_file = unsorted_files.sort_by { |file| -file.last_modified.to_i }.first + return [] if symlink_file.blank? + + @last_modified = symlink_file.last_modified + log "Downloading symlink file to tmp directory..." + failure_message = "Failed to download symlink file to tmp directory." + filename = File.join(tmp_directory, File.basename(symlink_file.key)) + + @s3_helper.download_file(symlink_file.key, filename, failure_message) + File.readlines(filename).map do |key| + key = key.sub("s3://#{bucket_name}/").sub("\n", "") + { key: key, filename: File.join(tmp_directory, File.basename(key)) } + end + end + end + + def tmp_directory + @tmp_directory ||= begin + current_db = RailsMultisite::ConnectionManagement.current_db + timestamp = Time.now.strftime("%Y-%m-%d-%H%M%S") + directory = File.join(Rails.root, "tmp", INVENTORY_PREFIX, current_db, timestamp) + FileUtils.mkdir_p(directory) + directory + end + end + def inventory_configuration filter_prefix = inventory_id filter_prefix = File.join(bucket_folder_path, filter_prefix) if bucket_folder_path.present? @@ -159,8 +181,9 @@ class S3Inventory def unsorted_files objects = [] - @s3_helper.list(inventory_data_path).each do |obj| - if obj.key.match?(/\.csv\.gz$/i) + hive_path = File.join(inventory_path, bucket_name, inventory_id, "hive") + @s3_helper.list(hive_path).each do |obj| + if obj.key.match?(/symlink\.txt$/i) objects << obj end end @@ -170,10 +193,6 @@ class S3Inventory log("Failed to list inventory from S3", e) end - def inventory_data_path - File.join(inventory_path, bucket_name, inventory_id, "data") - end - def inventory_path_arn File.join(bucket_arn, inventory_path) end diff --git a/spec/components/s3_inventory_spec.rb b/spec/components/s3_inventory_spec.rb index fdb4a793a6f..1156d5db01e 100644 --- a/spec/components/s3_inventory_spec.rb +++ b/spec/components/s3_inventory_spec.rb @@ -17,8 +17,7 @@ describe "S3Inventory" do SiteSetting.enable_s3_inventory = true client.stub_responses(:list_objects, -> (context) { - inventory_data_path = "#{S3Inventory::INVENTORY_PREFIX}/#{S3Inventory::INVENTORY_VERSION}/bucket/original/data" - expect(context.params[:prefix]).to eq(inventory_data_path) + expect(context.params[:prefix]).to eq("#{S3Inventory::INVENTORY_PREFIX}/#{S3Inventory::INVENTORY_VERSION}/bucket/original/hive") { contents: [ @@ -50,10 +49,6 @@ describe "S3Inventory" do }) end - it "should return the latest inventory file name" do - expect(inventory.file.key).to eq("example1.csv.gz") - end - it "should raise error if an inventory file is not found" do client.stub_responses(:list_objects, contents: []) output = capture_stdout { inventory.list_missing } @@ -69,14 +64,14 @@ describe "S3Inventory" do upload = Fabricate(:upload, etag: "ETag", created_at: 1.days.ago) Fabricate(:upload, etag: "ETag2", created_at: Time.now) - inventory.expects(:decompress_inventory_file) - inventory.expects(:csv_filename).returns(csv_filename) - inventory.file.expects(:last_modified).returns(Time.now) + inventory.expects(:decompress_inventory_files) + inventory.expects(:files).returns([{ key: "Key", filename: "#{csv_filename}.gz" }]).at_least(1) + inventory.expects(:last_modified).returns(Time.now) output = capture_stdout do inventory.list_missing end - expect(output).to eq("Downloading inventory file to tmp directory...\n#{upload.url}\n1 of 4 uploads are missing\n") + expect(output).to eq("Downloading inventory file 'Key' to tmp directory...\n#{upload.url}\n1 of 4 uploads are missing\n") end end