FIX: Prevent race condition in recurring automations (#26828)
Recurring automations are triggered by a scheduled job that runs every minute and checks for due automations, runs them and then marks as them as completed (by deleting the `PendingAutomation` record). However, the job is currently subject to a race condition where a recurring automation can be executed more than once at its due date if it takes more than a minute to finish. This commit adds a mutex around the code that triggers the recurring automation so that no concurrent executions can happen for a single automation. Meta topic: https://meta.discourse.org/t/daily-summary-9pm-utc/291850/119?u=osama.
This commit is contained in:
parent
d1f008a2fc
commit
8ed684312f
|
@ -23,22 +23,36 @@ module Jobs
|
|||
end
|
||||
|
||||
def send_pending_pm(pending_pm)
|
||||
DiscourseAutomation::Scriptable::Utils.send_pm(
|
||||
pending_pm.attributes.slice("target_usernames", "title", "raw"),
|
||||
sender: pending_pm.sender,
|
||||
prefers_encrypt: pending_pm.prefers_encrypt,
|
||||
)
|
||||
DistributedMutex.synchronize(
|
||||
"automation_send_pending_pm_#{pending_pm.id}",
|
||||
validity: 30.minutes,
|
||||
) do
|
||||
next if !DiscourseAutomation::PendingPm.exists?(pending_pm.id)
|
||||
|
||||
pending_pm.destroy!
|
||||
DiscourseAutomation::Scriptable::Utils.send_pm(
|
||||
pending_pm.attributes.slice("target_usernames", "title", "raw"),
|
||||
sender: pending_pm.sender,
|
||||
prefers_encrypt: pending_pm.prefers_encrypt,
|
||||
)
|
||||
|
||||
pending_pm.destroy!
|
||||
end
|
||||
end
|
||||
|
||||
def run_pending_automation(pending_automation)
|
||||
pending_automation.automation.trigger!(
|
||||
"kind" => pending_automation.automation.trigger,
|
||||
"execute_at" => pending_automation.execute_at,
|
||||
)
|
||||
DistributedMutex.synchronize(
|
||||
"process_pending_automation_#{pending_automation.id}",
|
||||
validity: 30.minutes,
|
||||
) do
|
||||
next if !DiscourseAutomation::PendingAutomation.exists?(pending_automation.id)
|
||||
|
||||
pending_automation.destroy!
|
||||
pending_automation.automation.trigger!(
|
||||
"kind" => pending_automation.automation.trigger,
|
||||
"execute_at" => pending_automation.execute_at,
|
||||
)
|
||||
|
||||
pending_automation.destroy!
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -59,6 +59,47 @@ describe Jobs::DiscourseAutomationTracker do
|
|||
}
|
||||
end
|
||||
end
|
||||
|
||||
it "doesn't run multiple times if the job is invoked multiple times concurrently" do
|
||||
count = 0
|
||||
|
||||
DiscourseAutomation::Scriptable.add("no_race_condition") do
|
||||
script { count += 1 }
|
||||
|
||||
triggerables [DiscourseAutomation::Triggers::RECURRING]
|
||||
end
|
||||
|
||||
automation =
|
||||
Fabricate(
|
||||
:automation,
|
||||
script: "no_race_condition",
|
||||
trigger: DiscourseAutomation::Triggers::RECURRING,
|
||||
)
|
||||
|
||||
automation.upsert_field!(
|
||||
"start_date",
|
||||
"date_time",
|
||||
{ value: 61.minutes.ago },
|
||||
target: "trigger",
|
||||
)
|
||||
|
||||
automation.upsert_field!(
|
||||
"recurrence",
|
||||
"period",
|
||||
{ value: { interval: 1, frequency: "hour" } },
|
||||
target: "trigger",
|
||||
)
|
||||
|
||||
freeze_time(2.hours.from_now) do
|
||||
threads = []
|
||||
5.times { threads << Thread.new { Jobs::DiscourseAutomationTracker.new.execute } }
|
||||
threads.each(&:join)
|
||||
end
|
||||
|
||||
expect(count).to eq(1)
|
||||
ensure
|
||||
DiscourseAutomation::Scriptable.remove("no_race_condition")
|
||||
end
|
||||
end
|
||||
|
||||
describe "pending pms" do
|
||||
|
@ -101,5 +142,16 @@ describe Jobs::DiscourseAutomationTracker do
|
|||
}
|
||||
end
|
||||
end
|
||||
|
||||
it "doesn't send multiple messages if the job is invoked multiple times concurrently" do
|
||||
pending_pm.update!(execute_at: 1.hour.from_now)
|
||||
expect do
|
||||
freeze_time(2.hours.from_now) do
|
||||
threads = []
|
||||
5.times { threads << Thread.new { Jobs::DiscourseAutomationTracker.new.execute } }
|
||||
threads.each(&:join)
|
||||
end
|
||||
end.to change { Topic.private_messages_for_user(Discourse.system_user).count }.by(1)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in New Issue