diff --git a/app/jobs/regular/update_s3_inventory.rb b/app/jobs/regular/update_s3_inventory.rb new file mode 100644 index 00000000000..946ec47892d --- /dev/null +++ b/app/jobs/regular/update_s3_inventory.rb @@ -0,0 +1,17 @@ +require "s3_inventory" + +module Jobs + # if upload bucket changes or inventory bucket changes we want to update s3 bucket policy and inventory configuration + class UpdateS3Inventory < Jobs::Base + + def execute(args) + return unless SiteSetting.enable_s3_inventory? && SiteSetting.enable_s3_uploads? + + [:upload, :optimized].each do |type| + s3_inventory = S3Inventory.new(Discourse.store.s3_helper, type) + s3_inventory.update_bucket_policy if type == :upload + s3_inventory.update_bucket_inventory_configuration + end + end + end +end diff --git a/config/locales/server.en.yml b/config/locales/server.en.yml index ddea713c1c6..2af15b9479b 100644 --- a/config/locales/server.en.yml +++ b/config/locales/server.en.yml @@ -191,6 +191,7 @@ en: other: "You specified the invalid choices %{name}" default_categories_already_selected: "You cannot select a category used in another list." s3_upload_bucket_is_required: "You cannot enable uploads to S3 unless you've provided the 's3_upload_bucket'." + enable_s3_uploads_is_required: "You cannot enable inventory to S3 unless you've enabled the S3 uploads." s3_backup_requires_s3_settings: "You cannot use S3 as backup location unless you've provided the '%{setting_name}'." s3_bucket_reused: "You cannot use the same bucket for 's3_upload_bucket' and 's3_backup_bucket'. Choose a different bucket or use a different path for each bucket." conflicting_google_user_id: 'The Google Account ID for this account has changed; staff intervention is required for security reasons. Please contact staff and point them to
https://meta.discourse.org/t/76575' @@ -1488,6 +1489,7 @@ en: s3_force_path_style: "Enforce path-style addressing for your custom endpoint. IMPORTANT: Required for using Minio uploads and backups." s3_configure_tombstone_policy: "Enable automatic deletion policy for tombstone uploads. IMPORTANT: If disabled, no space will be reclaimed after uploads are deleted." s3_disable_cleanup: "Disable the removal of backups from S3 when removed locally." + enable_s3_inventory: "Generate reports and verify uploads using Amazon S3 inventory. IMPORTANT: requires valid S3 credentials (both access key id & secret access key)." backup_time_of_day: "Time of day UTC when the backup should occur." backup_with_uploads: "Include uploads in scheduled backups. Disabling this will only backup the database." backup_location: "Location where backups are stored. IMPORTANT: S3 requires valid S3 credentials entered in Files settings." diff --git a/config/site_settings.yml b/config/site_settings.yml index a897c968549..17ae4a2618b 100644 --- a/config/site_settings.yml +++ b/config/site_settings.yml @@ -1050,6 +1050,8 @@ files: s3_configure_tombstone_policy: default: true shadowed_by_global: true + enable_s3_inventory: + default: false allow_profile_backgrounds: client: true default: true diff --git a/lib/backup_restore/s3_backup_store.rb b/lib/backup_restore/s3_backup_store.rb index 03edc014c38..fbbb18f1d75 100644 --- a/lib/backup_restore/s3_backup_store.rb +++ b/lib/backup_restore/s3_backup_store.rb @@ -32,9 +32,7 @@ module BackupRestore end def download_file(filename, destination_path, failure_message = nil) - unless @s3_helper.object(filename).download_file(destination_path) - raise failure_message&.to_s || "Failed to download file" - end + @s3_helper.download_file(filename, destination_path, failure_message) end def upload_file(filename, source_path, content_type) diff --git a/lib/discourse.rb b/lib/discourse.rb index c8d76f3e158..8f621b838c5 100644 --- a/lib/discourse.rb +++ b/lib/discourse.rb @@ -463,6 +463,11 @@ module Discourse end end + DiscourseEvent.on(:site_setting_saved) do |site_setting| + name = site_setting.name.to_s + Jobs.enqueue(:update_s3_inventory) if name.include?("s3_inventory") || name == "s3_upload_bucket" + end + def self.current_user_provider @current_user_provider || Auth::DefaultCurrentUserProvider end diff --git a/lib/file_store/s3_store.rb b/lib/file_store/s3_store.rb index 119dd9c0a1c..d4bb6647d29 100644 --- a/lib/file_store/s3_store.rb +++ b/lib/file_store/s3_store.rb @@ -124,8 +124,14 @@ module FileStore end def list_missing_uploads(skip_optimized: false) - list_missing(Upload, "original/") - list_missing(OptimizedImage, "optimized/") unless skip_optimized + if SiteSetting.enable_s3_inventory + require 's3_inventory' + S3Inventory.new(s3_helper, :upload).list_missing + S3Inventory.new(s3_helper, :optimized).list_missing unless skip_optimized + else + list_missing(Upload, "original/") + list_missing(OptimizedImage, "optimized/") unless skip_optimized + end end private @@ -140,7 +146,7 @@ module FileStore verified_ids = [] files.each do |f| - id = model.where("url LIKE '%#{f.key}'").pluck(:id).first if f.size > 0 + id = model.where("url LIKE '%#{f.key}' AND etag = '#{f.etag}'").pluck(:id).first verified_ids << id if id.present? marker = f.key end @@ -150,7 +156,7 @@ module FileStore files = @s3_helper.list(prefix, marker) end - missing_uploads = model.where("id NOT IN (SELECT val FROM verified_ids)") + missing_uploads = model.joins('LEFT JOIN verified_ids ON verified_ids.val = id').where("verified_ids.val IS NULL") missing_count = missing_uploads.count if missing_count > 0 diff --git a/lib/s3_helper.rb b/lib/s3_helper.rb index fbdde9674a7..380ba86e5af 100644 --- a/lib/s3_helper.rb +++ b/lib/s3_helper.rb @@ -205,6 +205,20 @@ class S3Helper opts end + def download_file(filename, destination_path, failure_message = nil) + unless object(filename).download_file(destination_path) + raise failure_message&.to_s || "Failed to download file" + end + end + + def s3_client + @s3_client ||= Aws::S3::Client.new(@s3_options) + end + + def s3_inventory_path(path = 'inventory') + get_path_for_s3_upload(path) + end + private def default_s3_options @@ -228,10 +242,6 @@ class S3Helper File.join("uploads", RailsMultisite::ConnectionManagement.current_db, "/") end - def s3_client - @s3_client ||= Aws::S3::Client.new(@s3_options) - end - def s3_resource Aws::S3::Resource.new(client: s3_client) end diff --git a/lib/s3_inventory.rb b/lib/s3_inventory.rb new file mode 100644 index 00000000000..321182f3d6a --- /dev/null +++ b/lib/s3_inventory.rb @@ -0,0 +1,193 @@ +# frozen_string_literal: true + +require "aws-sdk-s3" +require "csv" + +class S3Inventory + + attr_reader :inventory_id, :csv_filename, :model + + CSV_KEY_INDEX ||= 1 + CSV_ETAG_INDEX ||= 2 + INVENTORY_PREFIX ||= "inventory" + + def initialize(s3_helper, type) + @s3_helper = s3_helper + + if type == :upload + @inventory_id = "original" + @model = Upload + elsif type == :optimized + @inventory_id = "optimized" + @model = OptimizedImage + end + end + + def file + @file ||= unsorted_files.sort_by { |file| -file.last_modified.to_i }.first + end + + def list_missing + if file.blank? + error("Failed to list inventory from S3") + return + end + + DistributedMutex.synchronize("s3_inventory_list_missing_#{inventory_id}") do + current_db = RailsMultisite::ConnectionManagement.current_db + timestamp = Time.now.strftime("%Y-%m-%d-%H%M%S") + @tmp_directory = File.join(Rails.root, "tmp", INVENTORY_PREFIX, current_db, timestamp) + @archive_filename = File.join(@tmp_directory, File.basename(file.key)) + @csv_filename = @archive_filename[0...-3] + + FileUtils.mkdir_p(@tmp_directory) + download_inventory_file_to_tmp_directory + decompress_inventory_file + + begin + table_name = "#{inventory_id}_inventory" + connection = ActiveRecord::Base.connection.raw_connection + connection.exec("CREATE TEMP TABLE #{table_name}(key text UNIQUE, etag text PRIMARY KEY)") + connection.copy_data("COPY #{table_name} FROM STDIN CSV") do + CSV.foreach(csv_filename, headers: false) do |row| + connection.put_copy_data("#{row[CSV_KEY_INDEX]},#{row[CSV_ETAG_INDEX]}\n") + end + end + + uploads = model.where("created_at < ?", file.last_modified) + missing_uploads = uploads.joins("LEFT JOIN #{table_name} ON #{table_name}.etag = #{model.table_name}.etag").where("#{table_name}.etag is NULL") + + if (missing_count = missing_uploads.count) > 0 + missing_uploads.select(:id, :url).find_each do |upload| + log upload.url + end + + log "#{missing_count} of #{uploads.count} #{model.name.underscore.pluralize} are missing" + end + ensure + connection.exec("DROP TABLE #{table_name}") unless connection.nil? + end + end + end + + def download_inventory_file_to_tmp_directory + log "Downloading inventory file to tmp directory..." + failure_message = "Failed to inventory file to tmp directory." + + @s3_helper.download_file(file.key, @archive_filename, failure_message) + end + + def decompress_inventory_file + log "Decompressing inventory file, this may take a while..." + + FileUtils.cd(@tmp_directory) do + Discourse::Utils.execute_command('gzip', '--decompress', @archive_filename, failure_message: "Failed to decompress inventory file.") + end + end + + def update_bucket_policy + @s3_helper.s3_client.put_bucket_policy( + bucket: bucket_name, + policy: { + "Version": "2012-10-17", + "Statement": [ + { + "Sid": "InventoryAndAnalyticsPolicy", + "Effect": "Allow", + "Principal": { "Service": "s3.amazonaws.com" }, + "Action": ["s3:PutObject"], + "Resource": ["arn:aws:s3:::#{inventory_root_path}/*"], + "Condition": { + "ArnLike": { + "aws:SourceArn": "arn:aws:s3:::#{bucket_name}" + }, + "StringEquals": { + "s3:x-amz-acl": "bucket-owner-full-control" + } + } + } + ] + }.to_json + ) + end + + def update_bucket_inventory_configuration + @s3_helper.s3_client.put_bucket_inventory_configuration( + bucket: bucket_name, + id: inventory_id, + inventory_configuration: inventory_configuration, + use_accelerate_endpoint: false + ) + end + + private + + def inventory_configuration + filter_prefix = inventory_id + destination_prefix = File.join(INVENTORY_PREFIX, inventory_id) + + if bucket_folder_path.present? + filter_prefix = File.join(bucket_folder_path, filter_prefix) + destination_prefix = File.join(bucket_folder_path, destination_prefix) + end + + { + destination: { + s3_bucket_destination: { + bucket: "arn:aws:s3:::#{bucket_name}", + prefix: destination_prefix, + format: "CSV" + } + }, + filter: { + prefix: filter_prefix + }, + is_enabled: SiteSetting.enable_s3_inventory, + id: inventory_id, + included_object_versions: "Current", + optional_fields: ["ETag"], + schedule: { + frequency: "Daily" + } + } + end + + def bucket_name + @s3_helper.s3_bucket_name + end + + def bucket_folder_path + @s3_helper.s3_bucket_folder_path + end + + def unsorted_files + objects = [] + + @s3_helper.list(File.join(inventory_path, "data")).each do |obj| + if obj.key.match?(/\.csv\.gz$/i) + objects << obj + end + end + + objects + rescue Aws::Errors::ServiceError => e + log("Failed to list inventory from S3", e) + end + + def inventory_path + File.join(inventory_root_path, inventory_id) + end + + def inventory_root_path + File.join(bucket_name, bucket_folder_path || "", INVENTORY_PREFIX) + end + + def log(message, ex = nil) + puts(message) + Rails.logger.error("#{ex}\n" + (ex.backtrace || []).join("\n")) if ex + end + + def error(message) + log(message, StandardError.new(message)) + end +end diff --git a/lib/site_settings/validations.rb b/lib/site_settings/validations.rb index 2d03fefc3cf..eaab98f5f22 100644 --- a/lib/site_settings/validations.rb +++ b/lib/site_settings/validations.rb @@ -53,6 +53,10 @@ module SiteSettings::Validations validate_error :s3_upload_bucket_is_required if new_val == "t" && SiteSetting.s3_upload_bucket.blank? end + def validate_enable_s3_inventory(new_val) + validate_error :enable_s3_uploads_is_required if new_val == "t" && !SiteSetting.enable_s3_uploads? + end + def validate_backup_location(new_val) return unless new_val == BackupLocationSiteSetting::S3 validate_error(:s3_backup_requires_s3_settings, setting_name: "s3_backup_bucket") if SiteSetting.s3_backup_bucket.blank? diff --git a/spec/components/migration/safe_migrate_spec.rb b/spec/components/migration/safe_migrate_spec.rb index 62b94805cbd..0255006adb3 100644 --- a/spec/components/migration/safe_migrate_spec.rb +++ b/spec/components/migration/safe_migrate_spec.rb @@ -11,16 +11,6 @@ describe Migration::SafeMigrate do Migration::SafeMigrate::SafeMigration.enable_safe! end - def capture_stdout - old_stdout = $stdout - io = StringIO.new - $stdout = io - yield - io.string - ensure - $stdout = old_stdout - end - def migrate_up(path) migrations = ActiveRecord::MigrationContext.new(path).migrations ActiveRecord::Migrator.new(:up, migrations, migrations.first.version).run diff --git a/spec/components/s3_inventory_spec.rb b/spec/components/s3_inventory_spec.rb new file mode 100644 index 00000000000..5f147f370f4 --- /dev/null +++ b/spec/components/s3_inventory_spec.rb @@ -0,0 +1,77 @@ +require "rails_helper" +require "s3_helper" +require "s3_inventory" +require "file_store/s3_store" + +describe "S3Inventory" do + let(:client) { Aws::S3::Client.new(stub_responses: true) } + let(:helper) { S3Helper.new(SiteSetting.Upload.s3_upload_bucket.downcase, "", client: client) } + let(:store) { FileStore::S3Store.new(helper) } + let(:inventory) { S3Inventory.new(helper, :upload) } + let(:csv_filename) { "#{Rails.root}/spec/fixtures/csv/s3_inventory.csv" } + + before do + SiteSetting.enable_s3_uploads = true + SiteSetting.s3_access_key_id = "abc" + SiteSetting.s3_secret_access_key = "def" + SiteSetting.enable_s3_inventory = true + + client.stub_responses(:list_objects, + contents: [ + { + etag: "\"70ee1738b6b21e2c8a43f3a5ab0eee71\"", + key: "example1.csv.gz", + last_modified: Time.parse("2014-11-21T19:40:05.000Z"), + owner: { + display_name: "myname", + id: "12345example25102679df27bb0ae12b3f85be6f290b936c4393484be31bebcc", + }, + size: 11, + storage_class: "STANDARD", + }, + { + etag: "\"9c8af9a76df052144598c115ef33e511\"", + key: "example2.csv.gz", + last_modified: Time.parse("2013-11-15T01:10:49.000Z"), + owner: { + display_name: "myname", + id: "12345example25102679df27bb0ae12b3f85be6f290b936c4393484be31bebcc", + }, + size: 713193, + storage_class: "STANDARD", + } + ], + next_marker: "eyJNYXJrZXIiOiBudWxsLCAiYm90b190cnVuY2F0ZV9hbW91bnQiOiAyfQ==" + ) + end + + it "should return the latest inventory file name" do + expect(inventory.file.key).to eq("example1.csv.gz") + end + + it "should raise error if an inventory file is not found" do + client.stub_responses(:list_objects, contents: []) + output = capture_stdout { inventory.list_missing } + expect(output).to eq("Failed to list inventory from S3\n") + end + + it "should display missing uploads correctly" do + freeze_time + + CSV.foreach(csv_filename, headers: false) do |row| + Fabricate(:upload, etag: row[S3Inventory::CSV_ETAG_INDEX], created_at: 2.days.ago) + end + upload = Fabricate(:upload, etag: "ETag", created_at: 1.days.ago) + Fabricate(:upload, etag: "ETag2", created_at: Time.now) + + inventory.expects(:decompress_inventory_file) + inventory.expects(:csv_filename).returns(csv_filename) + inventory.file.expects(:last_modified).returns(Time.now) + + output = capture_stdout do + inventory.list_missing + end + + expect(output).to eq("Downloading inventory file to tmp directory...\n#{upload.url}\n1 of 4 uploads are missing\n") + end +end diff --git a/spec/fixtures/csv/s3_inventory.csv b/spec/fixtures/csv/s3_inventory.csv new file mode 100644 index 00000000000..04325fb7c39 --- /dev/null +++ b/spec/fixtures/csv/s3_inventory.csv @@ -0,0 +1,3 @@ +"abc","original/0184537a4f419224404d013414e913a4f56018f2.jpg","defcaac0b4aca535c284e95f30d608d0" +"abc","original/050afc0ab01debe8cf48fd2ce50fbbf5eb072815.jpg","0cdc623af39cde0adb382670a6dc702a" +"abc","original/0789fbf5490babc68326b9cec90eeb0d6590db05.png","25c02eaceef4cb779fc17030d33f7f06" diff --git a/spec/jobs/update_s3_inventory_spec.rb b/spec/jobs/update_s3_inventory_spec.rb new file mode 100644 index 00000000000..1bcb8985a79 --- /dev/null +++ b/spec/jobs/update_s3_inventory_spec.rb @@ -0,0 +1,50 @@ +require 'rails_helper' +require "file_store/s3_store" + +describe Jobs::UpdateS3Inventory do + before do + SiteSetting.enable_s3_uploads = true + SiteSetting.s3_access_key_id = "abc" + SiteSetting.s3_secret_access_key = "def" + SiteSetting.enable_s3_inventory = true + + store = FileStore::S3Store.new + @client = Aws::S3::Client.new(stub_responses: true) + store.s3_helper.stubs(:s3_client).returns(@client) + Discourse.stubs(:store).returns(store) + end + + it "updates the bucket policy and inventory configuration in S3" do + id = "original" + @client.expects(:put_bucket_policy).with( + bucket: "bucket", + policy: "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Sid\":\"InventoryAndAnalyticsPolicy\",\"Effect\":\"Allow\",\"Principal\":{\"Service\":\"s3.amazonaws.com\"},\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::bucket/inventory/*\"],\"Condition\":{\"ArnLike\":{\"aws:SourceArn\":\"arn:aws:s3:::bucket\"},\"StringEquals\":{\"s3:x-amz-acl\":\"bucket-owner-full-control\"}}}]}" + ) + @client.expects(:put_bucket_inventory_configuration) + @client.expects(:put_bucket_inventory_configuration).with( + bucket: "bucket", + id: id, + inventory_configuration: { + destination: { + s3_bucket_destination: { + bucket: "arn:aws:s3:::bucket", + prefix: "inventory/#{id}", + format: "CSV" + } + }, + filter: { + prefix: id + }, + is_enabled: true, + id: id, + included_object_versions: "Current", + optional_fields: ["ETag"], + schedule: { frequency: "Daily" } + }, + use_accelerate_endpoint: false + ) + + described_class.new.execute(nil) + end + +end diff --git a/spec/support/helpers.rb b/spec/support/helpers.rb index 8877d23a9f0..455230e1b48 100644 --- a/spec/support/helpers.rb +++ b/spec/support/helpers.rb @@ -102,4 +102,14 @@ module Helpers tag_group.tags << (Tag.where(name: name).first || Fabricate(:tag, name: name)) end end + + def capture_stdout + old_stdout = $stdout + io = StringIO.new + $stdout = io + yield + io.string + ensure + $stdout = old_stdout + end end