From 02a9429c384befb26fbc0728a181c4c019890218 Mon Sep 17 00:00:00 2001 From: Tarek Khalil <45508821+khalilovcmded@users.noreply.github.com> Date: Wed, 17 Apr 2019 10:03:23 +0100 Subject: [PATCH] REFACTOR: Quick refactor of the webhook event emitter job (#7385) * REFACTOR: Quick refactor of the webhook event emitter job --- app/jobs/regular/emit_web_hook_event.rb | 208 ++++++++++++++---------- spec/jobs/emit_web_hook_event_spec.rb | 6 +- spec/models/web_hook_spec.rb | 2 +- 3 files changed, 125 insertions(+), 91 deletions(-) diff --git a/app/jobs/regular/emit_web_hook_event.rb b/app/jobs/regular/emit_web_hook_event.rb index cdba6da734a..4095f3b9d7e 100644 --- a/app/jobs/regular/emit_web_hook_event.rb +++ b/app/jobs/regular/emit_web_hook_event.rb @@ -7,62 +7,39 @@ module Jobs RETRY_BACKOFF = 5 def execute(args) - %i{ - web_hook_id - event_type - }.each do |key| - raise Discourse::InvalidParameters.new(key) unless args[key].present? + memoize_arguments(args) + validate_arguments! + + unless ping_event?(arguments[:event_type]) + validate_argument!(:payload) + + return if webhook_inactive? + return if group_webhook_invalid? + return if category_webhook_invalid? + return if tag_webhook_invalid? end - @orig_args = args.dup - - web_hook = WebHook.find_by(id: args[:web_hook_id]) - raise Discourse::InvalidParameters.new(:web_hook_id) if web_hook.blank? - - unless ping_event?(args[:event_type]) - return unless web_hook.active? - - return if web_hook.group_ids.present? && (args[:group_id].present? || - !web_hook.group_ids.include?(args[:group_id])) - - return if web_hook.category_ids.present? && (!args[:category_id].present? || - !web_hook.category_ids.include?(args[:category_id])) - - return if web_hook.tag_ids.present? && (args[:tag_ids].blank? || - (web_hook.tag_ids & args[:tag_ids]).blank?) - - raise Discourse::InvalidParameters.new(:payload) unless args[:payload].present? - args[:payload] = JSON.parse(args[:payload]) - end - - web_hook_request(args, web_hook) + send_webhook! end private - def guardian - Guardian.new(Discourse.system_user) + def validate_arguments! + validate_argument!(:web_hook_id) + validate_argument!(:event_type) + raise Discourse::InvalidParameters.new(:web_hook_id) if web_hook.blank? end - def ping_event?(event_type) - PING_EVENT == event_type.to_s + def validate_argument!(key) + raise Discourse::InvalidParameters.new(key) unless arguments[key].present? end - def build_web_hook_body(args, web_hook) - body = {} - event_type = args[:event_type].to_s - - if ping_event?(event_type) - body[:ping] = 'OK' - else - body[event_type] = args[:payload] - end - - new_body = Plugin::Filter.apply(:after_build_web_hook_body, self, body) - MultiJson.dump(new_body) + def memoize_arguments(args) + @arguments = args + @retry_count = @arguments[:retry_count] || 0 end - def web_hook_request(args, web_hook) + def send_webhook! uri = URI(web_hook.payload_url.strip) conn = Excon.new( @@ -71,42 +48,17 @@ module Jobs retry_limit: 0 ) - body = build_web_hook_body(args, web_hook) - web_hook_event = WebHookEvent.create!(web_hook_id: web_hook.id, payload: body) + web_hook_body = build_webhook_body + web_hook_event = create_webhook_event(web_hook_body) + web_hook_headers = build_webhook_headers(uri, web_hook_body, web_hook_event) + response = nil begin - content_type = - case web_hook.content_type - when WebHook.content_types['application/x-www-form-urlencoded'] - 'application/x-www-form-urlencoded' - else - 'application/json' - end - - headers = { - 'Accept' => '*/*', - 'Connection' => 'close', - 'Content-Length' => body.bytesize, - 'Content-Type' => content_type, - 'Host' => uri.host, - 'User-Agent' => "Discourse/#{Discourse::VERSION::STRING}", - 'X-Discourse-Instance' => Discourse.base_url, - 'X-Discourse-Event-Id' => web_hook_event.id, - 'X-Discourse-Event-Type' => args[:event_type] - } - - headers['X-Discourse-Event'] = args[:event_name].to_s if args[:event_name].present? - - if web_hook.secret.present? - headers['X-Discourse-Event-Signature'] = "sha256=#{OpenSSL::HMAC.hexdigest("sha256", web_hook.secret, body)}" - end - now = Time.zone.now - response = conn.post(headers: headers, body: body) - + response = conn.post(headers: web_hook_headers, body: web_hook_body) web_hook_event.update!( - headers: MultiJson.dump(headers), + headers: MultiJson.dump(web_hook_headers), status: response.status, response_headers: MultiJson.dump(response.headers), response_body: response.body, @@ -114,28 +66,114 @@ module Jobs ) rescue => e web_hook_event.update!( - headers: MultiJson.dump(headers), + headers: MultiJson.dump(web_hook_headers), status: -1, response_headers: MultiJson.dump(error: e), duration: ((Time.zone.now - now) * 1000).to_i ) end - MessageBus.publish("/web_hook_events/#{web_hook.id}", { - web_hook_event_id: web_hook_event.id, - event_type: args[:event_type] - }, user_ids: User.human_users.staff.pluck(:id)) - + publish_webhook_event(web_hook_event) retry_web_hook if response&.status != 200 end def retry_web_hook if SiteSetting.retry_web_hook_events? - @orig_args[:retry_count] = (@orig_args[:retry_count] || 0) + 1 - return if @orig_args[:retry_count] > MAX_RETRY_COUNT - delay = RETRY_BACKOFF**(@orig_args[:retry_count] - 1) - Jobs.enqueue_in(delay.minutes, :emit_web_hook_event, @orig_args) + @retry_count += 1 + return if @retry_count > MAX_RETRY_COUNT + delay = RETRY_BACKOFF**(@retry_count - 1) + Jobs.enqueue_in(delay.minutes, :emit_web_hook_event, arguments) end end + + def publish_webhook_event(web_hook_event) + MessageBus.publish("/web_hook_events/#{web_hook.id}", { + web_hook_event_id: web_hook_event.id, + event_type: arguments[:event_type] + }, user_ids: User.human_users.staff.pluck(:id)) + end + + def ping_event?(event_type) + PING_EVENT == event_type + end + + def webhook_inactive? + !web_hook.active? + end + + def group_webhook_invalid? + web_hook.group_ids.present? && (arguments[:group_id].present? || + !web_hook.group_ids.include?(arguments[:group_id])) + end + + def category_webhook_invalid? + web_hook.category_ids.present? && (!arguments[:category_id].present? || + !web_hook.category_ids.include?(arguments[:category_id])) + end + + def tag_webhook_invalid? + web_hook.tag_ids.present? && (arguments[:tag_ids].blank? || + (web_hook.tag_ids & arguments[:tag_ids]).blank?) + end + + def arguments + @arguments + end + + def parsed_payload + @parsed_payload ||= JSON.parse(arguments[:payload]) + end + + def web_hook + @web_hook ||= WebHook.find_by(id: arguments[:web_hook_id]) + end + + def build_webhook_headers(uri, web_hook_body, web_hook_event) + content_type = + case web_hook.content_type + when WebHook.content_types['application/x-www-form-urlencoded'] + 'application/x-www-form-urlencoded' + else + 'application/json' + end + + headers = { + 'Accept' => '*/*', + 'Connection' => 'close', + 'Content-Length' => web_hook_body.bytesize, + 'Content-Type' => content_type, + 'Host' => uri.host, + 'User-Agent' => "Discourse/#{Discourse::VERSION::STRING}", + 'X-Discourse-Instance' => Discourse.base_url, + 'X-Discourse-Event-Id' => web_hook_event.id, + 'X-Discourse-Event-Type' => arguments[:event_type] + } + + headers['X-Discourse-Event'] = arguments[:event_name] if arguments[:event_name].present? + + if web_hook.secret.present? + headers['X-Discourse-Event-Signature'] = "sha256=#{OpenSSL::HMAC.hexdigest("sha256", web_hook.secret, web_hook_body)}" + end + + headers + end + + def build_webhook_body + body = {} + + if ping_event?(arguments[:event_type]) + body['ping'] = "OK" + else + body[arguments[:event_type]] = parsed_payload + end + + new_body = Plugin::Filter.apply(:after_build_web_hook_body, self, body) + MultiJson.dump(new_body) + end + + def create_webhook_event(web_hook_body) + WebHookEvent.create!(web_hook_id: web_hook.id, payload: web_hook_body) + end + end end diff --git a/spec/jobs/emit_web_hook_event_spec.rb b/spec/jobs/emit_web_hook_event_spec.rb index ceb3f84df0a..95cc8d4b909 100644 --- a/spec/jobs/emit_web_hook_event_spec.rb +++ b/spec/jobs/emit_web_hook_event_spec.rb @@ -52,10 +52,6 @@ describe Jobs::EmitWebHookEvent do event_type: described_class::PING_EVENT ) end.to change { Jobs::EmitWebHookEvent.jobs.size }.by(1) - - job = Jobs::EmitWebHookEvent.jobs.first - args = job["args"].first - expect(args["retry_count"]).to eq(1) end it 'does not retry for more than maximum allowed times' do @@ -189,7 +185,7 @@ describe Jobs::EmitWebHookEvent do end end - describe '#web_hook_request' do + describe '#send_webhook!' do it 'creates delivery event record' do stub_request(:post, post_hook.payload_url) .to_return(body: 'OK', status: 200) diff --git a/spec/models/web_hook_spec.rb b/spec/models/web_hook_spec.rb index 792efb21b3e..7e038006608 100644 --- a/spec/models/web_hook_spec.rb +++ b/spec/models/web_hook_spec.rb @@ -291,7 +291,7 @@ describe WebHook do PostDestroyer.new(user, post).destroy job = Jobs::EmitWebHookEvent.new - job.expects(:web_hook_request).times(2) + job.expects(:send_webhook!).times(2) args = Jobs::EmitWebHookEvent.jobs[1]["args"].first job.execute(args.with_indifferent_access)