FIX: Make summaries backfill job more resilient. (#1071)

To quickly select backfill candidates without comparing SHAs, we compare the last summarized post to the topic's highest_post_number. However, hiding or deleting a post and adding a small action will update this column, causing the job to stall and re-generate the same summary repeatedly until someone posts a regular reply. On top of this, this is not always true for topics with `best_replies`, as this last reply isn't necessarily included.

Since this is not evident at first glance and each summarization strategy picks its targets differently, I'm opting to simplify the backfill logic and how we track potential candidates.

The first step is dropping `content_range`, which serves no purpose and it's there because summary caching was supposed to work differently at the beginning. So instead, I'm replacing it with a column called `highest_target_number`, which tracks `highest_post_number` for topics and could track other things like channel's `message_count` in the future.

Now that we have this column when selecting every potential backfill candidate, we'll check if the summary is truly outdated by comparing the SHAs, and if it's not, we just update the column and move on
This commit is contained in:
Roman Rizzi 2025-01-16 09:42:53 -03:00 committed by GitHub
parent f9aa2de413
commit 46fcdb6ba5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 149 additions and 116 deletions

View File

@ -16,7 +16,7 @@ module ::Jobs
gist = summarizer.existing_summary gist = summarizer.existing_summary
return if gist.present? && (!gist.outdated || gist.created_at >= 5.minutes.ago) return if gist.present? && (!gist.outdated || gist.created_at >= 5.minutes.ago)
summarizer.force_summarize(Discourse.system_user) summarizer.summarize(Discourse.system_user)
end end
end end
end end

View File

@ -18,7 +18,8 @@ module ::Jobs
backfill_candidates(gist_t) backfill_candidates(gist_t)
.limit(current_budget(gist_t)) .limit(current_budget(gist_t))
.each do |topic| .each do |topic|
DiscourseAi::Summarization.topic_gist(topic).force_summarize(system_user) strategy = DiscourseAi::Summarization.topic_gist(topic)
try_summarize(strategy, system_user, topic)
end end
end end
@ -26,10 +27,25 @@ module ::Jobs
backfill_candidates(complete_t) backfill_candidates(complete_t)
.limit(current_budget(complete_t)) .limit(current_budget(complete_t))
.each do |topic| .each do |topic|
DiscourseAi::Summarization.topic_summary(topic).force_summarize(system_user) strategy = DiscourseAi::Summarization.topic_summary(topic)
try_summarize(strategy, system_user, topic)
end end
end end
def try_summarize(strategy, user, topic)
existing_summary = strategy.existing_summary
if existing_summary.blank? || existing_summary.outdated
strategy.summarize(user)
else
# Hiding or deleting a post, and creating a small action alters the Topic#highest_post_number.
# We use this as a quick way to select potential backfill candidates without relying on original_content_sha.
# At this point, we are confident the summary doesn't need to be regenerated so something other than a regular reply
# caused the number to change in the topic.
existing_summary.update!(highest_target_number: topic.highest_post_number)
end
end
def backfill_candidates(summary_type) def backfill_candidates(summary_type)
max_age_days = SiteSetting.ai_summary_backfill_topic_max_age_days max_age_days = SiteSetting.ai_summary_backfill_topic_max_age_days
@ -45,12 +61,12 @@ module ::Jobs
.where( .where(
<<~SQL, # (1..1) gets stored ad (1..2). <<~SQL, # (1..1) gets stored ad (1..2).
ais.id IS NULL OR ( ais.id IS NULL OR (
UPPER(ais.content_range) < topics.highest_post_number + 1 ais.highest_target_number < topics.highest_post_number
AND ais.created_at < (current_timestamp - INTERVAL '5 minutes') AND ais.updated_at < (current_timestamp - INTERVAL '5 minutes')
) )
SQL SQL
) )
.order("ais.created_at DESC NULLS FIRST, topics.last_posted_at DESC") .order("ais.updated_at DESC NULLS FIRST, topics.last_posted_at DESC")
end end
def current_budget(type) def current_budget(type)

View File

@ -1,6 +1,9 @@
# frozen_string_literal: true # frozen_string_literal: true
class AiSummary < ActiveRecord::Base class AiSummary < ActiveRecord::Base
# TODO remove this line 01-3-2025
self.ignored_columns = %i[content_range]
belongs_to :target, polymorphic: true belongs_to :target, polymorphic: true
enum :summary_type, { complete: 0, gist: 1 } enum :summary_type, { complete: 0, gist: 1 }
@ -15,14 +18,20 @@ class AiSummary < ActiveRecord::Base
target_id: strategy.target.id, target_id: strategy.target.id,
target_type: strategy.target.class.name, target_type: strategy.target.class.name,
algorithm: llm_model.name, algorithm: llm_model.name,
content_range: (content_ids.first..content_ids.last), highest_target_number: strategy.highest_target_number,
summarized_text: summary, summarized_text: summary,
original_content_sha: build_sha(content_ids.join), original_content_sha: build_sha(content_ids.join),
summary_type: strategy.type, summary_type: strategy.type,
origin: !!human ? origins[:human] : origins[:system], origin: !!human ? origins[:human] : origins[:system],
}, },
unique_by: %i[target_id target_type summary_type], unique_by: %i[target_id target_type summary_type],
update_only: %i[summarized_text original_content_sha algorithm origin content_range], update_only: %i[
summarized_text
original_content_sha
algorithm
origin
highest_target_number
],
) )
.first .first
.then { AiSummary.find_by(id: _1["id"]) } .then { AiSummary.find_by(id: _1["id"]) }
@ -45,17 +54,17 @@ end
# #
# Table name: ai_summaries # Table name: ai_summaries
# #
# id :bigint not null, primary key # id :bigint not null, primary key
# target_id :integer not null # target_id :integer not null
# target_type :string not null # target_type :string not null
# content_range :int4range # summarized_text :string not null
# summarized_text :string not null # original_content_sha :string not null
# original_content_sha :string not null # algorithm :string not null
# algorithm :string not null # created_at :datetime not null
# created_at :datetime not null # updated_at :datetime not null
# updated_at :datetime not null # summary_type :integer default("complete"), not null
# summary_type :integer default("complete"), not null # origin :integer
# origin :integer # highest_target_number :integer not null
# #
# Indexes # Indexes
# #

