PERF: Use .find_each instead of .each to avoid memory allocation peaks

Also fix the embeddings rake task for the new DB structure.
This commit is contained in:
Rafael dos Santos Silva 2023-07-13 18:59:25 -03:00 committed by GitHub
parent 5f0c617880
commit 703762a7a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 31 additions and 37 deletions

View File

@ -52,7 +52,7 @@ module DiscourseAi
end end
t << "\n\n" t << "\n\n"
topic.posts.each do |post| topic.posts.find_each do |post|
t << post.raw t << post.raw
break if @tokenizer.size(t) >= @max_length break if @tokenizer.size(t) >= @max_length
t << "\n\n" t << "\n\n"

View File

@ -1,33 +1,21 @@
# frozen_string_literal: true # frozen_string_literal: true
desc "Creates tables to store embeddings"
task "ai:embeddings:create_table" => [:environment] do
DiscourseAi::Database::Connection.db.exec(<<~SQL)
CREATE EXTENSION IF NOT EXISTS vector;
SQL
DiscourseAi::Embeddings::Model.enabled_models.each do |model|
DiscourseAi::Database::Connection.db.exec(<<~SQL)
CREATE TABLE IF NOT EXISTS topic_embeddings_#{model.name.underscore} (
topic_id bigint PRIMARY KEY,
embedding vector(#{model.dimensions})
);
SQL
end
end
desc "Backfill embeddings for all topics" desc "Backfill embeddings for all topics"
task "ai:embeddings:backfill", [:start_topic] => [:environment] do |_, args| task "ai:embeddings:backfill", [:start_topic] => [:environment] do |_, args|
public_categories = Category.where(read_restricted: false).pluck(:id) public_categories = Category.where(read_restricted: false).pluck(:id)
topic_embeddings = DiscourseAi::Embeddings::Topic.new manager = DiscourseAi::Embeddings::Manager.new(Topic.first)
Topic Topic
.where("id >= ?", args[:start_topic] || 0) .joins(
"LEFT JOIN #{manager.topic_embeddings_table} ON #{manager.topic_embeddings_table}.topic_id = topics.id",
)
.where("#{manager.topic_embeddings_table}.topic_id IS NULL")
.where("topics.id >= ?", args[:start_topic].to_i || 0)
.where("category_id IN (?)", public_categories) .where("category_id IN (?)", public_categories)
.where(deleted_at: nil) .where(deleted_at: nil)
.order(id: :asc) .order("topics.id ASC")
.find_each do |t| .find_each do |t|
print "." print "."
topic_embeddings.generate_and_store_embeddings_for(t) DiscourseAi::Embeddings::Manager.new(t).generate!
end end
end end
@ -35,24 +23,30 @@ desc "Creates indexes for embeddings"
task "ai:embeddings:index", [:work_mem] => [:environment] do |_, args| task "ai:embeddings:index", [:work_mem] => [:environment] do |_, args|
# Using extension maintainer's recommendation for ivfflat indexes # Using extension maintainer's recommendation for ivfflat indexes
# Results are not as good as without indexes, but it's much faster # Results are not as good as without indexes, but it's much faster
# Disk usage is ~1x the size of the table, so this double table total size # Disk usage is ~1x the size of the table, so this doubles table total size
count = Topic.count count = Topic.count
lists = count < 1_000_000 ? count / 1000 : Math.sqrt(count).to_i lists = count < 1_000_000 ? count / 1000 : Math.sqrt(count).to_i
probes = count < 1_000_000 ? lists / 10 : Math.sqrt(lists).to_i probes = count < 1_000_000 ? lists / 10 : Math.sqrt(lists).to_i
DiscourseAi::Database::Connection.db.exec("SET work_mem TO '#{args[:work_mem] || "1GB"}';") manager = DiscourseAi::Embeddings::Manager.new(Topic.first)
DiscourseAi::Embeddings::Model.enabled_models.each do |model| table = manager.topic_embeddings_table
DiscourseAi::Database::Connection.db.exec(<<~SQL) index = "#{table}_search"
CREATE INDEX IF NOT EXISTS
topic_embeddings_#{model.name.underscore}_search DB.exec("SET work_mem TO '#{args[:work_mem] || "1GB"}';")
ON DB.exec(<<~SQL)
topic_embeddings_#{model.name.underscore} DROP INDEX IF EXISTS #{index};
USING CREATE INDEX IF NOT EXISTS
ivfflat (embedding #{model.pg_index}) #{index}
WITH ON
(lists = #{lists}); #{table}
SQL USING
end ivfflat (embeddings #{manager.model.pg_index_type})
DiscourseAi::Database::Connection.db.exec("RESET work_mem;") WITH
DiscourseAi::Database::Connection.db.exec("SET ivfflat.probes = #{probes};") (lists = #{lists})
WHERE
model_version = #{manager.model.version} AND
strategy_version = #{manager.strategy.version};
SQL
DB.exec("RESET work_mem;")
DB.exec("SET ivfflat.probes = #{probes};")
end end