DEV: Move to single table per embeddings type (#561)

Also move us to halfvecs for speed and disk usage gains
This commit is contained in:
Rafael dos Santos Silva 2024-08-08 11:55:20 -03:00 committed by GitHub
parent 20efc9285e
commit 1686a8a683
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 276 additions and 38 deletions

View File

@ -0,0 +1,17 @@
# frozen_string_literal: true
# Ensures the installed pgvector extension is at least 0.7.0.
class UpgradePgvector070 < ActiveRecord::Migration[7.0]
  # Reads the installed version of the +vector+ extension from +pg_extension+
  # and upgrades it in place only when it is older than the required version.
  def up
    required = Gem::Version.new("0.7.0")
    current =
      Gem::Version.new(
        DB.query_single("SELECT extversion FROM pg_extension WHERE extname = 'vector';").first,
      )

    # Already on (or past) the required version: nothing to do.
    return if current >= required

    DB.exec("ALTER EXTENSION vector UPDATE TO '0.7.0';")
  end

  # Rolling back is explicitly unsupported.
  def down
    raise ActiveRecord::IrreversibleMigration
  end
end

View File

@ -0,0 +1,158 @@
# frozen_string_literal: true
# Consolidates the per-model embeddings tables (named
# +<table>_<model_id>_<strategy_id>+) into a single table per embedding target,
# with the vectors stored in a +halfvec+ column.
#
# For topics, posts and RAG document fragments the migration:
# 1. creates the consolidated table with explicit +model_id+ / +strategy_id+
#    columns and a unique index on (model_id, strategy_id, <target id>);
# 2. copies every row from the legacy tables for model ids 1..8, strategy 1;
# 3. asks the current vector representation to consider building its index.
class MoveEmbeddingsToSingleTablePerType < ActiveRecord::Migration[7.0]
  # One entry per consolidated table: the table name, the column referencing the
  # embedded record, and the name of the unique index created on it.
  EMBEDDING_TABLES = [
    {
      table: :ai_topic_embeddings,
      target_column: :topic_id,
      index_name: "index_ai_topic_embeddings_on_model_strategy_topic",
    },
    {
      table: :ai_post_embeddings,
      target_column: :post_id,
      index_name: "index_ai_post_embeddings_on_model_strategy_post",
    },
    {
      table: :ai_document_fragment_embeddings,
      target_column: :rag_document_fragment_id,
      index_name: "index_ai_fragment_embeddings_on_model_strategy_fragment",
    },
  ].freeze

  # Model ids that have a legacy table to copy from.
  LEGACY_MODEL_IDS = (1..8)
  # The only strategy id used by the legacy tables.
  LEGACY_STRATEGY_ID = 1

  def up
    # Create all consolidated tables first, then copy the data in the same
    # order as before: topics, posts, document fragments; model 1 through 8.
    EMBEDDING_TABLES.each { |config| create_embeddings_table(**config) }

    EMBEDDING_TABLES.each do |config|
      LEGACY_MODEL_IDS.each { |model_id| copy_legacy_embeddings(config, model_id) }
    end

    consider_indexing_embeddings
  end

  # Rolling back is not supported. Without an explicit +down+, a rollback would
  # be a silent no-op that leaves the new tables (and their copied rows) behind.
  def down
    raise ActiveRecord::IrreversibleMigration
  end

  private

  # Creates one consolidated embeddings table (no surrogate primary key) plus
  # its unique (model_id, strategy_id, target_column) index.
  def create_embeddings_table(table:, target_column:, index_name:)
    create_table table, id: false do |t|
      t.integer target_column, null: false
      t.integer :model_id, null: false
      t.integer :model_version, null: false
      t.integer :strategy_id, null: false
      t.integer :strategy_version, null: false
      t.text :digest, null: false
      t.column :embeddings, "halfvec", null: false
      t.timestamps

      t.index [:model_id, :strategy_id, target_column], unique: true, name: index_name
    end
  end

  # Copies all rows of the legacy table for +model_id+ into the consolidated
  # table described by +config+, stamping each row with the model and strategy
  # ids that used to be encoded in the legacy table's name.
  def copy_legacy_embeddings(config, model_id)
    table = config[:table]
    target_column = config[:target_column]

    execute <<~SQL
      INSERT INTO #{table} (#{target_column}, model_id, model_version, strategy_id, strategy_version, digest, embeddings, created_at, updated_at)
      SELECT #{target_column}, #{model_id}, model_version, #{LEGACY_STRATEGY_ID}, strategy_version, digest, embeddings, created_at, updated_at
      FROM #{table}_#{model_id}_#{LEGACY_STRATEGY_ID};
    SQL
  end

  # Best effort: asks the currently configured vector representation to
  # consider indexing. Any failure is logged and swallowed so that it does not
  # abort the data move.
  def consider_indexing_embeddings
    strategy = DiscourseAi::Embeddings::Strategies::Truncation.new
    vector_rep =
      DiscourseAi::Embeddings::VectorRepresentations::Base.current_representation(strategy)
    vector_rep.consider_indexing
  rescue StandardError => e
    Rails.logger.error("Failed to index embeddings: #{e}")
  end
end

View File

@ -0,0 +1,30 @@
# frozen_string_literal: true
# Drops the legacy per-model embeddings tables, named
# +<prefix>_<model_id>_<strategy_id>+, for model ids 1..8 and strategy 1.
class DropOldEmbeddingsTables < ActiveRecord::Migration[7.0]
  # Name prefixes of the legacy tables, in the order they are dropped.
  LEGACY_TABLE_PREFIXES = %i[
    ai_topic_embeddings
    ai_post_embeddings
    ai_document_fragment_embeddings
  ].freeze

  # Model ids that have a legacy table.
  LEGACY_MODEL_IDS = (1..8)
  # The only strategy id used by the legacy tables.
  LEGACY_STRATEGY_ID = 1

  def up
    LEGACY_TABLE_PREFIXES.each do |prefix|
      LEGACY_MODEL_IDS.each do |model_id|
        drop_table :"#{prefix}_#{model_id}_#{LEGACY_STRATEGY_ID}"
      end
    end
  end

  # The dropped tables and their rows cannot be restored, so fail loudly instead
  # of letting a rollback pass as a silent no-op.
  def down
    raise ActiveRecord::IrreversibleMigration
  end
end

View File

@ -53,7 +53,7 @@ module DiscourseAi
end
# Name of the pgvector operator class for half-precision (halfvec) vectors
# compared by inner product. Presumably interpolated into the index
# definition by the base representation; confirm against the caller.
def pg_index_type
  "halfvec_ip_ops"
end
def tokenizer

View File

@ -52,10 +52,13 @@ module DiscourseAi
# Using extension maintainer's recommendation for ivfflat indexes
# Results are not as good as without indexes, but it's much faster
# Disk usage is ~1x the size of the table, so this doubles table total size
count = DB.query_single("SELECT count(*) FROM #{table_name};").first
count =
DB.query_single(
"SELECT count(*) FROM #{table_name} WHERE model_id = #{id} AND strategy_id = #{@strategy.id};",
).first
lists = [count < 1_000_000 ? count / 1000 : Math.sqrt(count).to_i, 10].max
probes = [count < 1_000_000 ? lists / 10 : Math.sqrt(lists).to_i, 1].max
Discourse.cache.write("#{table_name}-probes", probes)
Discourse.cache.write("#{table_name}-#{id}-#{@strategy.id}-probes", probes)
existing_index = DB.query_single(<<~SQL, index_name: index_name).first
SELECT
@ -83,7 +86,7 @@ module DiscourseAi
.to_i || 0
new_rows =
DB.query_single(
"SELECT count(*) FROM #{table_name} WHERE created_at > '#{Time.at(existing_index_age)}';",
"SELECT count(*) FROM #{table_name} WHERE model_id = #{id} AND strategy_id = #{@strategy.id} AND created_at > '#{Time.at(existing_index_age)}';",
).first
existing_lists = existing_index.match(/lists='(\d+)'/)&.captures&.first&.to_i
@ -128,9 +131,11 @@ module DiscourseAi
ON
#{table_name}
USING
ivfflat (embeddings #{pg_index_type})
ivfflat ((embeddings::halfvec(#{dimensions})) #{pg_index_type})
WITH
(lists = #{lists});
(lists = #{lists})
WHERE
model_id = #{id} AND strategy_id = #{@strategy.id};
SQL
rescue PG::ProgramLimitExceeded => e
parsed_error = e.message.match(/memory required is (\d+ [A-Z]{2}), ([a-z_]+)/)
@ -175,6 +180,8 @@ module DiscourseAi
FROM
#{table_name(target)}
WHERE
model_id = #{id} AND
strategy_id = #{@strategy.id} AND
#{target_column} = :target_id
LIMIT 1
SQL
@ -191,8 +198,11 @@ module DiscourseAi
topic_id
FROM
#{topic_table_name}
WHERE
model_id = #{id} AND
strategy_id = #{@strategy.id}
ORDER BY
embeddings #{pg_function} '[:query_embedding]'
embeddings::halfvec(#{dimensions}) #{pg_function} '[:query_embedding]'::halfvec(#{dimensions})
LIMIT 1
SQL
end
@ -203,8 +213,11 @@ module DiscourseAi
post_id
FROM
#{post_table_name}
WHERE
model_id = #{id} AND
strategy_id = #{@strategy.id}
ORDER BY
embeddings #{pg_function} '[:query_embedding]'
embeddings::halfvec(#{dimensions}) #{pg_function} '[:query_embedding]'::halfvec(#{dimensions})
LIMIT 1
SQL
end
@ -214,11 +227,13 @@ module DiscourseAi
#{probes_sql(topic_table_name)}
SELECT
topic_id,
embeddings #{pg_function} '[:query_embedding]' AS distance
embeddings::halfvec(#{dimensions}) #{pg_function} '[:query_embedding]'::halfvec(#{dimensions}) AS distance
FROM
#{topic_table_name}
WHERE
model_id = #{id} AND strategy_id = #{@strategy.id}
ORDER BY
embeddings #{pg_function} '[:query_embedding]'
embeddings::halfvec(#{dimensions}) #{pg_function} '[:query_embedding]'::halfvec(#{dimensions})
LIMIT :limit
OFFSET :offset
SQL
@ -238,15 +253,17 @@ module DiscourseAi
#{probes_sql(post_table_name)}
SELECT
post_id,
embeddings #{pg_function} '[:query_embedding]' AS distance
embeddings::halfvec(#{dimensions}) #{pg_function} '[:query_embedding]'::halfvec(#{dimensions}) AS distance
FROM
#{post_table_name}
INNER JOIN
posts AS p ON p.id = post_id
INNER JOIN
topics AS t ON t.id = p.topic_id AND t.archetype = 'regular'
WHERE
model_id = #{id} AND strategy_id = #{@strategy.id}
ORDER BY
embeddings #{pg_function} '[:query_embedding]'
embeddings::halfvec(#{dimensions}) #{pg_function} '[:query_embedding]'::halfvec(#{dimensions})
LIMIT :limit
OFFSET :offset
SQL
@ -274,15 +291,17 @@ module DiscourseAi
#{probes_sql(post_table_name)}
SELECT
rag_document_fragment_id,
embeddings #{pg_function} '[:query_embedding]' AS distance
embeddings::halfvec(#{dimensions}) #{pg_function} '[:query_embedding]'::halfvec(#{dimensions}) AS distance
FROM
#{rag_fragments_table_name}
INNER JOIN
rag_document_fragments AS rdf ON rdf.id = rag_document_fragment_id
WHERE
model_id = #{id} AND
strategy_id = #{@strategy.id} AND
rdf.ai_persona_id = :persona_id
ORDER BY
embeddings #{pg_function} '[:query_embedding]'
embeddings::halfvec(#{dimensions}) #{pg_function} '[:query_embedding]'::halfvec(#{dimensions})
LIMIT :limit
OFFSET :offset
SQL
@ -309,16 +328,21 @@ module DiscourseAi
topic_id
FROM
#{topic_table_name}
WHERE
model_id = #{id} AND
strategy_id = #{@strategy.id}
ORDER BY
embeddings #{pg_function} (
embeddings::halfvec(#{dimensions}) #{pg_function} (
SELECT
embeddings
FROM
#{topic_table_name}
WHERE
model_id = #{id} AND
strategy_id = #{@strategy.id} AND
topic_id = :topic_id
LIMIT 1
)
)::halfvec(#{dimensions})
LIMIT 100
SQL
rescue PG::Error => e
@ -329,15 +353,15 @@ module DiscourseAi
end
# Name of the single table holding topic embeddings. Rows for different
# models and strategies share this table; queries filter on model_id and
# strategy_id.
def topic_table_name
  "ai_topic_embeddings"
end
# Name of the single table holding post embeddings. Rows for different
# models and strategies share this table; queries filter on model_id and
# strategy_id.
def post_table_name
  "ai_post_embeddings"
end
# Name of the single table holding RAG document fragment embeddings. Rows for
# different models and strategies share this table; queries filter on
# model_id and strategy_id.
def rag_fragments_table_name
  "ai_document_fragment_embeddings"
end
def table_name(target)
@ -354,11 +378,11 @@ module DiscourseAi
end
# Builds the name of the search index on +table_name+ for this model (+id+)
# and strategy (+@strategy.id+). Because the embeddings tables are shared, the
# name carries both ids so each model/strategy pair gets its own index.
def index_name(table_name)
  "#{table_name}_#{id}_#{@strategy.id}_search"
end
# Returns the SQL statement that sets +ivfflat.probes+ for the current
# transaction, or an empty string when no probes value is cached.
#
# The value is read from the cache under a key made of +table_name+, this
# model's +id+ and +@strategy.id+.
def probes_sql(table_name)
  probes = Discourse.cache.read("#{table_name}-#{id}-#{@strategy.id}-probes")
  probes.present? ? "SET LOCAL ivfflat.probes TO #{probes};" : ""
end
@ -400,9 +424,9 @@ module DiscourseAi
if target.is_a?(Topic)
DB.exec(
<<~SQL,
INSERT INTO #{topic_table_name} (topic_id, model_version, strategy_version, digest, embeddings, created_at, updated_at)
VALUES (:topic_id, :model_version, :strategy_version, :digest, '[:embeddings]', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT (topic_id)
INSERT INTO #{topic_table_name} (topic_id, model_id, model_version, strategy_id, strategy_version, digest, embeddings, created_at, updated_at)
VALUES (:topic_id, :model_id, :model_version, :strategy_id, :strategy_version, :digest, '[:embeddings]', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT (strategy_id, model_id, topic_id)
DO UPDATE SET
model_version = :model_version,
strategy_version = :strategy_version,
@ -411,7 +435,9 @@ module DiscourseAi
updated_at = CURRENT_TIMESTAMP
SQL
topic_id: target.id,
model_id: id,
model_version: version,
strategy_id: @strategy.id,
strategy_version: @strategy.version,
digest: digest,
embeddings: vector,
@ -419,9 +445,9 @@ module DiscourseAi
elsif target.is_a?(Post)
DB.exec(
<<~SQL,
INSERT INTO #{post_table_name} (post_id, model_version, strategy_version, digest, embeddings, created_at, updated_at)
VALUES (:post_id, :model_version, :strategy_version, :digest, '[:embeddings]', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT (post_id)
INSERT INTO #{post_table_name} (post_id, model_id, model_version, strategy_id, strategy_version, digest, embeddings, created_at, updated_at)
VALUES (:post_id, :model_id, :model_version, :strategy_id, :strategy_version, :digest, '[:embeddings]', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT (model_id, strategy_id, post_id)
DO UPDATE SET
model_version = :model_version,
strategy_version = :strategy_version,
@ -430,7 +456,9 @@ module DiscourseAi
updated_at = CURRENT_TIMESTAMP
SQL
post_id: target.id,
model_id: id,
model_version: version,
strategy_id: @strategy.id,
strategy_version: @strategy.version,
digest: digest,
embeddings: vector,
@ -438,9 +466,9 @@ module DiscourseAi
elsif target.is_a?(RagDocumentFragment)
DB.exec(
<<~SQL,
INSERT INTO #{rag_fragments_table_name} (rag_document_fragment_id, model_version, strategy_version, digest, embeddings, created_at, updated_at)
VALUES (:fragment_id, :model_version, :strategy_version, :digest, '[:embeddings]', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT (rag_document_fragment_id)
INSERT INTO #{rag_fragments_table_name} (rag_document_fragment_id, model_id, model_version, strategy_id, strategy_version, digest, embeddings, created_at, updated_at)
VALUES (:fragment_id, :model_id, :model_version, :strategy_id, :strategy_version, :digest, '[:embeddings]', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT (model_id, strategy_id, rag_document_fragment_id)
DO UPDATE SET
model_version = :model_version,
strategy_version = :strategy_version,
@ -449,7 +477,9 @@ module DiscourseAi
updated_at = CURRENT_TIMESTAMP
SQL
fragment_id: target.id,
model_id: id,
model_version: version,
strategy_id: @strategy.id,
strategy_version: @strategy.version,
digest: digest,
embeddings: vector,

View File

@ -78,7 +78,7 @@ module DiscourseAi
end
# Name of the pgvector operator class for half-precision (halfvec) vectors
# compared by inner product. Presumably interpolated into the index
# definition by the base representation; confirm against the caller.
def pg_index_type
  "halfvec_ip_ops"
end
def tokenizer

View File

@ -44,7 +44,7 @@ module DiscourseAi
end
# Name of the pgvector operator class for half-precision (halfvec) vectors
# compared by inner product. Presumably interpolated into the index
# definition by the base representation; confirm against the caller.
def pg_index_type
  "halfvec_ip_ops"
end
def tokenizer

View File

@ -39,7 +39,7 @@ module DiscourseAi
end
# Name of the pgvector operator class for half-precision (halfvec) vectors
# compared by cosine distance. Presumably interpolated into the index
# definition by the base representation; confirm against the caller.
def pg_index_type
  "halfvec_cosine_ops"
end
def vector_from(text, asymetric: false)

View File

@ -65,7 +65,7 @@ module DiscourseAi
end
# Name of the pgvector operator class for half-precision (halfvec) vectors
# compared by cosine distance. Presumably interpolated into the index
# definition by the base representation; confirm against the caller.
def pg_index_type
  "halfvec_cosine_ops"
end
def tokenizer

View File

@ -41,7 +41,7 @@ module DiscourseAi
end
# Name of the pgvector operator class for half-precision (halfvec) vectors
# compared by cosine distance. Presumably interpolated into the index
# definition by the base representation; confirm against the caller.
def pg_index_type
  "halfvec_cosine_ops"
end
def vector_from(text, asymetric: false)

View File

@ -39,7 +39,7 @@ module DiscourseAi
end
# Name of the pgvector operator class for half-precision (halfvec) vectors
# compared by cosine distance. Presumably interpolated into the index
# definition by the base representation; confirm against the caller.
def pg_index_type
  "halfvec_cosine_ops"
end
def vector_from(text, asymetric: false)

View File

@ -39,7 +39,7 @@ module DiscourseAi
end
# Name of the pgvector operator class for half-precision (halfvec) vectors
# compared by cosine distance. Presumably interpolated into the index
# definition by the base representation; confirm against the caller.
def pg_index_type
  "halfvec_cosine_ops"
end
def vector_from(text, asymetric: false)

View File

@ -339,7 +339,10 @@ RSpec.describe DiscourseAi::AiBot::Personas::Persona do
fab!(:llm_model) { Fabricate(:fake_model) }
it "will run the question consolidator" do
context_embedding = [0.049382, 0.9999]
strategy = DiscourseAi::Embeddings::Strategies::Truncation.new
vector_rep =
DiscourseAi::Embeddings::VectorRepresentations::Base.current_representation(strategy)
context_embedding = vector_rep.dimensions.times.map { rand(-1.0...1.0) }
EmbeddingsGenerationStubs.discourse_service(
SiteSetting.ai_embeddings_model,
consolidated_question,