View File

@ -13,15 +13,6 @@ class AiTopicSummarySerializer < ApplicationSerializer
end end
def new_posts_since_summary def new_posts_since_summary
# Postgres uses discrete range types for int4range, which means object.target.highest_post_number.to_i - object.highest_target_number.to_i
# (1..2) is stored as (1...3).
#
# We use Range#max to work around this, which in the case above always returns 2.
# Be careful with using Range#end here, it could lead to unexpected results as:
#
# (1..2).end => 2
# (1...3).end => 3
object.target.highest_post_number.to_i - object.content_range&.max.to_i
end end
end end

View File

@ -0,0 +1,14 @@
# frozen_string_literal: true
class AddHighestTargetNumberToAiSummary < ActiveRecord::Migration[7.2]
def up
add_column :ai_summaries, :highest_target_number, :integer, null: false
execute <<~SQL
UPDATE ai_summaries SET highest_target_number = GREATEST(UPPER(content_range) - 1, 1)
SQL
end
def down
drop_column :ai_summaries, :highest_target_number
end
end

View File

@ -0,0 +1,12 @@
# frozen_string_literal: true
class DropAiSummariesContentRange < ActiveRecord::Migration[7.2]
DROPPED_COLUMNS ||= { ai_summaries: %i[content_range] }
def up
DROPPED_COLUMNS.each { |table, columns| Migration::ColumnDropper.execute_drop(table, columns) }
end
def down
raise ActiveRecord::IrreversibleMigration
end
end

