# Initially we used sidetiq, this was a problem:
#
# 1. No mechnism to add "randomisation" into job execution
# 2. No stats about previous runs or failures
# 3. Dependency on ice_cube gem causes runaway CPU

require_dependency 'distributed_mutex'

module Scheduler
  class Manager
    attr_accessor :random_ratio, :redis

    class Runner
      def initialize(manager)
        @mutex = Mutex.new
        @queue = Queue.new
        @manager = manager
        @reschedule_orphans_thread = Thread.new do
          while true
            sleep 1.minute
            @mutex.synchronize do
              reschedule_orphans
            end
          end
        end
        @keep_alive_thread = Thread.new do
          while true
            @mutex.synchronize do
              keep_alive
            end
            sleep (@manager.keep_alive_duration / 2)
          end
        end
        @thread = Thread.new do
          while true
            process_queue
          end
        end
      end

      def keep_alive
        @manager.keep_alive
      rescue => ex
        Discourse.handle_job_exception(ex, {message: "Scheduling manager keep-alive"})
      end

      def reschedule_orphans
        @manager.reschedule_orphans!
      rescue => ex
        Discourse.handle_job_exception(ex, {message: "Scheduling manager orphan rescheduler"})
      end

      def process_queue
        klass = @queue.deq
        # hack alert, I need to both deq and set @running atomically.
        @running = true
        failed = false
        start = Time.now.to_f
        info = @mutex.synchronize { @manager.schedule_info(klass) }
        begin
          info.prev_result = "RUNNING"
          @mutex.synchronize { info.write! }
          klass.new.perform
        rescue Jobs::HandledExceptionWrapper
          # Discourse.handle_exception was already called, and we don't have any extra info to give
          failed = true
        rescue => e
          Discourse.handle_job_exception(e, {message: "Running a scheduled job", job: klass})
          failed = true
        end
        duration = ((Time.now.to_f - start) * 1000).to_i
        info.prev_duration = duration
        info.prev_result = failed ? "FAILED" : "OK"
        info.current_owner = nil
        attempts(3) do
          @mutex.synchronize { info.write! }
        end
      rescue => ex
        Discourse.handle_job_exception(ex, {message: "Processing scheduled job queue"})
      ensure
        @running = false
      end

      def stop!
        @mutex.synchronize do
          @thread.kill
          @keep_alive_thread.kill
          @reschedule_orphans_thread.kill
        end
      end

      def enq(klass)
        @queue << klass
      end

      def wait_till_done
        while !@queue.empty? && !(@queue.num_waiting > 0)
          sleep 0.001
        end
        # this is a hack, but is only used for test anyway
        sleep 0.001
        while @running
          sleep 0.001
        end
      end

      def attempts(n)
        n.times {
          begin
            yield; break
          rescue
            sleep Random.rand
          end
        }
      end

    end

    def self.without_runner(redis=nil)
      self.new(redis, skip_runner: true)
    end

    def initialize(redis = nil, options=nil)
      @redis = $redis || redis
      @random_ratio = 0.1
      unless options && options[:skip_runner]
        @runner = Runner.new(self)
        self.class.current = self
      end

      @hostname = options && options[:hostname]
      @manager_id = SecureRandom.hex
    end

    def self.current
      @current
    end

    def self.current=(manager)
      @current = manager
    end

    def hostname
      @hostname ||= `hostname`.strip
    end

    def schedule_info(klass)
      ScheduleInfo.new(klass, self)
    end

    def next_run(klass)
      schedule_info(klass).next_run
    end

    def ensure_schedule!(klass)
      lock do
        schedule_info(klass).schedule!
      end
    end

    def remove(klass)
      lock do
        schedule_info(klass).del!
      end
    end

    def reschedule_orphans!
      lock do
        reschedule_orphans_on!
        reschedule_orphans_on!(hostname)
      end
    end

    def reschedule_orphans_on!(hostname=nil)
      redis.zrange(Manager.queue_key(hostname), 0, -1).each do |key|
        klass = get_klass(key)
        next unless klass
        info = schedule_info(klass)

        if ['QUEUED', 'RUNNING'].include?(info.prev_result) &&
          (info.current_owner.blank? || !redis.get(info.current_owner))
          info.prev_result = 'ORPHAN'
          info.next_run = Time.now.to_i
          info.write!
        end
      end
    end

    def get_klass(name)
      name.constantize
    rescue NameError
      nil
    end

    def tick
      lock do
        schedule_next_job
        schedule_next_job(hostname)
      end
    end

    def schedule_next_job(hostname=nil)
      (key, due), _ = redis.zrange Manager.queue_key(hostname), 0, 0, withscores: true
      return unless key

      if due.to_i <= Time.now.to_i
        klass = get_klass(key)
        unless klass
          # corrupt key, nuke it (renamed job or something)
          redis.zrem Manager.queue_key(hostname), key
          return
        end
        info = schedule_info(klass)
        info.prev_run = Time.now.to_i
        info.prev_result = "QUEUED"
        info.prev_duration = -1
        info.next_run = nil
        info.current_owner = identity_key
        info.schedule!
        @runner.enq(klass)
      end
    end

    def blocking_tick
      tick
      @runner.wait_till_done
    end

    def stop!
      @runner.stop!
      self.class.current = nil
    end

    def keep_alive_duration
      60
    end

    def keep_alive
      redis.setex identity_key, keep_alive_duration, ""
    end

    def lock
      DistributedMutex.new(Manager.lock_key).synchronize do
        yield
      end
    end


    def self.discover_schedules
      # hack for developemnt reloader is crazytown
      # multiple classes with same name can be in
      # object space
      unique = Set.new
      schedules = []
      ObjectSpace.each_object(Scheduler::Schedule) do |schedule|
        if schedule.scheduled?
          next if unique.include?(schedule.to_s)
          schedules << schedule
          unique << schedule.to_s
        end
      end
      schedules
    end

    @mutex = Mutex.new
    def self.seq
      @mutex.synchronize do
        @i ||= 0
        @i += 1
      end
    end

    def identity_key
      @identity_key ||= "_scheduler_#{hostname}:#{Process.pid}:#{self.class.seq}:#{SecureRandom.hex}"
    end

    def self.lock_key
      "_scheduler_lock_"
    end

    def self.queue_key(hostname=nil)
      if hostname
        "_scheduler_queue_#{hostname}_"
      else
        "_scheduler_queue_"
      end
    end

    def self.schedule_key(klass,hostname=nil)
      if hostname
        "_scheduler_#{klass}_#{hostname}"
      else
        "_scheduler_#{klass}"
      end
    end
  end
end