FEATURE: Make embeddings turn-key (#261)

To ease the administrative burden of enabling the embeddings model, this change introduces automatic backfill when the setting is enabled. It also moves the topic visit embedding creation to a lower priority queue in sidekiq and adds an option to skip embedding computation and persistence when we match on the digest.
This commit is contained in:
Rafael dos Santos Silva 2023-10-26 12:07:37 -03:00 committed by GitHub
parent 426e348c8a
commit 818b20fb6f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 164 additions and 25 deletions

View File

@ -60,6 +60,7 @@ en:
ai_embeddings_generate_for_pms: "Generate embeddings for personal messages." ai_embeddings_generate_for_pms: "Generate embeddings for personal messages."
ai_embeddings_semantic_related_topics_enabled: "Use Semantic Search for related topics." ai_embeddings_semantic_related_topics_enabled: "Use Semantic Search for related topics."
ai_embeddings_semantic_related_topics: "Maximum number of topics to show in related topic section." ai_embeddings_semantic_related_topics: "Maximum number of topics to show in related topic section."
ai_embeddings_backfill_batch_size: "Number of embeddings to backfill every 15 minutes."
ai_embeddings_pg_connection_string: "PostgreSQL connection string for the embeddings module. Needs pgvector extension enabled and a series of tables created. See docs for more info." ai_embeddings_pg_connection_string: "PostgreSQL connection string for the embeddings module. Needs pgvector extension enabled and a series of tables created. See docs for more info."
ai_embeddings_semantic_search_enabled: "Enable full-page semantic search." ai_embeddings_semantic_search_enabled: "Enable full-page semantic search."
ai_embeddings_semantic_related_include_closed_topics: "Include closed topics in semantic search results" ai_embeddings_semantic_related_include_closed_topics: "Include closed topics in semantic search results"

View File

@ -200,6 +200,9 @@ discourse_ai:
client: true client: true
ai_embeddings_semantic_related_topics: 5 ai_embeddings_semantic_related_topics: 5
ai_embeddings_semantic_related_include_closed_topics: true ai_embeddings_semantic_related_include_closed_topics: true
ai_embeddings_backfill_batch_size:
default: 250
hidden: true
ai_embeddings_pg_connection_string: ai_embeddings_pg_connection_string:
default: "" default: ""
hidden: true hidden: true

View File

@ -11,6 +11,7 @@ module DiscourseAi
require_relative "vector_representations/bge_large_en" require_relative "vector_representations/bge_large_en"
require_relative "strategies/truncation" require_relative "strategies/truncation"
require_relative "jobs/regular/generate_embeddings" require_relative "jobs/regular/generate_embeddings"
require_relative "jobs/scheduled/embeddings_backfill"
require_relative "semantic_related" require_relative "semantic_related"
require_relative "semantic_topic_query" require_relative "semantic_topic_query"

View File

@ -2,6 +2,8 @@
module Jobs module Jobs
class GenerateEmbeddings < ::Jobs::Base class GenerateEmbeddings < ::Jobs::Base
sidekiq_options queue: "low"
def execute(args) def execute(args)
return unless SiteSetting.ai_embeddings_enabled return unless SiteSetting.ai_embeddings_enabled
return if (topic_id = args[:topic_id]).blank? return if (topic_id = args[:topic_id]).blank?

View File

@ -0,0 +1,69 @@
# frozen_string_literal: true
module Jobs
class EmbeddingsBackfill < ::Jobs::Scheduled
every 15.minutes
sidekiq_options queue: "low"
cluster_concurrency 1
def execute(args)
return unless SiteSetting.ai_embeddings_enabled
limit = SiteSetting.ai_embeddings_backfill_batch_size
rebaked = 0
strategy = DiscourseAi::Embeddings::Strategies::Truncation.new
vector_rep =
DiscourseAi::Embeddings::VectorRepresentations::Base.current_representation(strategy)
table_name = vector_rep.table_name
topics =
Topic
.joins("LEFT JOIN #{table_name} ON #{table_name}.topic_id = topics.id")
.where(archetype: Archetype.default)
.where(deleted_at: nil)
.limit(limit - rebaked)
# First, we'll try to backfill embeddings for topics that have none
topics
.where("#{table_name}.topic_id IS NULL")
.find_each do |t|
vector_rep.generate_topic_representation_from(t)
rebaked += 1
end
vector_rep.consider_indexing
return if rebaked >= limit
# Then, we'll try to backfill embeddings for topics that have outdated
# embeddings, be it model or strategy version
topics
.where(<<~SQL)
#{table_name}.model_version < #{vector_rep.version}
OR
#{table_name}.strategy_version < #{strategy.version}
SQL
.find_each do |t|
vector_rep.generate_topic_representation_from(t)
rebaked += 1
end
return if rebaked >= limit
# Finally, we'll try to backfill embeddings for topics that have outdated
# embeddings due to edits or new replies. Here we only do 10% of the limit
topics
.where("#{table_name}.updated_at < ?", 7.days.ago)
.order("random()")
.limit((limit - rebaked) / 10)
.pluck(:id)
.each do |id|
vector_rep.generate_topic_representation_from(Topic.find_by(id: id))
rebaked += 1
end
rebaked
end
end
end

View File

@ -30,7 +30,7 @@ module DiscourseAi
info << topic.title info << topic.title
info << "\n\n" info << "\n\n"
info << topic.category.name info << topic.category.name if topic&.category&.name
if SiteSetting.tagging_enabled if SiteSetting.tagging_enabled
info << "\n\n" info << "\n\n"
info << topic.tags.pluck(:name).join(", ") info << topic.tags.pluck(:name).join(", ")

View File

@ -12,20 +12,82 @@ module DiscourseAi
@strategy = strategy @strategy = strategy
end end
def create_index(lists, probes) def consider_indexing(memory: "100MB")
index_name = "#{table_name}_search" # Using extension maintainer's recommendation for ivfflat indexes
# Results are not as good as without indexes, but it's much faster
# Disk usage is ~1x the size of the table, so this doubles table total size
count = DB.query_single("SELECT count(*) FROM #{table_name};").first
lists = [count < 1_000_000 ? count / 1000 : Math.sqrt(count).to_i, 10].max
probes = [count < 1_000_000 ? lists / 10 : Math.sqrt(lists).to_i, 1].max
existing_index = DB.query_single(<<~SQL, index_name: index_name).first
SELECT
indexdef
FROM
pg_indexes
WHERE
indexname = :index_name
LIMIT 1
SQL
if !existing_index.present?
Rails.logger.info("Index #{index_name} does not exist, creating...")
return create_index!(memory, lists, probes)
end
existing_index_age =
DB
.query_single(
"SELECT pg_catalog.obj_description((:index_name)::regclass, 'pg_class');",
index_name: index_name,
)
.first
.to_i || 0
new_rows =
DB.query_single(
"SELECT count(*) FROM #{table_name} WHERE created_at > '#{Time.at(existing_index_age)}';",
).first
existing_lists = existing_index.match(/lists='(\d+)'/)&.captures&.first&.to_i
if existing_index_age > 0 && existing_index_age < 1.hour.ago.to_i
if new_rows > 10_000
Rails.logger.info(
"Index #{index_name} is #{existing_index_age} seconds old, and there are #{new_rows} new rows, updating...",
)
return create_index!(memory, lists, probes)
elsif existing_lists != lists
Rails.logger.info(
"Index #{index_name} already exists, but lists is #{existing_lists} instead of #{lists}, updating...",
)
return create_index!(memory, lists, probes)
end
end
Rails.logger.info(
"Index #{index_name} kept. #{Time.now.to_i - existing_index_age} seconds old, #{new_rows} new rows, #{existing_lists} lists, #{probes} probes.",
)
end
def create_index!(memory, lists, probes)
DB.exec("SET work_mem TO '#{memory}';")
DB.exec("SET maintenance_work_mem TO '#{memory}';")
DB.exec(<<~SQL) DB.exec(<<~SQL)
DROP INDEX IF EXISTS #{index_name}; DROP INDEX IF EXISTS #{index_name};
CREATE INDEX IF NOT EXISTS CREATE INDEX IF NOT EXISTS
#{index} #{index_name}
ON ON
#{table_name} #{table_name}
USING USING
ivfflat (embeddings #{pg_index_type}) ivfflat (embeddings #{pg_index_type})
WITH WITH
(lists = #{lists}); (lists = #{lists});
SQL SQL
DB.exec("COMMENT ON INDEX #{index_name} IS '#{Time.now.to_i}';")
DB.exec("RESET work_mem;")
DB.exec("RESET maintenance_work_mem;")
database = DB.query_single("SELECT current_database();").first
DB.exec("ALTER DATABASE #{database} SET ivfflat.probes = #{probes};")
end end
def vector_from(text) def vector_from(text)
@ -35,12 +97,21 @@ module DiscourseAi
def generate_topic_representation_from(target, persist: true) def generate_topic_representation_from(target, persist: true)
text = @strategy.prepare_text_from(target, tokenizer, max_sequence_length - 2) text = @strategy.prepare_text_from(target, tokenizer, max_sequence_length - 2)
vector_from(text).tap do |vector| new_digest = OpenSSL::Digest::SHA1.hexdigest(text)
if persist current_digest = DB.query_single(<<~SQL, topic_id: target.id).first
digest = OpenSSL::Digest::SHA1.hexdigest(text) SELECT
save_to_db(target, vector, digest) digest
end FROM
end #{table_name}
WHERE
topic_id = :topic_id
LIMIT 1
SQL
return if current_digest == new_digest
vector = vector_from(text)
save_to_db(target, vector, new_digest) if persist
end end
def topic_id_from_representation(raw_vector) def topic_id_from_representation(raw_vector)
@ -107,6 +178,10 @@ module DiscourseAi
"ai_topic_embeddings_#{id}_#{@strategy.id}" "ai_topic_embeddings_#{id}_#{@strategy.id}"
end end
def index_name
"#{table_name}_search"
end
def name def name
raise NotImplementedError raise NotImplementedError
end end

View File

@ -23,20 +23,8 @@ end
desc "Creates indexes for embeddings" desc "Creates indexes for embeddings"
task "ai:embeddings:index", [:work_mem] => [:environment] do |_, args| task "ai:embeddings:index", [:work_mem] => [:environment] do |_, args|
# Using extension maintainer's recommendation for ivfflat indexes
# Results are not as good as without indexes, but it's much faster
# Disk usage is ~1x the size of the table, so this doubles table total size
count = Topic.count
lists = count < 1_000_000 ? count / 1000 : Math.sqrt(count).to_i
probes = count < 1_000_000 ? lists / 10 : Math.sqrt(lists).to_i
vector_representation_klass = DiscourseAi::Embeddings::Vectors::Base.find_vector_representation
strategy = DiscourseAi::Embeddings::Strategies::Truncation.new strategy = DiscourseAi::Embeddings::Strategies::Truncation.new
vector_rep = DiscourseAi::Embeddings::VectorRepresentations::Base.current_representation(strategy)
DB.exec("SET work_mem TO '#{args[:work_mem] || "100MB"}';") vector_rep.consider_indexing(memory: args[:work_mem] || "100MB")
DB.exec("SET maintenance_work_mem TO '#{args[:work_mem] || "100MB"}';")
vector_representation_klass.new(strategy).create_index(lists, probes)
DB.exec("RESET work_mem;")
DB.exec("RESET maintenance_work_mem;")
DB.exec("ALTER SYSTEM SET ivfflat.probes = #{probes};")
end end