FEATURE: Index embeddings using bit vectors (#824)
On very large sites, the rare cache misses for Related Topics can take around 200ms, which affects our p99 metric on the topic page. In order to mitigate this impact, we now have several tools at our disposal. The first one is to migrate the index embedding type from halfvec to bit and change the related topic query to leverage the new bit index by changing the search algorithm from inner product to Hamming distance. This will reduce our index sizes by 90%, severely reducing the impact of embeddings on our storage. By making the related query a bit smarter, we can have zero impact on recall by using the index to over-capture N*2 results, then re-ordering those N*2 using the full halfvec vectors and taking the top N. The expected impact is to go from 200ms to <20ms for cache misses and from a 2.5GB index to a 250MB index on a large site. Another tool is migrating our index type from IVFFLAT to HNSW, which can increase cache-miss performance even further, eventually putting us in the under-5ms territory. Co-authored-by: Roman Rizzi <roman@discourse.org>
This commit is contained in:
parent
6615104389
commit
791fad1e6a
|
@ -35,8 +35,6 @@ module Jobs
|
|||
|
||||
rebaked += populate_topic_embeddings(vector_rep, topics)
|
||||
|
||||
vector_rep.consider_indexing
|
||||
|
||||
return if rebaked >= limit
|
||||
|
||||
# Then, we'll try to backfill embeddings for topics that have outdated
|
||||
|
@ -82,8 +80,6 @@ module Jobs
|
|||
rebaked += 1
|
||||
end
|
||||
|
||||
vector_rep.consider_indexing
|
||||
|
||||
return if rebaked >= limit
|
||||
|
||||
# Then, we'll try to backfill embeddings for posts that have outdated
|
||||
|
|
|
@ -150,7 +150,6 @@ class MoveEmbeddingsToSingleTablePerType < ActiveRecord::Migration[7.0]
|
|||
strategy = DiscourseAi::Embeddings::Strategies::Truncation.new
|
||||
vector_rep =
|
||||
DiscourseAi::Embeddings::VectorRepresentations::Base.current_representation(strategy)
|
||||
vector_rep.consider_indexing
|
||||
rescue StandardError => e
|
||||
Rails.logger.error("Failed to index embeddings: #{e}")
|
||||
end
|
||||
|
|
|
@ -0,0 +1,27 @@
|
|||
# frozen_string_literal: true

# Adds binary-quantized HNSW indexes over the embeddings tables so that
# similarity searches can do a fast Hamming-distance scan on bit vectors
# (the full halfvec vectors are then used to re-rank the candidates).
class CreateBinaryIndexesForEmbeddings < ActiveRecord::Migration[7.1]
  # Our supported embeddings models: [model_id, dimensions] pairs.
  MODELS = [
    [1, 768],
    [2, 1536],
    [3, 1024],
    [4, 1024],
    [5, 768],
    [6, 1536],
    [7, 2000],
    [8, 1024],
  ].freeze

  def up
    %w[topic post document_fragment].each do |type|
      MODELS.each do |model_id, dimensions|
        # Partial index per model/strategy; `binary_quantize` collapses the
        # halfvec embedding into a bit(#{dimensions}) value indexed with
        # Hamming-distance operators.
        execute <<~SQL
          CREATE INDEX ai_#{type}_embeddings_#{model_id}_1_search_bit ON ai_#{type}_embeddings
          USING hnsw ((binary_quantize(embeddings)::bit(#{dimensions})) bit_hamming_ops)
          WHERE model_id = #{model_id} AND strategy_id = 1;
        SQL
      end
    end
  end

  def down
    raise ActiveRecord::IrreversibleMigration
  end
end
|
|
@ -0,0 +1,37 @@
|
|||
# frozen_string_literal: true

# Removes the legacy (ivfflat) embeddings search indexes, which are replaced
# by the binary-quantized HNSW indexes created in the companion migration.
class DropOldEmbeddingsIndexes < ActiveRecord::Migration[7.1]
  def up
    # Old index names follow ai_<type>_embeddings_<model_id>_<strategy_id>_search
    # for model ids 1..8 and strategy id 1; generate the drops instead of
    # hand-writing 24 nearly identical statements.
    %w[topic post document_fragment].each do |type|
      (1..8).each do |model_id|
        execute "DROP INDEX IF EXISTS ai_#{type}_embeddings_#{model_id}_1_search;"
      end
    end
  end

  def down
    raise ActiveRecord::IrreversibleMigration
  end
end
|
|
@ -46,113 +46,6 @@ module DiscourseAi
|
|||
@strategy = strategy
|
||||
end
|
||||
|
||||
# Creates or refreshes the ivfflat ANN indexes for the topic and post
# embeddings tables, sizing `lists` and `probes` from the current row count
# and caching `probes` for query-time use (see probes_sql).
#
# @param memory [String] Postgres work_mem/maintenance_work_mem budget used
#   when (re)building an index, e.g. "100MB".
def consider_indexing(memory: "100MB")
  [topic_table_name, post_table_name].each do |table_name|
    index_name = index_name(table_name)
    # Using extension maintainer's recommendation for ivfflat indexes
    # Results are not as good as without indexes, but it's much faster
    # Disk usage is ~1x the size of the table, so this doubles table total size
    count =
      DB.query_single(
        "SELECT count(*) FROM #{table_name} WHERE model_id = #{id} AND strategy_id = #{@strategy.id};",
      ).first
    # Heuristics: rows/1000 lists under 1M rows, sqrt(rows) above; floors of
    # 10 lists and 1 probe.
    lists = [count < 1_000_000 ? count / 1000 : Math.sqrt(count).to_i, 10].max
    probes = [count < 1_000_000 ? lists / 10 : Math.sqrt(lists).to_i, 1].max
    # Cached so queries can emit "SET LOCAL ivfflat.probes TO ..." (probes_sql).
    Discourse.cache.write("#{table_name}-#{id}-#{@strategy.id}-probes", probes)

    existing_index = DB.query_single(<<~SQL, index_name: index_name).first
      SELECT
        indexdef
      FROM
        pg_indexes
      WHERE
        indexname = :index_name
        AND schemaname = 'public'
      LIMIT 1
    SQL

    if !existing_index.present?
      Rails.logger.info("Index #{index_name} does not exist, creating...")
      # NOTE(review): `return` exits the whole method here, so when the first
      # table needs a fresh index the second table is never considered in
      # this run — confirm the early exit is intentional.
      return create_index!(table_name, memory, lists, probes)
    end

    # create_index! stores the build time (epoch seconds) as the index
    # comment; read it back as the index "age" reference point.
    # (`to_i` never returns nil, so the `|| 0` fallback is inert.)
    existing_index_age =
      DB
        .query_single(
          "SELECT pg_catalog.obj_description((:index_name)::regclass, 'pg_class');",
          index_name: index_name,
        )
        .first
        .to_i || 0
    new_rows =
      DB.query_single(
        "SELECT count(*) FROM #{table_name} WHERE model_id = #{id} AND strategy_id = #{@strategy.id} AND created_at > '#{Time.at(existing_index_age)}';",
      ).first
    existing_lists = existing_index.match(/lists='(\d+)'/)&.captures&.first&.to_i

    # Rebuild only when the index is older than the freshness window (1 hour
    # when related topics are enabled, 1 day otherwise) AND either enough new
    # rows accumulated or the ideal `lists` value drifted.
    if existing_index_age > 0 &&
         existing_index_age <
           (
             if SiteSetting.ai_embeddings_semantic_related_topics_enabled
               1.hour.ago.to_i
             else
               1.day.ago.to_i
             end
           )
      if new_rows > 10_000
        Rails.logger.info(
          "Index #{index_name} is #{existing_index_age} seconds old, and there are #{new_rows} new rows, updating...",
        )
        return create_index!(table_name, memory, lists, probes)
      elsif existing_lists != lists
        Rails.logger.info(
          "Index #{index_name} already exists, but lists is #{existing_lists} instead of #{lists}, updating...",
        )
        return create_index!(table_name, memory, lists, probes)
      end
    end

    Rails.logger.info(
      "Index #{index_name} kept. #{Time.now.to_i - existing_index_age} seconds old, #{new_rows} new rows, #{existing_lists} lists, #{probes} probes.",
    )
  end
end
|
||||
|
||||
# Drops and rebuilds the ivfflat index for +table_name+, raising the Postgres
# session memory budget to +memory+ for the build. When PG reports that more
# memory is required, the reported setting is bumped and the build retried up
# to 3 times. The build time (epoch seconds) is stored as the index comment,
# which consider_indexing reads back as the index age.
#
# @param table_name [String] embeddings table to index
# @param memory [String] e.g. "100MB"; applied to work_mem and maintenance_work_mem
# @param lists [Integer] ivfflat `lists` parameter
# @param probes [Integer] accepted for interface parity; not used here
#   (probes are applied at query time via probes_sql)
def create_index!(table_name, memory, lists, probes)
  tries = 0
  index_name = index_name(table_name)
  DB.exec("SET work_mem TO '#{memory}';")
  DB.exec("SET maintenance_work_mem TO '#{memory}';")
  begin
    DB.exec(<<~SQL)
      DROP INDEX IF EXISTS #{index_name};
      CREATE INDEX IF NOT EXISTS
        #{index_name}
      ON
        #{table_name}
      USING
        ivfflat ((embeddings::halfvec(#{dimensions})) #{pg_index_type})
      WITH
        (lists = #{lists})
      WHERE
        model_id = #{id} AND strategy_id = #{@strategy.id};
    SQL
  rescue PG::ProgramLimitExceeded => e
    # Message looks like: "memory required is 123 MB, maintenance_work_mem".
    parsed_error = e.message.match(/memory required is (\d+ [A-Z]{2}), ([a-z_]+)/)
    # Guard the nil-match case: indexing a nil MatchData would raise
    # NoMethodError and mask the underlying PG error.
    if parsed_error && parsed_error[1].present? && parsed_error[2].present?
      DB.exec("SET #{parsed_error[2]} TO '#{parsed_error[1].tr(" ", "")}';")
      tries += 1
      retry if tries < 3
      # After exhausting retries, surface the error instead of falling
      # through and commenting on a possibly-missing index.
      raise
    else
      raise
    end
  end

  DB.exec("COMMENT ON INDEX #{index_name} IS '#{Time.now.to_i}';")
ensure
  # Always restore session memory settings, even when index creation fails.
  DB.exec("RESET work_mem;")
  DB.exec("RESET maintenance_work_mem;")
end
|
||||
|
||||
# Computes the embedding vector for +text+. Abstract: subclasses implement.
#
# @param text [String] text to embed
# @param asymetric [Boolean] asymmetric (query-vs-document) embedding flag.
#   NOTE(review): keyword is misspelled ("asymetric"), but it is part of the
#   public interface — renaming it would break existing callers.
# @raise [NotImplementedError] always, in this base class
def vector_from(text, asymetric: false)
  raise NotImplementedError
end
|
||||
|
@ -224,14 +117,23 @@ module DiscourseAi
|
|||
|
||||
def asymmetric_topics_similarity_search(raw_vector, limit:, offset:, return_distance: false)
|
||||
results = DB.query(<<~SQL, query_embedding: raw_vector, limit: limit, offset: offset)
|
||||
#{probes_sql(topic_table_name)}
|
||||
WITH candidates AS (
|
||||
SELECT
|
||||
topic_id,
|
||||
embeddings::halfvec(#{dimensions}) AS embeddings
|
||||
FROM
|
||||
#{topic_table_name}
|
||||
WHERE
|
||||
model_id = #{id} AND strategy_id = #{@strategy.id}
|
||||
ORDER BY
|
||||
binary_quantize(embeddings)::bit(#{dimensions}) <~> binary_quantize('[:query_embedding]'::halfvec(#{dimensions}))
|
||||
LIMIT :limit * 2
|
||||
)
|
||||
SELECT
|
||||
topic_id,
|
||||
embeddings::halfvec(#{dimensions}) #{pg_function} '[:query_embedding]'::halfvec(#{dimensions}) AS distance
|
||||
FROM
|
||||
#{topic_table_name}
|
||||
WHERE
|
||||
model_id = #{id} AND strategy_id = #{@strategy.id}
|
||||
candidates
|
||||
ORDER BY
|
||||
embeddings::halfvec(#{dimensions}) #{pg_function} '[:query_embedding]'::halfvec(#{dimensions})
|
||||
LIMIT :limit
|
||||
|
@ -250,18 +152,23 @@ module DiscourseAi
|
|||
|
||||
def asymmetric_posts_similarity_search(raw_vector, limit:, offset:, return_distance: false)
|
||||
results = DB.query(<<~SQL, query_embedding: raw_vector, limit: limit, offset: offset)
|
||||
#{probes_sql(post_table_name)}
|
||||
WITH candidates AS (
|
||||
SELECT
|
||||
post_id,
|
||||
embeddings::halfvec(#{dimensions}) AS embeddings
|
||||
FROM
|
||||
#{post_table_name}
|
||||
WHERE
|
||||
model_id = #{id} AND strategy_id = #{@strategy.id}
|
||||
ORDER BY
|
||||
binary_quantize(embeddings)::bit(#{dimensions}) <~> binary_quantize('[:query_embedding]'::halfvec(#{dimensions}))
|
||||
LIMIT :limit * 2
|
||||
)
|
||||
SELECT
|
||||
post_id,
|
||||
embeddings::halfvec(#{dimensions}) #{pg_function} '[:query_embedding]'::halfvec(#{dimensions}) AS distance
|
||||
FROM
|
||||
#{post_table_name}
|
||||
INNER JOIN
|
||||
posts AS p ON p.id = post_id
|
||||
INNER JOIN
|
||||
topics AS t ON t.id = p.topic_id AND t.archetype = 'regular'
|
||||
WHERE
|
||||
model_id = #{id} AND strategy_id = #{@strategy.id}
|
||||
candidates
|
||||
ORDER BY
|
||||
embeddings::halfvec(#{dimensions}) #{pg_function} '[:query_embedding]'::halfvec(#{dimensions})
|
||||
LIMIT :limit
|
||||
|
@ -286,32 +193,41 @@ module DiscourseAi
|
|||
offset:,
|
||||
return_distance: false
|
||||
)
|
||||
# A too low limit exacerbates the recall loss of binary quantization
|
||||
binary_search_limit = [limit * 2, 100].max
|
||||
results =
|
||||
DB.query(
|
||||
<<~SQL,
|
||||
#{probes_sql(post_table_name)}
|
||||
SELECT
|
||||
rag_document_fragment_id,
|
||||
embeddings::halfvec(#{dimensions}) #{pg_function} '[:query_embedding]'::halfvec(#{dimensions}) AS distance
|
||||
FROM
|
||||
#{rag_fragments_table_name}
|
||||
INNER JOIN
|
||||
rag_document_fragments AS rdf ON rdf.id = rag_document_fragment_id
|
||||
WHERE
|
||||
model_id = #{id} AND
|
||||
strategy_id = #{@strategy.id} AND
|
||||
rdf.target_id = :target_id AND
|
||||
rdf.target_type = :target_type
|
||||
ORDER BY
|
||||
embeddings::halfvec(#{dimensions}) #{pg_function} '[:query_embedding]'::halfvec(#{dimensions})
|
||||
LIMIT :limit
|
||||
OFFSET :offset
|
||||
SQL
|
||||
WITH candidates AS (
|
||||
SELECT
|
||||
rag_document_fragment_id,
|
||||
embeddings::halfvec(#{dimensions}) AS embeddings
|
||||
FROM
|
||||
#{rag_fragments_table_name}
|
||||
INNER JOIN
|
||||
rag_document_fragments ON rag_document_fragments.id = rag_document_fragment_id
|
||||
WHERE
|
||||
model_id = #{id} AND strategy_id = #{@strategy.id}
|
||||
ORDER BY
|
||||
binary_quantize(embeddings)::bit(#{dimensions}) <~> binary_quantize('[:query_embedding]'::halfvec(#{dimensions}))
|
||||
LIMIT :binary_search_limit
|
||||
)
|
||||
SELECT
|
||||
rag_document_fragment_id,
|
||||
embeddings::halfvec(#{dimensions}) #{pg_function} '[:query_embedding]'::halfvec(#{dimensions}) AS distance
|
||||
FROM
|
||||
candidates
|
||||
ORDER BY
|
||||
embeddings::halfvec(#{dimensions}) #{pg_function} '[:query_embedding]'::halfvec(#{dimensions})
|
||||
LIMIT :limit
|
||||
OFFSET :offset
|
||||
SQL
|
||||
query_embedding: raw_vector,
|
||||
target_id: target_id,
|
||||
target_type: target_type,
|
||||
limit: limit,
|
||||
offset: offset,
|
||||
binary_search_limit: binary_search_limit,
|
||||
)
|
||||
|
||||
if return_distance
|
||||
|
@ -326,17 +242,8 @@ module DiscourseAi
|
|||
|
||||
def symmetric_topics_similarity_search(topic)
|
||||
DB.query(<<~SQL, topic_id: topic.id).map(&:topic_id)
|
||||
#{probes_sql(topic_table_name)}
|
||||
SELECT
|
||||
topic_id
|
||||
FROM
|
||||
#{topic_table_name}
|
||||
WHERE
|
||||
model_id = #{id} AND
|
||||
strategy_id = #{@strategy.id}
|
||||
ORDER BY
|
||||
embeddings::halfvec(#{dimensions}) #{pg_function} (
|
||||
SELECT
|
||||
WITH le_target AS (
|
||||
SELECT
|
||||
embeddings
|
||||
FROM
|
||||
#{topic_table_name}
|
||||
|
@ -345,8 +252,34 @@ module DiscourseAi
|
|||
strategy_id = #{@strategy.id} AND
|
||||
topic_id = :topic_id
|
||||
LIMIT 1
|
||||
)::halfvec(#{dimensions})
|
||||
LIMIT 100
|
||||
)
|
||||
SELECT topic_id FROM (
|
||||
SELECT
|
||||
topic_id, embeddings
|
||||
FROM
|
||||
#{topic_table_name}
|
||||
WHERE
|
||||
model_id = #{id} AND
|
||||
strategy_id = #{@strategy.id}
|
||||
ORDER BY
|
||||
binary_quantize(embeddings)::bit(#{dimensions}) <~> (
|
||||
SELECT
|
||||
binary_quantize(embeddings)::bit(#{dimensions})
|
||||
FROM
|
||||
le_target
|
||||
LIMIT 1
|
||||
)
|
||||
LIMIT 200
|
||||
) AS widenet
|
||||
ORDER BY
|
||||
embeddings::halfvec(#{dimensions}) #{pg_function} (
|
||||
SELECT
|
||||
embeddings::halfvec(#{dimensions})
|
||||
FROM
|
||||
le_target
|
||||
LIMIT 1
|
||||
)
|
||||
LIMIT 100;
|
||||
SQL
|
||||
rescue PG::Error => e
|
||||
Rails.logger.error(
|
||||
|
@ -384,11 +317,6 @@ module DiscourseAi
|
|||
"#{table_name}_#{id}_#{@strategy.id}_search"
|
||||
end
|
||||
|
||||
# Returns a "SET LOCAL ivfflat.probes TO ..." statement for the given table,
# using the probes value cached by consider_indexing, or an empty string when
# nothing has been cached yet.
def probes_sql(table_name)
  cache_key = "#{table_name}-#{id}-#{@strategy.id}-probes"
  cached_probes = Discourse.cache.read(cache_key)
  return "" if !cached_probes.present?

  "SET LOCAL ivfflat.probes TO #{cached_probes};"
end
|
||||
|
||||
# Identifier of this vector representation. Abstract: subclasses implement.
#
# @raise [NotImplementedError] always, in this base class
def name
  raise NotImplementedError
end
|
||||
|
|
|
@ -44,11 +44,3 @@ task "ai:embeddings:backfill", %i[model concurrency] => [:environment] do |_, ar
|
|||
end
|
||||
end
|
||||
end
|
||||
|
||||
desc "Creates indexes for embeddings"
task "ai:embeddings:index", [:work_mem] => [:environment] do |_, args|
  # Build/refresh the ANN indexes for the currently configured embeddings
  # representation using the truncation strategy.
  strategy = DiscourseAi::Embeddings::Strategies::Truncation.new
  vector_rep = DiscourseAi::Embeddings::VectorRepresentations::Base.current_representation(strategy)

  # Optional work_mem argument (e.g. rake "ai:embeddings:index[1GB]") raises
  # the Postgres memory budget for the index build; defaults to 100MB.
  vector_rep.consider_indexing(memory: args[:work_mem] || "100MB")
end
|
||||
|
|
Loading…
Reference in New Issue