diff --git a/config/locales/server.en.yml b/config/locales/server.en.yml index df5e25c9..1dcf5dc1 100644 --- a/config/locales/server.en.yml +++ b/config/locales/server.en.yml @@ -60,6 +60,7 @@ en: ai_embeddings_generate_for_pms: "Generate embeddings for personal messages." ai_embeddings_semantic_related_topics_enabled: "Use Semantic Search for related topics." ai_embeddings_semantic_related_topics: "Maximum number of topics to show in related topic section." + ai_embeddings_backfill_batch_size: "Number of embeddings to backfill every 15 minutes." ai_embeddings_pg_connection_string: "PostgreSQL connection string for the embeddings module. Needs pgvector extension enabled and a series of tables created. See docs for more info." ai_embeddings_semantic_search_enabled: "Enable full-page semantic search." ai_embeddings_semantic_related_include_closed_topics: "Include closed topics in semantic search results" diff --git a/config/settings.yml b/config/settings.yml index 82622e86..33625e81 100644 --- a/config/settings.yml +++ b/config/settings.yml @@ -200,6 +200,9 @@ discourse_ai: client: true ai_embeddings_semantic_related_topics: 5 ai_embeddings_semantic_related_include_closed_topics: true + ai_embeddings_backfill_batch_size: + default: 250 + hidden: true ai_embeddings_pg_connection_string: default: "" hidden: true diff --git a/lib/modules/embeddings/entry_point.rb b/lib/modules/embeddings/entry_point.rb index be3e577e..c2582930 100644 --- a/lib/modules/embeddings/entry_point.rb +++ b/lib/modules/embeddings/entry_point.rb @@ -11,6 +11,7 @@ module DiscourseAi require_relative "vector_representations/bge_large_en" require_relative "strategies/truncation" require_relative "jobs/regular/generate_embeddings" + require_relative "jobs/scheduled/embeddings_backfill" require_relative "semantic_related" require_relative "semantic_topic_query" diff --git a/lib/modules/embeddings/jobs/regular/generate_embeddings.rb b/lib/modules/embeddings/jobs/regular/generate_embeddings.rb index 
a76f1ad8..b67cbb1f 100644 --- a/lib/modules/embeddings/jobs/regular/generate_embeddings.rb +++ b/lib/modules/embeddings/jobs/regular/generate_embeddings.rb @@ -2,6 +2,8 @@ module Jobs class GenerateEmbeddings < ::Jobs::Base + sidekiq_options queue: "low" + def execute(args) return unless SiteSetting.ai_embeddings_enabled return if (topic_id = args[:topic_id]).blank? diff --git a/lib/modules/embeddings/jobs/scheduled/embeddings_backfill.rb b/lib/modules/embeddings/jobs/scheduled/embeddings_backfill.rb new file mode 100644 index 00000000..fc59fff4 --- /dev/null +++ b/lib/modules/embeddings/jobs/scheduled/embeddings_backfill.rb @@ -0,0 +1,69 @@ +# frozen_string_literal: true + +module Jobs + class EmbeddingsBackfill < ::Jobs::Scheduled + every 15.minutes + sidekiq_options queue: "low" + cluster_concurrency 1 + + def execute(args) + return unless SiteSetting.ai_embeddings_enabled + + limit = SiteSetting.ai_embeddings_backfill_batch_size + rebaked = 0 + + strategy = DiscourseAi::Embeddings::Strategies::Truncation.new + vector_rep = + DiscourseAi::Embeddings::VectorRepresentations::Base.current_representation(strategy) + table_name = vector_rep.table_name + + topics = + Topic + .joins("LEFT JOIN #{table_name} ON #{table_name}.topic_id = topics.id") + .where(archetype: Archetype.default) + .where(deleted_at: nil) + .limit(limit - rebaked) + + # First, we'll try to backfill embeddings for topics that have none + topics + .where("#{table_name}.topic_id IS NULL") + .find_each do |t| + vector_rep.generate_topic_representation_from(t) + rebaked += 1 + end + + vector_rep.consider_indexing + + return if rebaked >= limit + + # Then, we'll try to backfill embeddings for topics that have outdated + # embeddings, be it model or strategy version + topics + .where(<<~SQL) + #{table_name}.model_version < #{vector_rep.version} + OR + #{table_name}.strategy_version < #{strategy.version} + SQL + .find_each do |t| + vector_rep.generate_topic_representation_from(t) + rebaked += 1 + end + + 
return if rebaked >= limit + + # Finally, we'll try to backfill embeddings for topics that have outdated + # embeddings due to edits or new replies. Here we only do 10% of the remaining limit + topics + .where("#{table_name}.updated_at < ?", 7.days.ago) + .order("random()") + .limit((limit - rebaked) / 10) + .pluck(:id) + .each do |id| + vector_rep.generate_topic_representation_from(Topic.find_by(id: id)) + rebaked += 1 + end + + rebaked + end + end +end diff --git a/lib/modules/embeddings/strategies/truncation.rb b/lib/modules/embeddings/strategies/truncation.rb index 4b2c977a..e88693cb 100644 --- a/lib/modules/embeddings/strategies/truncation.rb +++ b/lib/modules/embeddings/strategies/truncation.rb @@ -30,7 +30,7 @@ module DiscourseAi info << topic.title info << "\n\n" - info << topic.category.name + info << topic.category.name if topic&.category&.name if SiteSetting.tagging_enabled info << "\n\n" info << topic.tags.pluck(:name).join(", ") diff --git a/lib/modules/embeddings/vector_representations/base.rb b/lib/modules/embeddings/vector_representations/base.rb index 7e4c66a8..caf20244 100644 --- a/lib/modules/embeddings/vector_representations/base.rb +++ b/lib/modules/embeddings/vector_representations/base.rb @@ -12,20 +12,82 @@ module DiscourseAi @strategy = strategy end - def create_index(lists, probes) - index_name = "#{table_name}_search" + def consider_indexing(memory: "100MB") + # Using extension maintainer's recommendation for ivfflat indexes + # Results are not as good as without indexes, but it's much faster + # Disk usage is ~1x the size of the table, so this doubles table total size + count = DB.query_single("SELECT count(*) FROM #{table_name};").first + lists = [count < 1_000_000 ? count / 1000 : Math.sqrt(count).to_i, 10].max + probes = [count < 1_000_000 ?
lists / 10 : Math.sqrt(lists).to_i, 1].max + existing_index = DB.query_single(<<~SQL, index_name: index_name).first + SELECT + indexdef + FROM + pg_indexes + WHERE + indexname = :index_name + LIMIT 1 + SQL + + if !existing_index.present? + Rails.logger.info("Index #{index_name} does not exist, creating...") + return create_index!(memory, lists, probes) + end + + existing_index_age = + DB + .query_single( + "SELECT pg_catalog.obj_description((:index_name)::regclass, 'pg_class');", + index_name: index_name, + ) + .first + .to_i || 0 + new_rows = + DB.query_single( + "SELECT count(*) FROM #{table_name} WHERE created_at > '#{Time.at(existing_index_age)}';", + ).first + existing_lists = existing_index.match(/lists='(\d+)'/)&.captures&.first&.to_i + + if existing_index_age > 0 && existing_index_age < 1.hour.ago.to_i + if new_rows > 10_000 + Rails.logger.info( + "Index #{index_name} is #{existing_index_age} seconds old, and there are #{new_rows} new rows, updating...", + ) + return create_index!(memory, lists, probes) + elsif existing_lists != lists + Rails.logger.info( + "Index #{index_name} already exists, but lists is #{existing_lists} instead of #{lists}, updating...", + ) + return create_index!(memory, lists, probes) + end + end + + Rails.logger.info( + "Index #{index_name} kept. 
#{Time.now.to_i - existing_index_age} seconds old, #{new_rows} new rows, #{existing_lists} lists, #{probes} probes.", + ) + end + + def create_index!(memory, lists, probes) + DB.exec("SET work_mem TO '#{memory}';") + DB.exec("SET maintenance_work_mem TO '#{memory}';") DB.exec(<<~SQL) DROP INDEX IF EXISTS #{index_name}; CREATE INDEX IF NOT EXISTS - #{index} + #{index_name} ON #{table_name} USING ivfflat (embeddings #{pg_index_type}) WITH (lists = #{lists}); - SQL + SQL + DB.exec("COMMENT ON INDEX #{index_name} IS '#{Time.now.to_i}';") + DB.exec("RESET work_mem;") + DB.exec("RESET maintenance_work_mem;") + + database = DB.query_single("SELECT current_database();").first + DB.exec("ALTER DATABASE #{database} SET ivfflat.probes = #{probes};") end def vector_from(text) @@ -35,12 +97,21 @@ module DiscourseAi def generate_topic_representation_from(target, persist: true) text = @strategy.prepare_text_from(target, tokenizer, max_sequence_length - 2) - vector_from(text).tap do |vector| - if persist - digest = OpenSSL::Digest::SHA1.hexdigest(text) - save_to_db(target, vector, digest) - end - end + new_digest = OpenSSL::Digest::SHA1.hexdigest(text) + current_digest = DB.query_single(<<~SQL, topic_id: target.id).first + SELECT + digest + FROM + #{table_name} + WHERE + topic_id = :topic_id + LIMIT 1 + SQL + return if current_digest == new_digest + + vector = vector_from(text) + + save_to_db(target, vector, new_digest) if persist end def topic_id_from_representation(raw_vector) @@ -107,6 +178,10 @@ module DiscourseAi "ai_topic_embeddings_#{id}_#{@strategy.id}" end + def index_name + "#{table_name}_search" + end + def name raise NotImplementedError end diff --git a/lib/tasks/modules/embeddings/database.rake b/lib/tasks/modules/embeddings/database.rake index 6bec6765..c89e7926 100644 --- a/lib/tasks/modules/embeddings/database.rake +++ b/lib/tasks/modules/embeddings/database.rake @@ -23,20 +23,8 @@ end desc "Creates indexes for embeddings" task "ai:embeddings:index", [:work_mem] => 
[:environment] do |_, args| - # Using extension maintainer's recommendation for ivfflat indexes - # Results are not as good as without indexes, but it's much faster - # Disk usage is ~1x the size of the table, so this doubles table total size - count = Topic.count - lists = count < 1_000_000 ? count / 1000 : Math.sqrt(count).to_i - probes = count < 1_000_000 ? lists / 10 : Math.sqrt(lists).to_i - - vector_representation_klass = DiscourseAi::Embeddings::Vectors::Base.find_vector_representation strategy = DiscourseAi::Embeddings::Strategies::Truncation.new + vector_rep = DiscourseAi::Embeddings::VectorRepresentations::Base.current_representation(strategy) - DB.exec("SET work_mem TO '#{args[:work_mem] || "100MB"}';") - DB.exec("SET maintenance_work_mem TO '#{args[:work_mem] || "100MB"}';") - vector_representation_klass.new(strategy).create_index(lists, probes) - DB.exec("RESET work_mem;") - DB.exec("RESET maintenance_work_mem;") - DB.exec("ALTER SYSTEM SET ivfflat.probes = #{probes};") + vector_rep.consider_indexing(memory: args[:work_mem] || "100MB") end