diff --git a/Gemfile.lock b/Gemfile.lock index 19fa675a27e..741e30714b2 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -178,7 +178,7 @@ GEM mini_mime (>= 0.1.1) maxminddb (0.1.22) memory_profiler (0.9.13) - message_bus (2.2.0) + message_bus (2.2.2) rack (>= 1.1.3) metaclass (0.0.4) method_source (0.9.2) diff --git a/config/discourse_defaults.conf b/config/discourse_defaults.conf index 4f409f574e8..dd4587acc39 100644 --- a/config/discourse_defaults.conf +++ b/config/discourse_defaults.conf @@ -224,6 +224,10 @@ force_anonymous_min_queue_seconds = 1 # only trigger anon if we see more than N requests for this path in last 10 seconds force_anonymous_min_per_10_seconds = 3 +# if a message bus request queues for 100ms or longer, we will reject it and ask consumer +# to back off +reject_message_bus_queue_seconds = 0.1 + # disable search if app server is queueing for longer than this (in seconds) disable_search_queue_threshold = 1 diff --git a/config/initializers/004-message_bus.rb b/config/initializers/004-message_bus.rb index a39b6ee782c..6897ba4be94 100644 --- a/config/initializers/004-message_bus.rb +++ b/config/initializers/004-message_bus.rb @@ -17,6 +17,14 @@ end def setup_message_bus_env(env) return if env["__mb"] + ::Middleware::RequestTracker.populate_request_queue_seconds!(env) + + if queue_time = env["REQUEST_QUEUE_SECONDS"] + if queue_time > (GlobalSetting.reject_message_bus_queue_seconds).to_f + raise RateLimiter::LimitExceeded, 30 + (rand * 120).to_i + end + end + host = RailsMultisite::ConnectionManagement.host(env) RailsMultisite::ConnectionManagement.with_hostname(host) do extra_headers = { diff --git a/lib/middleware/request_tracker.rb b/lib/middleware/request_tracker.rb index 470ab5f1671..2a75adaf2e9 100644 --- a/lib/middleware/request_tracker.rb +++ b/lib/middleware/request_tracker.rb @@ -139,17 +139,23 @@ class Middleware::RequestTracker end + def self.populate_request_queue_seconds!(env) + if !env['REQUEST_QUEUE_SECONDS'] + if queue_start = env['HTTP_X_REQUEST_START'] + queue_start = queue_start.split("t=")[1].to_f + queue_time = (Time.now.to_f - queue_start) + env['REQUEST_QUEUE_SECONDS'] = queue_time + end + end + end + def call(env) result = nil log_request = true # doing this as early as possible so we have an # accurate counter - if queue_start = env['HTTP_X_REQUEST_START'] - queue_start = queue_start.split("t=")[1].to_f - queue_time = (Time.now.to_f - queue_start) - env['REQUEST_QUEUE_SECONDS'] = queue_time - end + ::Middleware::RequestTracker.populate_request_queue_seconds!(env) request = Rack::Request.new(env) diff --git a/spec/integration/rate_limiting_spec.rb b/spec/integration/rate_limiting_spec.rb index 87a58490d45..5f609cf3196 100644 --- a/spec/integration/rate_limiting_spec.rb +++ b/spec/integration/rate_limiting_spec.rb @@ -14,6 +14,31 @@ describe 'rate limiter integration' do RateLimiter.disable end + it "will rate limit message bus requests once queueing" do + freeze_time + + global_setting :reject_message_bus_queue_seconds, 0.1 + + post "/message-bus/#{SecureRandom.hex}/poll", headers: { + "HTTP_X_REQUEST_START" => "t=#{Time.now.to_f - 0.2}" + } + + expect(response.status).to eq(429) + expect(response.headers['Retry-After']).to be > 29 + end + + it "will not rate limit when all is good" do + freeze_time + + global_setting :reject_message_bus_queue_seconds, 0.1 + + post "/message-bus/#{SecureRandom.hex}/poll", headers: { + "HTTP_X_REQUEST_START" => "t=#{Time.now.to_f - 0.05}" + } + + expect(response.status).to eq(200) + end + it "will clear the token cookie if invalid" do name = Auth::DefaultCurrentUserProvider::TOKEN_COOKIE