discourse/lib/s3_inventory.rb

290 lines
8.2 KiB
Ruby

# frozen_string_literal: true
require "aws-sdk-s3"
require "csv"
# Compares records of one model (Upload or OptimizedImage) against the S3
# inventory report stored in the bucket, backfills missing etags and reports
# the records whose etag does not appear in the inventory.
#
# It also knows how to install the bucket policy and the inventory
# configuration that make S3 produce the report in the first place.
class S3Inventory
  attr_reader :type, :model, :inventory_date

  # Column positions of the object key and etag within an inventory CSV row.
  CSV_KEY_INDEX ||= 1
  CSV_ETAG_INDEX ||= 2
  INVENTORY_PREFIX ||= "inventory"
  INVENTORY_VERSION ||= "1"

  # @param s3_helper helper object used for every S3 call made by this class
  # @param type [Symbol] :upload or :optimized; any other value leaves
  #   `type` and `model` nil
  def initialize(s3_helper, type)
    @s3_helper = s3_helper

    case type
    when :upload
      @type = "original"
      @model = Upload
    when :optimized
      @type = "optimized"
      @model = OptimizedImage
    end
  end

  # Downloads the latest inventory, loads it into a temporary table, fills in
  # etags that are NULL on the model's table, then logs and records in
  # Discourse.stats the rows whose etag is absent from the inventory.
  #
  # Logs an error and returns nil when no inventory file can be listed.
  def backfill_etags_and_list_missing
    if files.blank?
      error("Failed to list inventory from S3")
      return
    end

    DistributedMutex.synchronize("s3_inventory_list_missing_#{type}") do
      download_inventory_files_to_tmp_directory
      decompress_inventory_files

      ActiveRecord::Base.transaction do
        begin
          create_inventory_table
          copy_inventory_rows
          backfill_etags
          list_missing_post_uploads if type == "original"
          report_missing_uploads
        ensure
          connection.exec("DROP TABLE #{table_name}") unless connection.nil?
        end
      end
    end
  end

  # Asks Post.find_missing_uploads for uploads referenced by posts that have
  # no record. For each yielded sha1 that matches an inventory URL, a new
  # Upload record is created from the S3 object's metadata and its id is
  # handed back to the caller of the block. The resulting count is stored in
  # Discourse.stats and logged.
  #
  # Expects the inventory temp table to exist already.
  def list_missing_post_uploads
    log "Listing missing post uploads..."

    # Escaped and anchored so that dots in the host name are matched
    # literally and only a leading base URL is stripped.
    base_url_prefix = /\A#{Regexp.escape(Discourse.store.absolute_base_url)}\//

    missing = Post.find_missing_uploads(include_local_upload: false) do |_, _, _, sha1|
      next if sha1.blank?

      # The sha1 is bound as a parameter instead of being spliced into the SQL.
      result = connection.exec_params(
        "SELECT * FROM #{table_name} WHERE url LIKE $1",
        ["%original/%/#{sha1}%"]
      )
      next if result.count < 1

      begin
        url = result[0]["url"]
        key = url.sub(base_url_prefix, "")
        data = @s3_helper.object(key).data
        filename = (data.content_disposition&.match(/filename=\"(.*)\"/) || [])[1]

        upload = Upload.new(
          user_id: Discourse.system_user.id,
          original_filename: filename || File.basename(key),
          filesize: data.content_length,
          url: url,
          sha1: sha1,
          etag: result[0]["etag"]
        )
        upload.save!(validate: false)
        upload.id
      rescue Aws::S3::Errors::NotFound
        # Listed in the inventory but no longer in the bucket: still missing.
        nil
      end
    end

    Discourse.stats.set("missing_post_uploads", missing[:count])
    log "#{missing[:count]} post uploads are missing."
  end

  # Downloads every data file named by the symlink file into tmp_directory.
  def download_inventory_files_to_tmp_directory
    files.each do |file|
      log "Downloading inventory file '#{file[:key]}' to tmp directory..."
      failure_message = "Failed to download inventory file '#{file[:key]}' to tmp directory."

      @s3_helper.download_file(file[:key], file[:filename], failure_message)
    end
  end

  # Runs `gzip --decompress` on every downloaded inventory file.
  def decompress_inventory_files
    files.each do |file|
      log "Decompressing inventory file '#{file[:filename]}', this may take a while..."

      Discourse::Utils.execute_command(
        'gzip', '--decompress', file[:filename],
        failure_message: "Failed to decompress inventory file '#{file[:filename]}'.",
        chdir: tmp_directory
      )
    end
  end

  # Installs a bucket policy allowing the S3 service to put objects under the
  # inventory path of this bucket.
  def update_bucket_policy
    @s3_helper.s3_client.put_bucket_policy(
      bucket: bucket_name,
      policy: {
        "Version": "2012-10-17",
        "Statement": [
          {
            "Sid": "InventoryAndAnalyticsPolicy",
            "Effect": "Allow",
            "Principal": { "Service": "s3.amazonaws.com" },
            "Action": ["s3:PutObject"],
            "Resource": ["#{inventory_path_arn}/*"],
            "Condition": {
              "ArnLike": {
                "aws:SourceArn": bucket_arn
              },
              "StringEquals": {
                "s3:x-amz-acl": "bucket-owner-full-control"
              }
            }
          }
        ]
      }.to_json
    )
  end

  # Creates or replaces the bucket's inventory configuration for this type.
  def update_bucket_inventory_configuration
    @s3_helper.s3_client.put_bucket_inventory_configuration(
      bucket: bucket_name,
      id: inventory_id,
      inventory_configuration: inventory_configuration,
      use_accelerate_endpoint: false
    )
  end

  private

  # Raw database connection underneath ActiveRecord (memoized).
  def connection
    @connection ||= ActiveRecord::Base.connection.raw_connection
  end

  # Name of the temporary table holding the inventory rows.
  def table_name
    "#{type}_inventory"
  end

  # Creates the temporary table that receives the (url, etag) inventory rows.
  def create_inventory_table
    connection.exec("CREATE TEMP TABLE #{table_name}(url text UNIQUE, etag text, PRIMARY KEY(etag, url))")
  end

  # Streams every inventory CSV row into the temporary table through COPY.
  # On multisite installs, keys that do not contain the current site's
  # upload prefix are skipped.
  def copy_inventory_rows
    multisite_prefix = "uploads/#{RailsMultisite::ConnectionManagement.current_db}/"
    multisite = Rails.configuration.multisite
    base_url = Discourse.store.absolute_base_url

    connection.copy_data("COPY #{table_name} FROM STDIN CSV") do
      files.each do |file|
        # The file was decompressed by gzip, so read it under its name
        # without the trailing three characters (presumably ".gz").
        CSV.foreach(file[:filename][0...-3], headers: false) do |row|
          key = row[CSV_KEY_INDEX]
          next if multisite && key.exclude?(multisite_prefix)

          url = File.join(base_url, key)
          connection.put_copy_data("#{url},#{row[CSV_ETAG_INDEX]}\n")
        end
      end
    end
  end

  # Copies the etag from the inventory onto model rows that have none,
  # matching rows by url.
  def backfill_etags
    connection.async_exec(<<~SQL)
      UPDATE #{model.table_name}
      SET etag = #{table_name}.etag
      FROM #{table_name}
      WHERE #{model.table_name}.etag IS NULL
      AND #{model.table_name}.url = #{table_name}.url
    SQL
  end

  # Logs the url of every record whose etag is not in the inventory and
  # stores the number of such records in Discourse.stats.
  def report_missing_uploads
    # For uploads, only records created before the inventory date are
    # considered.
    uploads = (model == Upload) ? model.by_users.where("created_at < ?", inventory_date) : model

    missing_uploads = uploads
      .joins("LEFT JOIN #{table_name} ON #{table_name}.etag = #{model.table_name}.etag")
      .where("#{table_name}.etag IS NULL AND #{model.table_name}.etag IS NOT NULL")

    missing_count = missing_uploads.count

    if missing_count > 0
      missing_uploads.select(:id, :url).find_each do |upload|
        log upload.url
      end

      log "#{missing_count} of #{uploads.count} #{model.name.underscore.pluralize} are missing"
    end

    Discourse.stats.set("missing_s3_#{model.table_name}", missing_count)
  end

  # Resolves the most recent symlink file into the list of inventory data
  # files it names. Also sets `inventory_date` to one day before the symlink
  # file's modification time.
  #
  # @return [Array<Hash>] one `{ key:, filename: }` hash per data file, where
  #   `filename` is the local path inside tmp_directory; an empty array
  #   (not memoized) when no symlink file is available
  def files
    return @files if @files

    symlink_file = unsorted_files.max_by { |file| file.last_modified.to_i }
    return [] if symlink_file.blank?

    @inventory_date = symlink_file.last_modified - 1.day
    log "Downloading symlink file to tmp directory..."
    failure_message = "Failed to download symlink file to tmp directory."
    filename = File.join(tmp_directory, File.basename(symlink_file.key))

    @s3_helper.download_file(symlink_file.key, filename, failure_message)

    @files = File.readlines(filename).map do |line|
      key = line.sub("s3://#{bucket_name}/", "").chomp
      { key: key, filename: File.join(tmp_directory, File.basename(key)) }
    end
  end

  # Timestamped working directory under Rails.root/tmp for the current
  # database, created on first use (memoized).
  def tmp_directory
    @tmp_directory ||= begin
      current_db = RailsMultisite::ConnectionManagement.current_db
      timestamp = Time.now.strftime("%Y-%m-%d-%H%M%S")
      directory = File.join(Rails.root, "tmp", INVENTORY_PREFIX, current_db, timestamp)
      FileUtils.mkdir_p(directory)
      directory
    end
  end

  # Payload for put_bucket_inventory_configuration: a daily CSV inventory of
  # current object versions, including the ETag field, limited to objects
  # under this type's prefix.
  def inventory_configuration
    filter_prefix = type
    filter_prefix = File.join(bucket_folder_path, filter_prefix) if bucket_folder_path.present?

    {
      destination: {
        s3_bucket_destination: {
          bucket: bucket_arn,
          prefix: inventory_path,
          format: "CSV"
        }
      },
      filter: {
        prefix: filter_prefix
      },
      is_enabled: SiteSetting.enable_s3_inventory,
      id: inventory_id,
      included_object_versions: "Current",
      optional_fields: ["ETag"],
      schedule: {
        frequency: "Daily"
      }
    }
  end

  def bucket_name
    @s3_helper.s3_bucket_name
  end

  def bucket_folder_path
    @s3_helper.s3_bucket_folder_path
  end

  # Lists the symlink.txt objects found under the inventory's "hive" path.
  #
  # @return [Array] matching S3 objects in listing order; an empty array when
  #   the listing fails with an AWS service error (the error is logged)
  def unsorted_files
    hive_path = File.join(inventory_path, bucket_name, inventory_id, "hive")

    @s3_helper.list(hive_path).select { |obj| obj.key.match?(/symlink\.txt$/i) }
  rescue Aws::Errors::ServiceError => e
    log("Failed to list inventory from S3", e)
    # Callers enumerate the result, so never hand back log's return value.
    []
  end

  # Identifier of the inventory configuration: the type, prefixed with the
  # bucket folder path when one is configured (memoized).
  def inventory_id
    @inventory_id ||= bucket_folder_path.present? ? "#{bucket_folder_path}-#{type}" : type
  end

  def inventory_path_arn
    File.join(bucket_arn, inventory_path)
  end

  # Bucket-relative path the inventory reports are written under.
  def inventory_path
    path = File.join(INVENTORY_PREFIX, INVENTORY_VERSION)
    path = File.join(bucket_folder_path, path) if bucket_folder_path.present?
    path
  end

  def bucket_arn
    "arn:aws:s3:::#{bucket_name}"
  end

  # Prints the message to stdout; when an exception is given, it is also
  # written with its backtrace to the Rails error log.
  def log(message, ex = nil)
    puts(message)
    Rails.logger.error("#{ex}\n" + (ex.backtrace || []).join("\n")) if ex
  end

  # Logs the message as an error, wrapping it in a StandardError.
  def error(message)
    log(message, StandardError.new(message))
  end
end