# frozen_string_literal: true

require "aws-sdk-s3"

module FileStore
  ToS3MigrationError = Class.new(RuntimeError)

  class ToS3Migration
    MISSING_UPLOADS_RAKE_TASK_NAME ||= "posts:missing_uploads"
    UPLOAD_CONCURRENCY ||= 20

    def initialize(
      s3_options:,
      dry_run: false,
      migrate_to_multisite: false,
      skip_etag_verify: false
    )
      @s3_bucket = s3_options[:bucket]
      @s3_client_options = s3_options[:client_options]
      @dry_run = dry_run
      @migrate_to_multisite = migrate_to_multisite
      @skip_etag_verify = skip_etag_verify
      @current_db = RailsMultisite::ConnectionManagement.current_db
    end

    def self.s3_options_from_site_settings
      {
        client_options: S3Helper.s3_options(SiteSetting),
        bucket: SiteSetting.Upload.s3_upload_bucket,
      }
    end

    def self.s3_options_from_env
      unless ENV["DISCOURSE_S3_BUCKET"].present? && ENV["DISCOURSE_S3_REGION"].present? &&
               (
                 (
                   ENV["DISCOURSE_S3_ACCESS_KEY_ID"].present? &&
                     ENV["DISCOURSE_S3_SECRET_ACCESS_KEY"].present?
                 ) || ENV["DISCOURSE_S3_USE_IAM_PROFILE"].present?
               )
        raise ToS3MigrationError.new(<<~TEXT)
          Please provide the following environment variables:
            - DISCOURSE_S3_BUCKET
            - DISCOURSE_S3_REGION
            and either
            - DISCOURSE_S3_ACCESS_KEY_ID
            - DISCOURSE_S3_SECRET_ACCESS_KEY
            or
            - DISCOURSE_S3_USE_IAM_PROFILE
        TEXT
      end

      opts = { region: ENV["DISCOURSE_S3_REGION"] }
      opts[:endpoint] = ENV["DISCOURSE_S3_ENDPOINT"] if ENV["DISCOURSE_S3_ENDPOINT"].present?

      if ENV["DISCOURSE_S3_USE_IAM_PROFILE"].blank?
        opts[:access_key_id] = ENV["DISCOURSE_S3_ACCESS_KEY_ID"]
        opts[:secret_access_key] = ENV["DISCOURSE_S3_SECRET_ACCESS_KEY"]
      end

      { client_options: opts, bucket: ENV["DISCOURSE_S3_BUCKET"] }
    end

    def migrate
      migrate_to_s3
    end

    def migration_successful?(should_raise: false)
      success = true

      failure_message = "S3 migration failed for db '#{@current_db}'."
      prefix = @migrate_to_multisite ? "uploads/#{@current_db}/original/" : "original/"

      base_url = File.join(SiteSetting.Upload.s3_base_url, prefix)
      count = Upload.by_users.where("url NOT LIKE '#{base_url}%'").count
      if count > 0
        error_message =
          "#{count} of #{Upload.count} uploads are not migrated to S3. #{failure_message}"
        raise_or_log(error_message, should_raise)
        success = false
      end

      cdn_path = SiteSetting.cdn_path("/uploads/#{@current_db}/original").sub(/https?:/, "")
      count = Post.where("cooked LIKE '%#{cdn_path}%'").count
      if count > 0
        error_message = "#{count} posts are not remapped to new S3 upload URL. #{failure_message}"
        raise_or_log(error_message, should_raise)
        success = false
      end

      unless Rake::Task.task_defined?(MISSING_UPLOADS_RAKE_TASK_NAME)
        Discourse::Application.load_tasks
      end
      Rake::Task[MISSING_UPLOADS_RAKE_TASK_NAME]
      count = DB.query_single(<<~SQL, Post::MISSING_UPLOADS, Post::MISSING_UPLOADS_IGNORED).first
        SELECT COUNT(1)
        FROM posts p
        WHERE EXISTS (
          SELECT 1
          FROM post_custom_fields f
          WHERE f.post_id = p.id AND f.name = ?
        ) AND NOT EXISTS (
          SELECT 1
          FROM post_custom_fields f
          WHERE f.post_id = p.id AND f.name = ?
        )
      SQL
      if count > 0
        error_message = "rake posts:missing_uploads identified #{count} issues. #{failure_message}"
        raise_or_log(error_message, should_raise)
        success = false
      end

      count = Post.where("baked_version <> ? OR baked_version IS NULL", Post::BAKED_VERSION).count
      if count > 0
        log("#{count} posts still require rebaking and will be rebaked during regular job")
        if count > 100
          log(
            "To speed up migrations of posts we recommend you run 'rake posts:rebake_uncooked_posts'",
          )
        end
        success = false
      else
        log("No posts require rebaking")
      end

      success
    end

    protected

    def log(message)
      puts message
    end

    def raise_or_log(message, should_raise)
      if should_raise
        raise ToS3MigrationError.new(message)
      else
        log(message)
      end
    end

    def uploads_migrated_to_new_scheme?
      seeded_image_url = "uploads/#{@current_db}/original/_X/"
      !Upload.by_users.where("url NOT LIKE '//%' AND url NOT LIKE '/%#{seeded_image_url}%'").exists?
    end

    def migrate_to_s3
      # we don't want have migrated state, ensure we run all jobs here
      Jobs.run_immediately!

      log "*" * 30 + " DRY RUN " + "*" * 30 if @dry_run
      log "Migrating uploads to S3 for '#{@current_db}'..."

      if !uploads_migrated_to_new_scheme?
        log "Some uploads were not migrated to the new scheme. Running the migration, this may take a while..."
        SiteSetting.migrate_to_new_scheme = true
        Upload.migrate_to_new_scheme

        if !uploads_migrated_to_new_scheme?
          raise ToS3MigrationError.new(
                  "Some uploads could not be migrated to the new scheme. " \
                    "You need to fix this manually.",
                )
        end
      end

      bucket_has_folder_path = true if @s3_bucket.include? "/"
      public_directory = Rails.root.join("public").to_s

      s3 = Aws::S3::Client.new(@s3_client_options)

      if bucket_has_folder_path
        bucket, folder = S3Helper.get_bucket_and_folder_path(@s3_bucket)
        folder = File.join(folder, "/")
      else
        bucket, folder = @s3_bucket, ""
      end

      log "Uploading files to S3..."
      log " - Listing local files"

      local_files = []
      IO
        .popen("cd #{public_directory} && find uploads/#{@current_db}/original -type f")
        .each do |file|
          local_files << file.chomp
          putc "." if local_files.size % 1000 == 0
        end

      log " => #{local_files.size} files"
      log " - Listing S3 files"

      s3_objects = []
      prefix = @migrate_to_multisite ? "uploads/#{@current_db}/original/" : "original/"

      options = { bucket: bucket, prefix: folder + prefix }

      loop do
        response = s3.list_objects_v2(options)
        s3_objects.concat(response.contents)
        putc "."
        break if response.next_continuation_token.blank?
        options[:continuation_token] = response.next_continuation_token
      end

      log " => #{s3_objects.size} files"
      log " - Syncing files to S3"

      queue = Queue.new
      synced = 0
      failed = []

      lock = Mutex.new
      upload_threads =
        UPLOAD_CONCURRENCY.times.map do
          Thread.new do
            while obj = queue.pop
              if s3.put_object(obj[:options]).etag[obj[:etag]]
                putc "."
                lock.synchronize { synced += 1 }
              else
                putc "X"
                lock.synchronize { failed << obj[:path] }
              end
            end
          end
        end

      local_files.each do |file|
        path = File.join(public_directory, file)
        name = File.basename(path)
        etag = Digest::MD5.file(path).hexdigest unless @skip_etag_verify
        key = file[file.index(prefix)..-1]
        key.prepend(folder) if bucket_has_folder_path
        original_path = file.sub("uploads/#{@current_db}", "")

        if s3_object = s3_objects.find { |obj| obj.key.ends_with?(original_path) }
          next if File.size(path) == s3_object.size && (@skip_etag_verify || s3_object.etag[etag])
        end

        options = {
          acl: "public-read",
          body: File.open(path, "rb"),
          bucket: bucket,
          content_type: MiniMime.lookup_by_filename(name)&.content_type,
          key: key,
        }

        if !FileHelper.is_supported_image?(name)
          upload = Upload.find_by(url: "/#{file}")

          if upload&.original_filename
            options[:content_disposition] = ActionDispatch::Http::ContentDisposition.format(
              disposition: "attachment",
              filename: upload.original_filename,
            )
          end

          options[:acl] = "private" if upload&.secure
        elsif !FileHelper.is_inline_image?(name)
          upload = Upload.find_by(url: "/#{file}")
          options[:content_disposition] = ActionDispatch::Http::ContentDisposition.format(
            disposition: "attachment",
            filename: upload&.original_filename || name,
          )
        end

        etag ||= Digest::MD5.file(path).hexdigest

        if @dry_run
          log "#{file} => #{options[:key]}"
          synced += 1
        else
          queue << { path: path, options: options, etag: etag }
        end
      end

      queue.close
      upload_threads.each(&:join)

      puts

      failure_message = "S3 migration failed for db '#{@current_db}'."

      if failed.size > 0
        log "Failed to upload #{failed.size} files"
        log failed.join("\n")
        raise failure_message
      elsif s3_objects.size + synced >= local_files.size
        log "Updating the URLs in the database..."

        from = "/uploads/#{@current_db}/original/"
        to = "#{SiteSetting.Upload.s3_base_url}/#{prefix}"

        if @dry_run
          log "REPLACING '#{from}' WITH '#{to}'"
        else
          DbHelper.remap(from, to, anchor_left: true)
        end

        [
          [
            "src=\"/uploads/#{@current_db}/original/(\\dX/(?:[a-f0-9]/)*[a-f0-9]{40}[a-z0-9\\.]*)",
            "src=\"#{SiteSetting.Upload.s3_base_url}/#{prefix}\\1",
          ],
          [
            "src='/uploads/#{@current_db}/original/(\\dX/(?:[a-f0-9]/)*[a-f0-9]{40}[a-z0-9\\.]*)",
            "src='#{SiteSetting.Upload.s3_base_url}/#{prefix}\\1",
          ],
          [
            "href=\"/uploads/#{@current_db}/original/(\\dX/(?:[a-f0-9]/)*[a-f0-9]{40}[a-z0-9\\.]*)",
            "href=\"#{SiteSetting.Upload.s3_base_url}/#{prefix}\\1",
          ],
          [
            "href='/uploads/#{@current_db}/original/(\\dX/(?:[a-f0-9]/)*[a-f0-9]{40}[a-z0-9\\.]*)",
            "href='#{SiteSetting.Upload.s3_base_url}/#{prefix}\\1",
          ],
          [
            "\\[img\\]/uploads/#{@current_db}/original/(\\dX/(?:[a-f0-9]/)*[a-f0-9]{40}[a-z0-9\\.]*)\\[/img\\]",
            "[img]#{SiteSetting.Upload.s3_base_url}/#{prefix}\\1[/img]",
          ],
        ].each do |from_url, to_url|
          if @dry_run
            log "REPLACING '#{from_url}' WITH '#{to_url}'"
          else
            DbHelper.regexp_replace(from_url, to_url)
          end
        end

        unless @dry_run
          # Legacy inline image format
          Post
            .where("raw LIKE '%![](/uploads/default/original/%)%'")
            .each do |post|
              regexp =
                /!\[\](\/uploads\/#{@current_db}\/original\/(\dX\/(?:[a-f0-9]\/)*[a-f0-9]{40}[a-z0-9\.]*))/

              post
                .raw
                .scan(regexp)
                .each do |upload_url, _|
                  upload = Upload.get_from_url(upload_url)
                  post.raw = post.raw.gsub("![](#{upload_url})", "![](#{upload.short_url})")
                end

              post.save!(validate: false)
            end
        end

        if Discourse.asset_host.present?
          # Uploads that were on local CDN will now be on S3 CDN
          from = "#{Discourse.asset_host}/uploads/#{@current_db}/original/"
          to = "#{SiteSetting.Upload.s3_cdn_url}/#{prefix}"

          if @dry_run
            log "REMAPPING '#{from}' TO '#{to}'"
          else
            DbHelper.remap(from, to)
          end
        end

        # Uploads that were on base hostname will now be on S3 CDN
        from = "#{Discourse.base_url}/uploads/#{@current_db}/original/"
        to = "#{SiteSetting.Upload.s3_cdn_url}/#{prefix}"

        if @dry_run
          log "REMAPPING '#{from}' TO '#{to}'"
        else
          DbHelper.remap(from, to)
        end

        unless @dry_run
          log "Removing old optimized images..."

          OptimizedImage
            .joins("LEFT JOIN uploads u ON optimized_images.upload_id = u.id")
            .where("u.id IS NOT NULL AND u.url LIKE '//%' AND optimized_images.url NOT LIKE '//%'")
            .delete_all

          log "Flagging all posts containing lightboxes for rebake..."

          count = Post.where("cooked LIKE '%class=\"lightbox\"%'").update_all(baked_version: nil)
          log "#{count} posts were flagged for a rebake"
        end
      end

      migration_successful?(should_raise: true)

      log "Done!"
    ensure
      Jobs.run_later!
    end
  end
end