FIX: ensure we only attempt embedding once every 15 minutes (#76)
This also heavily reduces log noise and makes our exception handling more surgical.
parent 074d00ca32
commit b82fc1e692
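The throttle in this change leans on Redis SET with NX and EX: the first caller inside a 15 minute window creates the key and gets to enqueue the job, later callers see the key already present and skip. A minimal standalone sketch of that pattern, assuming a plain redis-rb client and an illustrative key/topic id rather than the plugin's actual wiring:

# Sketch of the once-per-window guard this commit relies on.
# Assumes a plain redis-rb client; Discourse wraps Redis with its own helper.
require "redis"

def first_in_window?(redis, key, window_seconds)
  # SET with nx: true succeeds (returns true) only when the key is absent;
  # ex: gives it a TTL, so the guard resets itself after the window.
  redis.set(key, "queued", nx: true, ex: window_seconds)
end

redis = Redis.new
key = "build-semantic-suggested-topic-42" # illustrative topic id

if first_in_window?(redis, key, 15 * 60)
  puts "enqueue the generate_embeddings job"
else
  puts "a job was already queued in the last 15 minutes, skip"
end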
@@ -3,6 +3,19 @@
 module DiscourseAi
   module Embeddings
     class SemanticRelated
+      def self.semantic_suggested_key(topic_id)
+        "semantic-suggested-topic-#{topic_id}"
+      end
+
+      def self.build_semantic_suggested_key(topic_id)
+        "build-semantic-suggested-topic-#{topic_id}"
+      end
+
+      def self.clear_cache_for(topic)
+        Discourse.cache.delete(semantic_suggested_key(topic.id))
+        Discourse.redis.del(build_semantic_suggested_key(topic.id))
+      end
+
       def self.candidates_for(topic)
         return ::Topic.none if SiteSetting.ai_embeddings_semantic_related_topics < 1
 
@@ -25,12 +38,19 @@ module DiscourseAi
         candidate_ids =
           Discourse
             .cache
-            .fetch("semantic-suggested-topic-#{topic.id}", expires_in: cache_for) do
+            .fetch(semantic_suggested_key(topic.id), expires_in: cache_for) do
               DiscourseAi::Embeddings::Topic.new.symmetric_semantic_search(model, topic)
             end
-      rescue StandardError => e
-        Rails.logger.error("SemanticRelated: #{e}")
-        Jobs.enqueue(:generate_embeddings, topic_id: topic.id)
+      rescue MissingEmbeddingError
+        # avoid a flood of jobs when visiting topic
+        if Discourse.redis.set(
+             build_semantic_suggested_key(topic.id),
+             "queued",
+             ex: 15.minutes.to_i,
+             nx: true,
+           )
+          Jobs.enqueue(:generate_embeddings, topic_id: topic.id)
+        end
         return ::Topic.none
       end
 
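A note on the cache half of the change above: Discourse.cache.fetch behaves like Rails.cache.fetch, computing the block only on a miss and serving the stored candidate ids until expires_in lapses, while the Redis NX key only deduplicates the background job when embeddings are missing. A rough sketch of the fetch-with-expiry behaviour, using ActiveSupport's MemoryStore as a stand-in (key and ids are illustrative):

# Sketch of fetch-with-expiry, with ActiveSupport's MemoryStore standing in
# for Discourse.cache; the key format mirrors semantic_suggested_key above.
require "active_support"
require "active_support/cache"

cache = ActiveSupport::Cache::MemoryStore.new

def related_topic_ids(cache, topic_id)
  cache.fetch("semantic-suggested-topic-#{topic_id}", expires_in: 30 * 60) do
    puts "cache miss: running the semantic search for topic #{topic_id}"
    [101, 102, 103] # illustrative candidate ids
  end
end

related_topic_ids(cache, 42) # computes the block and stores the result
related_topic_ids(cache, 42) # served from cache until expires_in passes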
@@ -2,6 +2,8 @@
 
 module DiscourseAi
   module Embeddings
+    MissingEmbeddingError = Class.new(StandardError)
+
     class Topic
       def generate_and_store_embeddings_for(topic)
         return unless SiteSetting.ai_embeddings_enabled
@@ -17,29 +19,12 @@ module DiscourseAi
       end
 
       def symmetric_semantic_search(model, topic)
-        candidate_ids =
-          DiscourseAi::Database::Connection.db.query(<<~SQL, topic_id: topic.id).map(&:topic_id)
-            SELECT
-              topic_id
-            FROM
-              topic_embeddings_#{model.name.underscore}
-            ORDER BY
-              embedding #{model.pg_function} (
-                SELECT
-                  embedding
-                FROM
-                  topic_embeddings_#{model.name.underscore}
-                WHERE
-                  topic_id = :topic_id
-                LIMIT 1
-              )
-            LIMIT 100
-          SQL
+        candidate_ids = query_symmetric_embeddings(model, topic)
 
         # Happens when the topic doesn't have any embeddings
         # I'd rather not use Exceptions to control the flow, so this should be refactored soon
         if candidate_ids.empty? || !candidate_ids.include?(topic.id)
-          raise StandardError, "No embeddings found for topic #{topic.id}"
+          raise MissingEmbeddingError, "No embeddings found for topic #{topic.id}"
         end
 
         candidate_ids
@@ -70,6 +55,26 @@ module DiscourseAi
 
       private
 
+      def query_symmetric_embeddings(model, topic)
+        DiscourseAi::Database::Connection.db.query(<<~SQL, topic_id: topic.id).map(&:topic_id)
+          SELECT
+            topic_id
+          FROM
+            topic_embeddings_#{model.name.underscore}
+          ORDER BY
+            embedding #{model.pg_function} (
+              SELECT
+                embedding
+              FROM
+                topic_embeddings_#{model.name.underscore}
+              WHERE
+                topic_id = :topic_id
+              LIMIT 1
+            )
+          LIMIT 100
+        SQL
+      end
+
       def persist_embedding(topic, model, embedding)
         DiscourseAi::Database::Connection.db.exec(<<~SQL, topic_id: topic.id, embedding: embedding)
           INSERT INTO topic_embeddings_#{model.name.underscore} (topic_id, embedding)
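For readability: the heredoc in query_symmetric_embeddings interpolates the per-model table name and the model's pg_function distance operator. Assuming a hypothetical model whose table is topic_embeddings_example_model and whose operator is pgvector's cosine-distance <=> (both illustrative, not taken from the plugin), the expanded SQL would look roughly like:

-- Illustrative expansion only; the table name and the <=> operator are assumptions.
SELECT
  topic_id
FROM
  topic_embeddings_example_model
ORDER BY
  embedding <=> (
    SELECT
      embedding
    FROM
      topic_embeddings_example_model
    WHERE
      topic_id = :topic_id
    LIMIT 1
  )
LIMIT 100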
@@ -16,33 +16,65 @@ describe DiscourseAi::Embeddings::SemanticRelated do
   before { SiteSetting.ai_embeddings_semantic_related_topics_enabled = true }
 
   describe "#candidates_for" do
-    before do
-      Discourse.cache.clear
-      DiscourseAi::Embeddings::Topic
-        .any_instance
-        .expects(:symmetric_semantic_search)
-        .returns(Topic.unscoped.order(id: :desc).limit(100).pluck(:id))
-    end
-    after { Discourse.cache.clear }
-
-    it "returns the related topics without non public topics" do
-      results = described_class.candidates_for(target).to_a
-      expect(results).to include(normal_topic_1)
-      expect(results).to include(normal_topic_2)
-      expect(results).to include(normal_topic_3)
-      expect(results).to include(closed_topic)
-      expect(results).to_not include(target)
-      expect(results).to_not include(unlisted_topic)
-      expect(results).to_not include(private_topic)
-      expect(results).to_not include(secured_category_topic)
-    end
-
-    context "when ai_embeddings_semantic_related_include_closed_topics is false" do
-      before { SiteSetting.ai_embeddings_semantic_related_include_closed_topics = false }
-      it "do not return closed topics" do
-        results = described_class.candidates_for(target).to_a
-        expect(results).to_not include(closed_topic)
+    context "when embeddings do not exist" do
+      let(:topic) do
+        topic = Fabricate(:topic)
+        described_class.clear_cache_for(target)
+        topic
+      end
+
+      it "queues job only once per 15 minutes" do
+        # sadly we need to mock a DB connection to return nothing
+        DiscourseAi::Embeddings::Topic
+          .any_instance
+          .expects(:query_symmetric_embeddings)
+          .returns([])
+          .twice
+
+        results = nil
+
+        expect_enqueued_with(job: :generate_embeddings, args: { topic_id: topic.id }) do
+          results = described_class.candidates_for(topic).to_a
+        end
+
+        expect(results).to eq([])
+
+        expect_not_enqueued_with(job: :generate_embeddings, args: { topic_id: topic.id }) do
+          results = described_class.candidates_for(topic).to_a
+        end
+
+        expect(results).to eq([])
+      end
+    end
+
+    context "when embeddings exist" do
+      before do
+        Discourse.cache.clear
+        DiscourseAi::Embeddings::Topic
+          .any_instance
+          .expects(:symmetric_semantic_search)
+          .returns(Topic.unscoped.order(id: :desc).limit(100).pluck(:id))
+      end
+
+      after { Discourse.cache.clear }
+
+      it "returns the related topics without non public topics" do
+        results = described_class.candidates_for(target).to_a
+        expect(results).to include(normal_topic_1)
+        expect(results).to include(normal_topic_2)
+        expect(results).to include(normal_topic_3)
+        expect(results).to include(closed_topic)
+        expect(results).to_not include(target)
+        expect(results).to_not include(unlisted_topic)
+        expect(results).to_not include(private_topic)
+        expect(results).to_not include(secured_category_topic)
+      end
+
+      context "when ai_embeddings_semantic_related_include_closed_topics is false" do
+        before { SiteSetting.ai_embeddings_semantic_related_include_closed_topics = false }
+        it "do not return closed topics" do
+          results = described_class.candidates_for(target).to_a
+          expect(results).to_not include(closed_topic)
+        end
       end
     end
   end