FEATURE: Add Mechanism to redeliver all failed webhook events (#27609)

Background:
To redeliver failed webhook events, an operator currently has to go through the list and click on each event individually. This PR adds a mechanism to retry all failed events at once, so that issues can be resolved quickly once the underlying failure has been fixed.

What is the change?:
Previously, each failed webhook event had to be redelivered individually. This change adds a 'Redeliver Failed' button next to the webhook event filter that redelivers all failed events. If there are no failed webhook events to redeliver, the 'Redeliver Failed' button is disabled. Clicking it opens a dialog asking the operator to confirm. Once confirmed, the failed webhook events are added to a queue and the webhook event list shows the redelivery progress. Every minute, a job runs and redelivers up to 20 queued events. Every hour, another job cleans up queued redelivery events that have been stored for more than 8 hours.
This commit is contained in:
Guhyoun Nam 2024-07-08 15:43:16 -05:00 committed by GitHub
parent 16a8a31c52
commit 784c04ea81
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 305 additions and 4 deletions

View File

@ -1,6 +1,10 @@
<li>
<div class="col first status">
{{#if @event.redelivering}}
{{d-icon "sync"}}
{{else}}
<span class={{this.statusColorClasses}}>{{@event.status}}</span>
{{/if}}
</div>
<div class="col event-id">{{@event.id}}</div>

View File

@ -13,6 +13,13 @@
class="delivery-status-filters"
/>
<DButton
@icon="sync"
@label="admin.web_hooks.events.redeliver_failed"
@action={{this.redeliverFailed}}
@disabled={{not this.redeliverEnabled}}
/>
<DButton
@icon="paper-plane"
@label="admin.web_hooks.events.ping"

View File

@ -11,10 +11,12 @@ import I18n from "discourse-i18n";
export default class WebhookEvents extends Component {
@service messageBus;
@service store;
@service dialog;
@tracked pingEnabled = true;
@tracked events = [];
@tracked incomingEventIds = [];
@tracked redeliverEnabled = true;
@readOnly("incomingEventIds.length") incomingCount;
@gt("incomingCount", 0) hasIncoming;
@ -37,6 +39,17 @@ export default class WebhookEvents extends Component {
} finally {
this.loading = false;
}
this.redeliverEnabled = this.failedEventIds.length;
}
get failedEventIds() {
return this.events.content
.filter(
(event) =>
(event.status < 200 || event.status > 299) && event.status !== 0
)
.map((event) => event.id);
}
get statuses() {
@ -78,6 +91,24 @@ export default class WebhookEvents extends Component {
this.pingEnabled = true;
}
if (data.type === "redelivered") {
const event = this.events.find((e) => e.id === data.web_hook_event.id);
event.setProperties({
response_body: data.web_hook_event.response_body,
response_headers: data.web_hook_event.response_headers,
status: data.web_hook_event.status,
redelivering: false,
});
return;
}
if (data.type === "redelivery_failed") {
const event = this.events.find((e) => e.id === data.web_hook_event_id);
event.set("redelivering", false);
return;
}
if (!this.incomingEventIds.includes(data.web_hook_event_id)) {
this.incomingEventIds.pushObject(data.web_hook_event_id);
}
@ -117,4 +148,43 @@ export default class WebhookEvents extends Component {
popupAjaxError(error);
}
}
@action
async redeliverFailed() {
if (!this.failedEventIds.length) {
this.dialog.alert(
I18n.t("admin.web_hooks.events.no_events_to_redeliver")
);
this.redeliverEnabled = false;
return;
}
return this.dialog.yesNoConfirm({
message: I18n.t("admin.web_hooks.events.redeliver_failed_confirm", {
count: this.failedEventIds.length,
}),
didConfirm: async () => {
try {
const response = await ajax(
`/admin/api/web_hooks/${this.args.webhookId}/events/failed_redeliver`,
{ type: "POST", data: { event_ids: this.failedEventIds } }
);
if (response.event_ids?.length) {
response.event_ids.map((id) => {
const event = this.events.find((e) => e.id === id);
event.set("redelivering", true);
});
} else {
this.dialog.alert(
I18n.t("admin.web_hooks.events.no_events_to_redeliver")
);
}
} catch (error) {
popupAjaxError(error);
} finally {
this.redeliverEnabled = false;
}
},
});
}
}

View File

@ -1,7 +1,8 @@
# frozen_string_literal: true
class Admin::WebHooksController < Admin::AdminController
before_action :fetch_web_hook, only: %i[show update destroy list_events bulk_events ping]
before_action :fetch_web_hook,
only: %i[show update destroy list_events bulk_events ping redeliver_failed_events]
def index
limit = 50
@ -93,7 +94,7 @@ class Admin::WebHooksController < Admin::AdminController
def list_events
limit = 50
offset = params[:offset].to_i
events = @web_hook.web_hook_events
events = @web_hook.web_hook_events.includes(:redelivering_webhook_event)
status = params[:status]
if status == "successful"
events = events.successful
@ -141,6 +142,24 @@ class Admin::WebHooksController < Admin::AdminController
end
end
# Queues the requested events of the current webhook for redelivery.
#
# Reads `params[:event_ids]`, restricts them to non-ping events that belong
# to @web_hook, and creates a RedeliveringWebhookEvent for every event that
# does not already have one. Responds with the ids of the matched events.
#
# Raises Discourse::InvalidParameters when no event matches.
def redeliver_failed_events
  # Load the records once; the original issued a separate COUNT query before
  # loading the very same rows.
  web_hook_events =
    @web_hook
      .web_hook_events
      .includes(:redelivering_webhook_event)
      .not_ping
      .where(id: params[:event_ids])
      .to_a

  raise Discourse::InvalidParameters if web_hook_events.empty?

  web_hook_events.each do |web_hook_event|
    # Already queued: do not enqueue the same event twice.
    next if web_hook_event.redelivering_webhook_event

    RedeliveringWebhookEvent.create!(web_hook_event: web_hook_event)
  end

  render json: { event_ids: web_hook_events.map(&:id) }
end
def ping
Jobs.enqueue(
:emit_web_hook_event,

View File

@ -0,0 +1,16 @@
# frozen_string_literal: true
module Jobs
  # Hourly scheduled job that removes RedeliveringWebhookEvent rows created
  # more than 8 hours ago.
  class CleanupRedeliveringWebHookEvents < ::Jobs::Scheduled
    every 1.hour

    sidekiq_options queue: "low"

    # @param args [Hash] scheduler arguments; not read by this job
    def execute(args)
      # delete_all issues a single DELETE without instantiating records, so
      # eager-loading associations beforehand served no purpose.
      RedeliveringWebhookEvent.where("created_at < ?", 8.hours.ago).delete_all
    end
  end
end

View File

@ -0,0 +1,64 @@
# frozen_string_literal: true
require "excon"
module Jobs
  # Scheduled job (every minute) that works through the redelivery queue.
  #
  # Each run claims up to LIMIT unprocessed RedeliveringWebhookEvent rows,
  # re-emits the stored headers and payload of the associated webhook event,
  # publishes the outcome on the webhook's MessageBus channel and removes the
  # row from the queue.
  class RedeliverWebHookEvents < ::Jobs::Scheduled
    every 1.minute

    sidekiq_options queue: "low"
    sidekiq_options retry: false

    REDELIVERED = "redelivered"
    REDELIVERY_FAILED = "redelivery_failed"
    LIMIT = 20

    # @param args [Hash] scheduler arguments; not read by this job
    def execute(args)
      event_ids = RedeliveringWebhookEvent.where(processing: false).limit(LIMIT).pluck(:id)
      return if event_ids.empty?

      # Claim exactly the rows that were picked. Updating through the
      # unordered LIMIT relation could flag a different set of rows than the
      # one processed below, leaving rows marked as processing forever.
      RedeliveringWebhookEvent.where(id: event_ids).update_all(processing: true)

      # Eager-load on the relation that is actually iterated to avoid one
      # query per event and per webhook.
      RedeliveringWebhookEvent
        .where(id: event_ids)
        .includes(web_hook_event: :web_hook)
        .each do |redelivery_event|
          redeliver(redelivery_event)
          # Pause between deliveries.
          sleep 2
        end
    end

    private

    # Re-emits a single queued event and always removes it from the queue.
    def redeliver(redelivery_event)
      web_hook_event = redelivery_event.web_hook_event
      web_hook = web_hook_event&.web_hook

      # Orphaned queue row (event or webhook is gone): there is nothing to
      # deliver or publish; the ensure clause below still removes the row.
      return if web_hook.nil?

      begin
        WebHookEmitter.new(web_hook, web_hook_event).emit!(
          headers: MultiJson.load(web_hook_event.headers),
          body: web_hook_event.payload,
        )
        publish_webhook_event(web_hook_event, web_hook, REDELIVERED)
      rescue => e
        Discourse.warn_exception(
          e,
          message: "Error redelivering web_hook_event #{web_hook_event.id}",
        )
        publish_webhook_event(web_hook_event, web_hook, REDELIVERY_FAILED)
      end
    ensure
      redelivery_event.delete
    end

    # Publishes the outcome of a redelivery on "/web_hook_events/<web_hook id>".
    #
    # `web_hook_event_id` is included next to the serialized event because the
    # admin UI looks the event up by that key for "redelivery_failed" messages.
    def publish_webhook_event(web_hook_event, web_hook, type)
      MessageBus.publish(
        "/web_hook_events/#{web_hook.id}",
        {
          type: type,
          web_hook_event_id: web_hook_event.id,
          web_hook_event: AdminWebHookEventSerializer.new(web_hook_event, root: false).as_json,
        },
      )
    end
  end
end

View File

@ -0,0 +1,20 @@
# frozen_string_literal: true
# A queue entry marking one WebHookEvent as awaiting redelivery.
# See the schema information below for the `processing` flag and timestamps.
class RedeliveringWebhookEvent < ActiveRecord::Base
# The webhook event whose stored request is to be sent again.
belongs_to :web_hook_event
end
# == Schema Information
#
# Table name: redelivering_webhook_events
#
# id :bigint not null, primary key
# web_hook_event_id :bigint not null
# processing :boolean default(FALSE), not null
# created_at :datetime not null
# updated_at :datetime not null
#
# Indexes
#
# index_redelivering_webhook_events_on_web_hook_event_id (web_hook_event_id)
#

View File

@ -7,6 +7,7 @@ class WebHook < ActiveRecord::Base
has_and_belongs_to_many :tags
has_many :web_hook_events, dependent: :destroy
has_many :redelivering_webhook_events
has_many :web_hook_events_daily_aggregates, dependent: :destroy
default_scope { order("id ASC") }

View File

@ -3,8 +3,11 @@
class WebHookEvent < ActiveRecord::Base
scope :successful, -> { where("status >= 200 AND status <= 299") }
scope :failed, -> { where("status < 200 OR status > 299") }
scope :not_ping, -> { where("status <> 0") }
belongs_to :web_hook
has_one :redelivering_webhook_event, class_name: "RedeliveringWebhookEvent"
after_save :update_web_hook_delivery_status
default_scope { order("created_at DESC") }

View File

@ -10,9 +10,14 @@ class AdminWebHookEventSerializer < ApplicationSerializer
:response_headers,
:response_body,
:duration,
:created_at
:created_at,
:redelivering
def request_url
object.web_hook.payload_url
end
# True while the event has an associated RedeliveringWebhookEvent record.
def redelivering
  queue_entry = object.redelivering_webhook_event
  !queue_entry.nil?
end
end

View File

@ -5348,6 +5348,7 @@ en:
events:
none: "There are no related events."
redeliver: "Redeliver"
redeliver_failed: "Redeliver Failed"
incoming:
one: "There is a new event."
other: "There are %{count} new events."
@ -5357,6 +5358,8 @@ en:
request: "Request"
response: "Response"
redeliver_confirm: "Are you sure you want to redeliver the same payload?"
redeliver_failed_confirm: "Are you sure you want to redeliver %{count} webhook events?"
no_events_to_redeliver: "No events to redeliver."
headers: "Headers"
payload: "Payload"
body: "Body"

View File

@ -344,6 +344,7 @@ Discourse::Application.routes.draw do
get "web_hook_events/:id" => "web_hooks#list_events", :as => :web_hook_events
get "web_hooks/:id/events/bulk" => "web_hooks#bulk_events"
post "web_hooks/:web_hook_id/events/:event_id/redeliver" => "web_hooks#redeliver_event"
post "web_hooks/:id/events/failed_redeliver" => "web_hooks#redeliver_failed_events"
post "web_hooks/:id/ping" => "web_hooks#ping"
end
end

View File

@ -0,0 +1,12 @@
# frozen_string_literal: true
# Creates the redelivering_webhook_events table, which holds one row per
# webhook event queued for redelivery.
class CreateRedeliveringWebhookEvents < ActiveRecord::Migration[7.0]
def change
create_table :redelivering_webhook_events do |t|
# Required, indexed reference to the webhook event being redelivered.
t.belongs_to :web_hook_event, null: false, index: true
# Starts as false; Jobs::RedeliverWebHookEvents sets it to true when it
# picks the row up.
t.boolean :processing, default: false, null: false
# created_at is what Jobs::CleanupRedeliveringWebHookEvents compares
# against its 8 hour cutoff.
t.timestamps
end
end
end

View File

@ -0,0 +1,3 @@
# frozen_string_literal: true
# Builds a RedeliveringWebhookEvent pointing at a freshly fabricated
# web_hook_event unless the caller supplies web_hook_event_id.
Fabricator(:redelivering_webhook_event) { web_hook_event_id { Fabricate(:web_hook_event).id } }

View File

@ -0,0 +1,19 @@
# frozen_string_literal: true
# Verifies that the cleanup job only removes queue rows older than its
# 8 hour cutoff.
RSpec.describe Jobs::CleanupRedeliveringWebHookEvents do
subject(:job) { described_class.new }
# Fresh row: expected to survive the cleanup.
fab!(:redelivering_webhook_event1) do
Fabricate(:redelivering_webhook_event, created_at: Time.now)
end
# Row created 9 hours ago: expected to be deleted.
fab!(:redelivering_webhook_event2) do
Fabricate(:redelivering_webhook_event, created_at: 9.hours.ago)
end
it "deletes redelivering_webhook_events that created more than 8 hours ago" do
job.execute({})
expect(RedeliveringWebhookEvent.count).to eq(1)
expect(RedeliveringWebhookEvent.find_by(id: redelivering_webhook_event1.id)).to be_present
end
end

View File

@ -0,0 +1,38 @@
# frozen_string_literal: true
require "excon"
# Verifies that the redelivery job re-sends a queued event, publishes the
# result on MessageBus and empties the queue.
RSpec.describe Jobs::RedeliverWebHookEvents do
subject(:job) { described_class.new }
fab!(:web_hook)
# Event whose stored payload and headers are expected in the outgoing request.
fab!(:web_hook_event) do
Fabricate(
:web_hook_event,
web_hook: web_hook,
payload: "abc",
headers: JSON.dump(aa: "1", bb: "2"),
)
end
# Queue entry that makes the job pick the event up.
fab!(:redelivering_webhook_event) do
Fabricate(:redelivering_webhook_event, web_hook_event_id: web_hook_event.id)
end
it "redelivers webhook events" do
# The stubbed endpoint answers with a 400; the spec still expects a
# "redelivered" message, i.e. the request was sent and a response recorded.
stub_request(:post, web_hook.payload_url).with(
body: "abc",
headers: {
"aa" => 1,
"bb" => 2,
},
).to_return(status: 400, body: "", headers: {})
# NOTE(review): the job's #execute does not read its args, so the hash
# passed here has no effect on the run.
messages =
MessageBus.track_publish { job.execute(web_hook: web_hook, web_hook_event: web_hook_event) }
expect(RedeliveringWebhookEvent.count).to eq(0)
expect(messages.count).to eq(1)
expect(messages.first.data).to include(type: "redelivered")
end
end

View File

@ -363,4 +363,20 @@ RSpec.describe Admin::WebHooksController do
end
end
end
# Request spec for POST /admin/api/web_hooks/:id/events/failed_redeliver.
describe "#redeliver_failed_events" do
# A failed (404) delivery belonging to the webhook under test.
fab!(:web_hook_event) { Fabricate(:web_hook_event, web_hook: web_hook, status: 404) }
before { sign_in(admin) }
it "stores failed events" do
# event_ids is sent as a single id here rather than as an array.
post "/admin/api/web_hooks/#{web_hook.id}/events/failed_redeliver.json",
params: {
event_ids: web_hook_event.id,
}
# A queue row must exist for the event and its id must be echoed back.
expect(RedeliveringWebhookEvent.find_by(web_hook_event_id: web_hook_event.id)).not_to be_nil
expect(response.status).to eq(200)
expect(response.parsed_body["event_ids"]).to eq([web_hook_event.id])
end
end
end