View File

@ -21,6 +21,8 @@ module DiscourseAi
# @param &on_partial_blk { Block - Optional } - The passed block will get called with the LLM partial response alongside a cancel function. # @param &on_partial_blk { Block - Optional } - The passed block will get called with the LLM partial response alongside a cancel function.
# Note: The block is only called with results of the final summary, not intermediate summaries. # Note: The block is only called with results of the final summary, not intermediate summaries.
# #
# This method doesn't care if we already have an up to date summary. It always regenerate.
#
# @returns { AiSummary } - Resulting summary. # @returns { AiSummary } - Resulting summary.
def summarize(user, &on_partial_blk) def summarize(user, &on_partial_blk)
base_summary = "" base_summary = ""
@ -68,11 +70,6 @@ module DiscourseAi
AiSummary.where(target: strategy.target, summary_type: strategy.type).destroy_all AiSummary.where(target: strategy.target, summary_type: strategy.type).destroy_all
end end
def force_summarize(user, &on_partial_blk)
@persist_summaries = true
summarize(user, &on_partial_blk)
end
private private
attr_reader :persist_summaries attr_reader :persist_summaries

View File

@ -8,6 +8,10 @@ module DiscourseAi
AiSummary.summary_types[:complete] AiSummary.summary_types[:complete]
end end
def highest_target_number
nil # We don't persist so we can return nil.
end
def initialize(target, since) def initialize(target, since)
super(target) super(target)
@since = since @since = since

View File

@ -12,6 +12,10 @@ module DiscourseAi
"gists" "gists"
end end
def highest_target_number
target.highest_post_number
end
def targets_data def targets_data
op_post_number = 1 op_post_number = 1
@ -38,6 +42,7 @@ module DiscourseAi
.limit(20) .limit(20)
.pluck(:post_number) .pluck(:post_number)
end end
posts_data = posts_data =
Post Post
.where(topic_id: target.id) .where(topic_id: target.id)

View File

