From 791fad1e6ac974d9280b31d19752f24810235157 Mon Sep 17 00:00:00 2001 From: Rafael dos Santos Silva Date: Mon, 14 Oct 2024 13:26:03 -0300 Subject: [PATCH] FEATURE: Index embeddings using bit vectors (#824) On very large sites, the rare cache misses for Related Topics can take around 200ms, which affects our p99 metric on the topic page. In order to mitigate this impact, we now have several tools at our disposal. First, one is to migrate the index embedding type from halfvec to bit and change the related topic query to leverage the new bit index by changing the search algorithm from inner product to Hamming distance. This will reduce our index sizes by 90%, severely reducing the impact of embeddings on our storage. By making the related query a bit smarter, we can have zero impact on recall by using the index to over-capture N*2 results, then re-ordering those N*2 using the full halfvec vectors and taking the top N. The expected impact is to go from 200ms to <20ms for cache misses and from a 2.5GB index to a 250MB index on a large site. Another tool is migrating our index type from IVFFLAT to HNSW, which can increase the cache misses performance even further, eventually putting us in the under 5ms territory. Co-authored-by: Roman Rizzi --- app/jobs/scheduled/embeddings_backfill.rb | 4 - ...ove_embeddings_to_single_table_per_type.rb | 1 - ...40_create_binary_indexes_for_embeddings.rb | 27 ++ ...41008055831_drop_old_embeddings_indexes.rb | 37 +++ lib/embeddings/vector_representations/base.rb | 238 ++++++------------ lib/tasks/modules/embeddings/database.rake | 8 - 6 files changed, 147 insertions(+), 168 deletions(-) create mode 100644 db/migrate/20241008054440_create_binary_indexes_for_embeddings.rb create mode 100644 db/post_migrate/20241008055831_drop_old_embeddings_indexes.rb diff --git a/app/jobs/scheduled/embeddings_backfill.rb b/app/jobs/scheduled/embeddings_backfill.rb index 28b7c25a..3300b479 100644 --- a/app/jobs/scheduled/embeddings_backfill.rb +++ b/app/jobs/scheduled/embeddings_backfill.rb @@ -35,8 +35,6 @@ module Jobs rebaked += populate_topic_embeddings(vector_rep, topics) - vector_rep.consider_indexing - return if rebaked >= limit # Then, we'll try to backfill embeddings for topics that have outdated @@ -82,8 +80,6 @@ module Jobs rebaked += 1 end - vector_rep.consider_indexing - return if rebaked >= limit # Then, we'll try to backfill embeddings for posts that have outdated diff --git a/db/migrate/20240611170905_move_embeddings_to_single_table_per_type.rb b/db/migrate/20240611170905_move_embeddings_to_single_table_per_type.rb index 55563763..a21a24b8 100644 --- a/db/migrate/20240611170905_move_embeddings_to_single_table_per_type.rb +++ b/db/migrate/20240611170905_move_embeddings_to_single_table_per_type.rb @@ -150,7 +150,6 @@ class MoveEmbeddingsToSingleTablePerType < ActiveRecord::Migration[7.0] strategy = DiscourseAi::Embeddings::Strategies::Truncation.new vector_rep = DiscourseAi::Embeddings::VectorRepresentations::Base.current_representation(strategy) - vector_rep.consider_indexing rescue StandardError => e Rails.logger.error("Failed to index embeddings: #{e}") end diff --git a/db/migrate/20241008054440_create_binary_indexes_for_embeddings.rb b/db/migrate/20241008054440_create_binary_indexes_for_embeddings.rb new file mode 100644 index 00000000..aa31a4e3 --- /dev/null +++ b/db/migrate/20241008054440_create_binary_indexes_for_embeddings.rb @@ -0,0 +1,27 @@ +# frozen_string_literal: true + +class CreateBinaryIndexesForEmbeddings < ActiveRecord::Migration[7.1] + def up + %w[topic post document_fragment].each do |type| + # our supported embeddings models IDs and dimensions + [ + [1, 768], + [2, 1536], + [3, 1024], + [4, 1024], + [5, 768], + [6, 1536], + [7, 2000], + [8, 1024], + ].each { |model_id, dimensions| execute <<-SQL } + CREATE INDEX ai_#{type}_embeddings_#{model_id}_1_search_bit ON ai_#{type}_embeddings + USING hnsw ((binary_quantize(embeddings)::bit(#{dimensions})) bit_hamming_ops) + WHERE model_id = #{model_id} AND strategy_id = 1; + SQL + end + end + + def down + raise ActiveRecord::IrreversibleMigration + end +end diff --git a/db/post_migrate/20241008055831_drop_old_embeddings_indexes.rb b/db/post_migrate/20241008055831_drop_old_embeddings_indexes.rb new file mode 100644 index 00000000..347d2b59 --- /dev/null +++ b/db/post_migrate/20241008055831_drop_old_embeddings_indexes.rb @@ -0,0 +1,37 @@ +# frozen_string_literal: true +class DropOldEmbeddingsIndexes < ActiveRecord::Migration[7.1] + def up + execute <<~SQL + DROP INDEX IF EXISTS ai_topic_embeddings_1_1_search; + DROP INDEX IF EXISTS ai_topic_embeddings_2_1_search; + DROP INDEX IF EXISTS ai_topic_embeddings_3_1_search; + DROP INDEX IF EXISTS ai_topic_embeddings_4_1_search; + DROP INDEX IF EXISTS ai_topic_embeddings_5_1_search; + DROP INDEX IF EXISTS ai_topic_embeddings_6_1_search; + DROP INDEX IF EXISTS ai_topic_embeddings_7_1_search; + DROP INDEX IF EXISTS ai_topic_embeddings_8_1_search; + + DROP INDEX IF EXISTS ai_post_embeddings_1_1_search; + DROP INDEX IF EXISTS ai_post_embeddings_2_1_search; + DROP INDEX IF EXISTS ai_post_embeddings_3_1_search; + DROP INDEX IF EXISTS ai_post_embeddings_4_1_search; + DROP INDEX IF EXISTS ai_post_embeddings_5_1_search; + DROP INDEX IF EXISTS ai_post_embeddings_6_1_search; + DROP INDEX IF EXISTS ai_post_embeddings_7_1_search; + DROP INDEX IF EXISTS ai_post_embeddings_8_1_search; + + DROP INDEX IF EXISTS ai_document_fragment_embeddings_1_1_search; + DROP INDEX IF EXISTS ai_document_fragment_embeddings_2_1_search; + DROP INDEX IF EXISTS ai_document_fragment_embeddings_3_1_search; + DROP INDEX IF EXISTS ai_document_fragment_embeddings_4_1_search; + DROP INDEX IF EXISTS ai_document_fragment_embeddings_5_1_search; + DROP INDEX IF EXISTS ai_document_fragment_embeddings_6_1_search; + DROP INDEX IF EXISTS ai_document_fragment_embeddings_7_1_search; + DROP INDEX IF EXISTS ai_document_fragment_embeddings_8_1_search; + SQL + end + + def down + raise ActiveRecord::IrreversibleMigration + end +end diff --git a/lib/embeddings/vector_representations/base.rb b/lib/embeddings/vector_representations/base.rb index db4d4a82..d8236181 100644 --- a/lib/embeddings/vector_representations/base.rb +++ b/lib/embeddings/vector_representations/base.rb @@ -46,113 +46,6 @@ module DiscourseAi @strategy = strategy end - def consider_indexing(memory: "100MB") - [topic_table_name, post_table_name].each do |table_name| - index_name = index_name(table_name) - # Using extension maintainer's recommendation for ivfflat indexes - # Results are not as good as without indexes, but it's much faster - # Disk usage is ~1x the size of the table, so this doubles table total size - count = - DB.query_single( - "SELECT count(*) FROM #{table_name} WHERE model_id = #{id} AND strategy_id = #{@strategy.id};", - ).first - lists = [count < 1_000_000 ? count / 1000 : Math.sqrt(count).to_i, 10].max - probes = [count < 1_000_000 ? lists / 10 : Math.sqrt(lists).to_i, 1].max - Discourse.cache.write("#{table_name}-#{id}-#{@strategy.id}-probes", probes) - - existing_index = DB.query_single(<<~SQL, index_name: index_name).first - SELECT - indexdef - FROM - pg_indexes - WHERE - indexname = :index_name - AND schemaname = 'public' - LIMIT 1 - SQL - - if !existing_index.present? - Rails.logger.info("Index #{index_name} does not exist, creating...") - return create_index!(table_name, memory, lists, probes) - end - - existing_index_age = - DB - .query_single( - "SELECT pg_catalog.obj_description((:index_name)::regclass, 'pg_class');", - index_name: index_name, - ) - .first - .to_i || 0 - new_rows = - DB.query_single( - "SELECT count(*) FROM #{table_name} WHERE model_id = #{id} AND strategy_id = #{@strategy.id} AND created_at > '#{Time.at(existing_index_age)}';", - ).first - existing_lists = existing_index.match(/lists='(\d+)'/)&.captures&.first&.to_i - - if existing_index_age > 0 && - existing_index_age < - ( - if SiteSetting.ai_embeddings_semantic_related_topics_enabled - 1.hour.ago.to_i - else - 1.day.ago.to_i - end - ) - if new_rows > 10_000 - Rails.logger.info( - "Index #{index_name} is #{existing_index_age} seconds old, and there are #{new_rows} new rows, updating...", - ) - return create_index!(table_name, memory, lists, probes) - elsif existing_lists != lists - Rails.logger.info( - "Index #{index_name} already exists, but lists is #{existing_lists} instead of #{lists}, updating...", - ) - return create_index!(table_name, memory, lists, probes) - end - end - - Rails.logger.info( - "Index #{index_name} kept. #{Time.now.to_i - existing_index_age} seconds old, #{new_rows} new rows, #{existing_lists} lists, #{probes} probes.", - ) - end - end - - def create_index!(table_name, memory, lists, probes) - tries = 0 - index_name = index_name(table_name) - DB.exec("SET work_mem TO '#{memory}';") - DB.exec("SET maintenance_work_mem TO '#{memory}';") - begin - DB.exec(<<~SQL) - DROP INDEX IF EXISTS #{index_name}; - CREATE INDEX IF NOT EXISTS - #{index_name} - ON - #{table_name} - USING - ivfflat ((embeddings::halfvec(#{dimensions})) #{pg_index_type}) - WITH - (lists = #{lists}) - WHERE - model_id = #{id} AND strategy_id = #{@strategy.id}; - SQL - rescue PG::ProgramLimitExceeded => e - parsed_error = e.message.match(/memory required is (\d+ [A-Z]{2}), ([a-z_]+)/) - if parsed_error[1].present? && parsed_error[2].present? - DB.exec("SET #{parsed_error[2]} TO '#{parsed_error[1].tr(" ", "")}';") - tries += 1 - retry if tries < 3 - else - raise e - end - end - - DB.exec("COMMENT ON INDEX #{index_name} IS '#{Time.now.to_i}';") - DB.exec("RESET work_mem;") - DB.exec("RESET maintenance_work_mem;") - end - def vector_from(text, asymetric: false) raise NotImplementedError end @@ -224,14 +117,23 @@ module DiscourseAi def asymmetric_topics_similarity_search(raw_vector, limit:, offset:, return_distance: false) results = DB.query(<<~SQL, query_embedding: raw_vector, limit: limit, offset: offset) - #{probes_sql(topic_table_name)} + WITH candidates AS ( + SELECT + topic_id, + embeddings::halfvec(#{dimensions}) AS embeddings + FROM + #{topic_table_name} + WHERE + model_id = #{id} AND strategy_id = #{@strategy.id} + ORDER BY + binary_quantize(embeddings)::bit(#{dimensions}) <~> binary_quantize('[:query_embedding]'::halfvec(#{dimensions})) + LIMIT :limit * 2 + ) SELECT topic_id, embeddings::halfvec(#{dimensions}) #{pg_function} '[:query_embedding]'::halfvec(#{dimensions}) AS distance FROM - #{topic_table_name} - WHERE - model_id = #{id} AND strategy_id = #{@strategy.id} + candidates ORDER BY embeddings::halfvec(#{dimensions}) #{pg_function} '[:query_embedding]'::halfvec(#{dimensions}) LIMIT :limit @@ -250,18 +152,23 @@ module DiscourseAi def asymmetric_posts_similarity_search(raw_vector, limit:, offset:, return_distance: false) results = DB.query(<<~SQL, query_embedding: raw_vector, limit: limit, offset: offset) - #{probes_sql(post_table_name)} + WITH candidates AS ( + SELECT + post_id, + embeddings::halfvec(#{dimensions}) AS embeddings + FROM + #{post_table_name} + WHERE + model_id = #{id} AND strategy_id = #{@strategy.id} + ORDER BY + binary_quantize(embeddings)::bit(#{dimensions}) <~> binary_quantize('[:query_embedding]'::halfvec(#{dimensions})) + LIMIT :limit * 2 + ) SELECT post_id, embeddings::halfvec(#{dimensions}) #{pg_function} '[:query_embedding]'::halfvec(#{dimensions}) AS distance FROM - #{post_table_name} - INNER JOIN - posts AS p ON p.id = post_id - INNER JOIN - topics AS t ON t.id = p.topic_id AND t.archetype = 'regular' - WHERE - model_id = #{id} AND strategy_id = #{@strategy.id} + candidates ORDER BY embeddings::halfvec(#{dimensions}) #{pg_function} '[:query_embedding]'::halfvec(#{dimensions}) LIMIT :limit @@ -286,32 +193,41 @@ module DiscourseAi offset:, return_distance: false ) + # A too low limit exacerbates the the recall loss of binary quantization + binary_search_limit = [limit * 2, 100].max results = DB.query( <<~SQL, - #{probes_sql(post_table_name)} - SELECT - rag_document_fragment_id, - embeddings::halfvec(#{dimensions}) #{pg_function} '[:query_embedding]'::halfvec(#{dimensions}) AS distance - FROM - #{rag_fragments_table_name} - INNER JOIN - rag_document_fragments AS rdf ON rdf.id = rag_document_fragment_id - WHERE - model_id = #{id} AND - strategy_id = #{@strategy.id} AND - rdf.target_id = :target_id AND - rdf.target_type = :target_type - ORDER BY - embeddings::halfvec(#{dimensions}) #{pg_function} '[:query_embedding]'::halfvec(#{dimensions}) - LIMIT :limit - OFFSET :offset - SQL + WITH candidates AS ( + SELECT + rag_document_fragment_id, + embeddings::halfvec(#{dimensions}) AS embeddings + FROM + #{rag_fragments_table_name} + INNER JOIN + rag_document_fragments ON rag_document_fragments.id = rag_document_fragment_id + WHERE + model_id = #{id} AND strategy_id = #{@strategy.id} + ORDER BY + binary_quantize(embeddings)::bit(#{dimensions}) <~> binary_quantize('[:query_embedding]'::halfvec(#{dimensions})) + LIMIT :binary_search_limit + ) + SELECT + rag_document_fragment_id, + embeddings::halfvec(#{dimensions}) #{pg_function} '[:query_embedding]'::halfvec(#{dimensions}) AS distance + FROM + candidates + ORDER BY + embeddings::halfvec(#{dimensions}) #{pg_function} '[:query_embedding]'::halfvec(#{dimensions}) + LIMIT :limit + OFFSET :offset + SQL query_embedding: raw_vector, target_id: target_id, target_type: target_type, limit: limit, offset: offset, + binary_search_limit: binary_search_limit, ) if return_distance @@ -326,17 +242,8 @@ module DiscourseAi def symmetric_topics_similarity_search(topic) DB.query(<<~SQL, topic_id: topic.id).map(&:topic_id) - #{probes_sql(topic_table_name)} - SELECT - topic_id - FROM - #{topic_table_name} - WHERE - model_id = #{id} AND - strategy_id = #{@strategy.id} - ORDER BY - embeddings::halfvec(#{dimensions}) #{pg_function} ( - SELECT + WITH le_target AS ( + SELECT embeddings FROM #{topic_table_name} @@ -345,8 +252,34 @@ module DiscourseAi strategy_id = #{@strategy.id} AND topic_id = :topic_id LIMIT 1 - )::halfvec(#{dimensions}) - LIMIT 100 + ) + SELECT topic_id FROM ( + SELECT + topic_id, embeddings + FROM + #{topic_table_name} + WHERE + model_id = #{id} AND + strategy_id = #{@strategy.id} + ORDER BY + binary_quantize(embeddings)::bit(#{dimensions}) <~> ( + SELECT + binary_quantize(embeddings)::bit(#{dimensions}) + FROM + le_target + LIMIT 1 + ) + LIMIT 200 + ) AS widenet + ORDER BY + embeddings::halfvec(#{dimensions}) #{pg_function} ( + SELECT + embeddings::halfvec(#{dimensions}) + FROM + le_target + LIMIT 1 + ) + LIMIT 100; SQL rescue PG::Error => e Rails.logger.error( @@ -384,11 +317,6 @@ module DiscourseAi "#{table_name}_#{id}_#{@strategy.id}_search" end - def probes_sql(table_name) - probes = Discourse.cache.read("#{table_name}-#{id}-#{@strategy.id}-probes") - probes.present? ? "SET LOCAL ivfflat.probes TO #{probes};" : "" - end - def name raise NotImplementedError end diff --git a/lib/tasks/modules/embeddings/database.rake b/lib/tasks/modules/embeddings/database.rake index 1e6a55dc..83f652a0 100644 --- a/lib/tasks/modules/embeddings/database.rake +++ b/lib/tasks/modules/embeddings/database.rake @@ -44,11 +44,3 @@ task "ai:embeddings:backfill", %i[model concurrency] => [:environment] do |_, ar end end end - -desc "Creates indexes for embeddings" -task "ai:embeddings:index", [:work_mem] => [:environment] do |_, args| - strategy = DiscourseAi::Embeddings::Strategies::Truncation.new - vector_rep = DiscourseAi::Embeddings::VectorRepresentations::Base.current_representation(strategy) - - vector_rep.consider_indexing(memory: args[:work_mem] || "100MB") -end