discourse-ai/lib/sentiment/post_classification.rb
Sam eab6dd3f8e
DEV: re-implement bulk sentiment classifier (#1449)
New implementation uses core concurrent job queue, it is more
robust and predictable than the one shipped in Concurrent.

Additionally:

- Trickles through updates during bulk classification
- Reports errors if we fail during a bulk classification

* push concurrency down to 40. 100 feels quite high.
2025-06-20 16:06:03 +10:00

202 lines
6.0 KiB
Ruby

# frozen_string_literal: true
module DiscourseAi
module Sentiment
class PostClassification
def self.backfill_query(from_post_id: nil, max_age_days: nil)
available_classifier_names =
DiscourseAi::Sentiment::SentimentSiteSettingJsonSchema.values.map { _1.model_name }
queries =
available_classifier_names.map do |classifier_name|
base_query =
Post
.includes(:sentiment_classifications)
.joins("INNER JOIN topics ON topics.id = posts.topic_id")
.where(post_type: Post.types[:regular])
.where.not(topics: { archetype: Archetype.private_message })
.where(posts: { deleted_at: nil })
.where(topics: { deleted_at: nil })
.joins(<<~SQL)
LEFT JOIN classification_results crs
ON crs.target_id = posts.id
AND crs.target_type = 'Post'
AND crs.classification_type = 'sentiment'
AND crs.model_used = '#{classifier_name}'
SQL
.where("crs.id IS NULL")
base_query =
base_query.where("posts.id >= ?", from_post_id.to_i) if from_post_id.present?
if max_age_days.present?
base_query =
base_query.where(
"posts.created_at > current_date - INTERVAL '#{max_age_days.to_i} DAY'",
)
end
base_query
end
unioned_queries = queries.map(&:to_sql).join(" UNION ")
Post.from(Arel.sql("(#{unioned_queries}) as posts"))
end
CONCURRENT_CLASSFICATIONS = 40
def bulk_classify!(relation)
pool =
Scheduler::ThreadPool.new(
min_threads: 0,
max_threads: CONCURRENT_CLASSFICATIONS,
idle_time: 30,
)
available_classifiers = classifiers
return if available_classifiers.blank?
results = Queue.new
queued = 0
relation.each do |record|
text = prepare_text(record)
next if text.blank?
already_classified = record.sentiment_classifications.pluck(&:model_used)
missing_classifiers =
available_classifiers.reject { |ac| already_classified.include?(ac[:model_name]) }
missing_classifiers.each do |classifier|
pool.post do
result = { target: record, classifier: classifier, text: text }
begin
result[:classification] = request_with(classifier[:client], text)
rescue StandardError => e
result[:error] = e
end
results << result
end
queued += 1
end
end
errors = []
while queued > 0
result = results.pop
if result[:error]
errors << result
else
store_classification(
result[:target],
[[result[:classifier][:model_name], result[:classification]]],
)
end
queued -= 1
end
if errors.any?
example_posts = errors.map { |e| e[:target].id }.take(5).join(", ")
Discourse.warn_exception(
errors[0][:error],
"Discourse AI: Errors during bulk classification: Failed to classify #{errors.count} posts (example ids: #{example_posts})",
)
end
ensure
pool.shutdown
pool.wait_for_termination(timeout: 30)
end
def classify!(target)
return if target.blank?
available_classifiers = classifiers
return if available_classifiers.blank?
to_classify = prepare_text(target)
return if to_classify.blank?
already_classified = target.sentiment_classifications.map(&:model_used)
classifiers_for_target =
available_classifiers.reject { |ac| already_classified.include?(ac[:model_name]) }
results =
classifiers_for_target.reduce({}) do |memo, cft|
memo[cft[:model_name]] = request_with(cft[:client], to_classify)
memo
end
store_classification(target, results)
end
def classifiers
DiscourseAi::Sentiment::SentimentSiteSettingJsonSchema.values.map do |config|
api_endpoint = config.endpoint
if api_endpoint.present? && api_endpoint.start_with?("srv://")
service = DiscourseAi::Utils::DnsSrv.lookup(api_endpoint.delete_prefix("srv://"))
api_endpoint = "https://#{service.target}:#{service.port}"
end
{
model_name: config.model_name,
client:
DiscourseAi::Inference::HuggingFaceTextEmbeddings.new(api_endpoint, config.api_key),
}
end
end
def has_classifiers?
classifiers.present?
end
private
def prepare_text(target)
content =
if target.post_number == 1
"#{target.topic.title}\n#{target.raw}"
else
target.raw
end
Tokenizer::BertTokenizer.truncate(content, 512)
end
def request_with(client, content)
result = client.classify_by_sentiment!(content)
transform_result(result)
end
def transform_result(result)
hash_result = {}
result.each { |r| hash_result[r[:label]] = r[:score] }
hash_result
end
def store_classification(target, classification)
attrs =
classification.map do |model_name, classifications|
{
model_used: model_name,
target_id: target.id,
target_type: target.class.sti_name,
classification_type: :sentiment,
classification: classifications,
updated_at: DateTime.now,
created_at: DateTime.now,
}
end
ClassificationResult.upsert_all(
attrs,
unique_by: %i[target_id target_type model_used],
update_only: %i[classification],
)
end
end
end
end