@ -8,6 +8,10 @@ module DiscourseAi
AiSummary.summary_types[:complete] AiSummary.summary_types[:complete]
end end
def highest_target_number
target.highest_post_number
end
def targets_data def targets_data
posts_data = posts_data =
(target.has_summary? ? best_replies : pick_selection).pluck( (target.has_summary? ? best_replies : pick_selection).pluck(

View File

@ -7,6 +7,7 @@ Fabricator(:ai_summary) do
target { Fabricate(:topic) } target { Fabricate(:topic) }
summary_type AiSummary.summary_types[:complete] summary_type AiSummary.summary_types[:complete]
origin AiSummary.origins[:human] origin AiSummary.origins[:human]
highest_target_number 1
end end
Fabricator(:topic_ai_gist, from: :ai_summary) do Fabricator(:topic_ai_gist, from: :ai_summary) do

View File

@ -44,13 +44,13 @@ RSpec.describe Jobs::SummariesBackfill do
end end
it "ignores up to date summaries" do it "ignores up to date summaries" do
Fabricate(:ai_summary, target: topic, content_range: (1..2)) Fabricate(:ai_summary, target: topic, highest_target_number: 2, updated_at: 10.minutes.ago)
expect(subject.backfill_candidates(type)).to be_empty expect(subject.backfill_candidates(type)).to be_empty
end end
it "ignores outdated summaries created less than five minutes ago" do it "ignores outdated summaries updated less than five minutes ago" do
Fabricate(:ai_summary, target: topic, content_range: (1..1), created_at: 4.minutes.ago) Fabricate(:ai_summary, target: topic, highest_target_number: 1, updated_at: 4.minutes.ago)
expect(subject.backfill_candidates(type)).to be_empty expect(subject.backfill_candidates(type)).to be_empty
end end
@ -66,7 +66,7 @@ RSpec.describe Jobs::SummariesBackfill do
topic_2 = topic_2 =
Fabricate(:topic, word_count: 200, last_posted_at: 2.minutes.ago, highest_post_number: 1) Fabricate(:topic, word_count: 200, last_posted_at: 2.minutes.ago, highest_post_number: 1)
topic.update!(last_posted_at: 1.minute.ago) topic.update!(last_posted_at: 1.minute.ago)
Fabricate(:ai_summary, target: topic, created_at: 1.hour.ago, content_range: (1..1)) Fabricate(:ai_summary, target: topic, updated_at: 1.hour.ago, highest_target_number: 1)
expect(subject.backfill_candidates(type).map(&:id)).to contain_exactly(topic_2.id, topic.id) expect(subject.backfill_candidates(type).map(&:id)).to contain_exactly(topic_2.id, topic.id)
end end
@ -84,13 +84,13 @@ RSpec.describe Jobs::SummariesBackfill do
topic_2 = topic_2 =
Fabricate(:topic, word_count: 200, last_posted_at: 2.minutes.ago, highest_post_number: 1) Fabricate(:topic, word_count: 200, last_posted_at: 2.minutes.ago, highest_post_number: 1)
topic.update!(last_posted_at: 1.minute.ago) topic.update!(last_posted_at: 1.minute.ago)
Fabricate(:ai_summary, target: topic, created_at: 3.hours.ago, content_range: (1..1)) Fabricate(:ai_summary, target: topic, updated_at: 3.hours.ago, highest_target_number: 1)
Fabricate(:topic_ai_gist, target: topic, created_at: 3.hours.ago, content_range: (1..1)) Fabricate(:topic_ai_gist, target: topic, updated_at: 3.hours.ago, highest_target_number: 1)
summary_1 = "Summary of topic_2" summary_1 = "Summary of topic_2"
gist_1 = "Gist of topic_2" gist_1 = "Gist of topic_2"
summary_2 = "Summary of topic" summary_2 = "Updated summary of topic"
gist_2 = "Gist of topic" gist_2 = "Updated gist of topic"
DiscourseAi::Completions::Llm.with_prepared_responses( DiscourseAi::Completions::Llm.with_prepared_responses(
[gist_1, gist_2, summary_1, summary_2], [gist_1, gist_2, summary_1, summary_2],
@ -100,6 +100,32 @@ RSpec.describe Jobs::SummariesBackfill do
expect(AiSummary.gist.find_by(target: topic_2).summarized_text).to eq(gist_1) expect(AiSummary.gist.find_by(target: topic_2).summarized_text).to eq(gist_1)
expect(AiSummary.complete.find_by(target: topic).summarized_text).to eq(summary_2) expect(AiSummary.complete.find_by(target: topic).summarized_text).to eq(summary_2)
expect(AiSummary.gist.find_by(target: topic).summarized_text).to eq(gist_2) expect(AiSummary.gist.find_by(target: topic).summarized_text).to eq(gist_2)
# Queue has to be empty if we just generated all summaries
expect(subject.backfill_candidates(AiSummary.summary_types[:complete])).to be_empty
expect(subject.backfill_candidates(AiSummary.summary_types[:gist])).to be_empty
# Queue still empty when they are up to date and time passes.
AiSummary.update_all(updated_at: 20.minutes.ago)
expect(subject.backfill_candidates(AiSummary.summary_types[:complete])).to be_empty
expect(subject.backfill_candidates(AiSummary.summary_types[:gist])).to be_empty
end
it "updates the highest_target_number if the summary turned to be up to date" do
existing_summary =
Fabricate(
:ai_summary,
target: topic,
updated_at: 3.hours.ago,
highest_target_number: topic.highest_post_number,
)
og_highest_post_number = topic.highest_post_number
topic.update!(highest_post_number: og_highest_post_number + 1)
# No prepared responses here. We don't perform a completion call.
subject.execute({})
expect(existing_summary.reload.highest_target_number).to eq(og_highest_post_number + 1)
end end
end end
end end

View File

@ -26,13 +26,7 @@ describe DiscourseAi::GuardianExtensions do
end end
it "returns true if there is a cached summary" do it "returns true if there is a cached summary" do
AiSummary.create!( Fabricate(:ai_summary, target: topic)
target: topic,
summarized_text: "test",
original_content_sha: "123",
algorithm: "test",
summary_type: AiSummary.summary_types[:complete],
)
expect(guardian.can_see_summary?(topic)).to eq(true) expect(guardian.can_see_summary?(topic)).to eq(true)
end end
@ -66,13 +60,7 @@ describe DiscourseAi::GuardianExtensions do
end end
it "returns true for anons when there is a cached summary" do it "returns true for anons when there is a cached summary" do
AiSummary.create!( Fabricate(:ai_summary, target: topic)
target: topic,
summarized_text: "test",
original_content_sha: "123",
algorithm: "test",
summary_type: AiSummary.summary_types[:complete],
)
expect(guardian.can_see_summary?(topic)).to eq(true) expect(guardian.can_see_summary?(topic)).to eq(true)
end end

View File

@ -13,15 +13,7 @@ RSpec.describe DiscourseAi::Summarization::SummaryController do
context "when streaming" do context "when streaming" do
it "return a cached summary with json payload and does not trigger job if it exists" do it "return a cached summary with json payload and does not trigger job if it exists" do
section = summary = Fabricate(:ai_summary, target: topic)
AiSummary.create!(
target: topic,
summarized_text: "test",
algorithm: "test",
original_content_sha: "test",
summary_type: AiSummary.summary_types[:complete],
)
sign_in(Fabricate(:admin)) sign_in(Fabricate(:admin))
get "/discourse-ai/summarization/t/#{topic.id}.json?stream=true" get "/discourse-ai/summarization/t/#{topic.id}.json?stream=true"
@ -29,8 +21,10 @@ RSpec.describe DiscourseAi::Summarization::SummaryController do
expect(response.status).to eq(200) expect(response.status).to eq(200)
expect(Jobs::StreamTopicAiSummary.jobs.size).to eq(0) expect(Jobs::StreamTopicAiSummary.jobs.size).to eq(0)
summary = response.parsed_body response_summary = response.parsed_body
expect(summary.dig("ai_topic_summary", "summarized_text")).to eq(section.summarized_text) expect(response_summary.dig("ai_topic_summary", "summarized_text")).to eq(
summary.summarized_text,
)
end end
end end
@ -42,21 +36,15 @@ RSpec.describe DiscourseAi::Summarization::SummaryController do
end end
it "returns a cached summary" do it "returns a cached summary" do
section = summary = Fabricate(:ai_summary, target: topic)
AiSummary.create!(
target: topic,
summarized_text: "test",
algorithm: "test",
original_content_sha: "test",
summary_type: AiSummary.summary_types[:complete],
)
get "/discourse-ai/summarization/t/#{topic.id}.json" get "/discourse-ai/summarization/t/#{topic.id}.json"
expect(response.status).to eq(200) expect(response.status).to eq(200)
summary = response.parsed_body response_summary = response.parsed_body
expect(summary.dig("ai_topic_summary", "summarized_text")).to eq(section.summarized_text) expect(response_summary.dig("ai_topic_summary", "summarized_text")).to eq(
summary.summarized_text,
)
end end
end end
@ -90,15 +78,15 @@ RSpec.describe DiscourseAi::Summarization::SummaryController do
get "/discourse-ai/summarization/t/#{topic.id}.json" get "/discourse-ai/summarization/t/#{topic.id}.json"
expect(response.status).to eq(200) expect(response.status).to eq(200)
summary = response.parsed_body["ai_topic_summary"] response_summary = response.parsed_body["ai_topic_summary"]
section = AiSummary.last summary = AiSummary.last
expect(section.summarized_text).to eq(summary_text) expect(summary.summarized_text).to eq(summary_text)
expect(summary["summarized_text"]).to eq(section.summarized_text) expect(response_summary["summarized_text"]).to eq(summary.summarized_text)
expect(summary["algorithm"]).to eq("fake") expect(response_summary["algorithm"]).to eq("fake")
expect(summary["outdated"]).to eq(false) expect(response_summary["outdated"]).to eq(false)
expect(summary["can_regenerate"]).to eq(true) expect(response_summary["can_regenerate"]).to eq(true)
expect(summary["new_posts_since_summary"]).to be_zero expect(response_summary["new_posts_since_summary"]).to be_zero
end end
end end
@ -129,21 +117,16 @@ RSpec.describe DiscourseAi::Summarization::SummaryController do
end end
it "returns a cached summary" do it "returns a cached summary" do
section = summary = Fabricate(:ai_summary, target: topic)
AiSummary.create!(
target: topic,
summarized_text: "test",
algorithm: "test",
original_content_sha: "test",
summary_type: AiSummary.summary_types[:complete],
)
get "/discourse-ai/summarization/t/#{topic.id}.json" get "/discourse-ai/summarization/t/#{topic.id}.json"
expect(response.status).to eq(200) expect(response.status).to eq(200)
summary = response.parsed_body response_summary = response.parsed_body
expect(summary.dig("ai_topic_summary", "summarized_text")).to eq(section.summarized_text) expect(response_summary.dig("ai_topic_summary", "summarized_text")).to eq(
summary.summarized_text,
)
end end
end end
end end

View File

@ -20,7 +20,7 @@ describe DiscourseAi::TopicSummarization do
cached_summary = cached_summary =
AiSummary.find_by(target: topic, summary_type: AiSummary.summary_types[:complete]) AiSummary.find_by(target: topic, summary_type: AiSummary.summary_types[:complete])
expect(cached_summary.content_range).to cover(*topic.posts.map(&:post_number)) expect(cached_summary.highest_target_number).to eq(topic.highest_post_number)
expect(cached_summary.summarized_text).to eq(summary) expect(cached_summary.summarized_text).to eq(summary)
expect(cached_summary.original_content_sha).to be_present expect(cached_summary.original_content_sha).to be_present
expect(cached_summary.algorithm).to eq("fake") expect(cached_summary.algorithm).to eq("fake")

View File

@ -16,6 +16,8 @@ RSpec.describe "Summarize a topic ", type: :system do
let(:topic_page) { PageObjects::Pages::Topic.new } let(:topic_page) { PageObjects::Pages::Topic.new }
let(:summary_box) { PageObjects::Components::AiSummaryTrigger.new } let(:summary_box) { PageObjects::Components::AiSummaryTrigger.new }
fab!(:ai_summary) { Fabricate(:ai_summary, target: topic, summarized_text: "This is a summary") }
before do before do
group.add(current_user) group.add(current_user)
@ -27,16 +29,6 @@ RSpec.describe "Summarize a topic ", type: :system do
end end
context "when a summary is cached" do context "when a summary is cached" do
before do
AiSummary.create!(
target: topic,
summarized_text: summarization_result,
algorithm: "test",
original_content_sha: "test",
summary_type: AiSummary.summary_types[:complete],
)
end
it "displays it" do it "displays it" do
topic_page.visit_topic(topic) topic_page.visit_topic(topic)
summary_box.click_summarize summary_box.click_summarize
@ -45,15 +37,6 @@ RSpec.describe "Summarize a topic ", type: :system do
end end
context "when a summary is outdated" do context "when a summary is outdated" do
before do
AiSummary.create!(
target: topic,
summarized_text: summarization_result,
algorithm: "test",
original_content_sha: "test",
summary_type: AiSummary.summary_types[:complete],
)
end
fab!(:new_post) do fab!(:new_post) do
Fabricate( Fabricate(
:post, :post,