DEV: re-implement bulk sentiment classifier (#1449)

The new implementation uses our core concurrent job queue (Scheduler::ThreadPool), which is more
robust and predictable than the one shipped in Concurrent (Concurrent::CachedThreadPool).
A minimal sketch of the underlying queue pattern follows the diff below.

Additionally:

- Trickles through updates during bulk classification
- Reports errors if we fail during a bulk classification

* Push concurrency down to 40; 100 feels quite high.
commit eab6dd3f8e (parent baaa3d199a)
Author: Sam
Date: 2025-06-20 16:06:03 +10:00 (committed by GitHub)


@@ -44,57 +44,69 @@ module DiscourseAi
       Post.from(Arel.sql("(#{unioned_queries}) as posts"))
     end

+    CONCURRENT_CLASSFICATIONS = 40
+
     def bulk_classify!(relation)
-      http_pool_size = 100
       pool =
-        Concurrent::CachedThreadPool.new(
+        Scheduler::ThreadPool.new(
           min_threads: 0,
-          max_threads: http_pool_size,
-          idletime: 30,
+          max_threads: CONCURRENT_CLASSFICATIONS,
+          idle_time: 30,
         )

       available_classifiers = classifiers
       return if available_classifiers.blank?

-      promised_classifications =
-        relation
-          .map do |record|
-            text = prepare_text(record)
-            next if text.blank?
-
-            Concurrent::Promises
-              .fulfilled_future({ target: record, text: text }, pool)
-              .then_on(pool) do |w_text|
-                results = Concurrent::Hash.new
-                already_classified = w_text[:target].sentiment_classifications.map(&:model_used)
-                classifiers_for_target =
-                  available_classifiers.reject do |ac|
-                    already_classified.include?(ac[:model_name])
-                  end
-
-                promised_target_results =
-                  classifiers_for_target.map do |cft|
-                    Concurrent::Promises.future_on(pool) do
-                      results[cft[:model_name]] = request_with(cft[:client], w_text[:text])
-                    end
-                  end
-
-                Concurrent::Promises
-                  .zip(*promised_target_results)
-                  .then_on(pool) { |_| w_text.merge(classification: results) }
-              end
-              .flat(1)
-          end
-          .compact
-
-      Concurrent::Promises
-        .zip(*promised_classifications)
-        .value!
-        .each { |r| store_classification(r[:target], r[:classification]) }
+      results = Queue.new
+      queued = 0
+
+      relation.each do |record|
+        text = prepare_text(record)
+        next if text.blank?
+
+        already_classified = record.sentiment_classifications.pluck(:model_used)
+        missing_classifiers =
+          available_classifiers.reject { |ac| already_classified.include?(ac[:model_name]) }
+
+        missing_classifiers.each do |classifier|
+          pool.post do
+            result = { target: record, classifier: classifier, text: text }
+            begin
+              result[:classification] = request_with(classifier[:client], text)
+            rescue StandardError => e
+              result[:error] = e
+            end
+            results << result
+          end
+          queued += 1
+        end
+      end
+
+      errors = []
+
+      while queued > 0
+        result = results.pop
+        if result[:error]
+          errors << result
+        else
+          store_classification(
+            result[:target],
+            [[result[:classifier][:model_name], result[:classification]]],
+          )
+        end
+        queued -= 1
+      end
+
+      if errors.any?
+        example_posts = errors.map { |e| e[:target].id }.take(5).join(", ")
+        Discourse.warn_exception(
+          errors[0][:error],
+          "Discourse AI: Errors during bulk classification: Failed to classify #{errors.count} posts (example ids: #{example_posts})",
+        )
+      end
     ensure
       pool.shutdown
-      pool.wait_for_termination
+      pool.wait_for_termination(timeout: 30)
     end

     def classify!(target)
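
For reference, below is a minimal, self-contained sketch of the queue-and-counter pattern the new bulk_classify! is built on, using plain Ruby threads to stand in for Scheduler::ThreadPool (the pool size, job names, and work lambdas here are hypothetical, not from the commit). Each worker pushes a result hash onto a shared Queue, with any exception captured inside the hash, and the caller pops exactly as many results as it queued, so completion is detected without promises or futures.

POOL_SIZE = 4 # hypothetical; the commit uses 40 via CONCURRENT_CLASSFICATIONS

jobs = Queue.new
results = Queue.new
queued = 0

# Bounded worker pool: each worker loops until the job queue is closed.
workers =
  Array.new(POOL_SIZE) do
    Thread.new do
      while (job = jobs.pop)
        result = { id: job[:id] }
        begin
          result[:value] = job[:work].call # the request that may raise
        rescue StandardError => e
          result[:error] = e # errors travel inside the result hash
        end
        results << result
      end
    end
  end

# Producer: enqueue one job per unit of work, counting as we go.
10.times do |i|
  jobs << { id: i, work: -> { i.odd? ? raise("boom #{i}") : i * i } }
  queued += 1
end

# Consumer: pop exactly as many results as were queued; each pop is a
# completed job, so output trickles through instead of arriving at once.
errors = []
while queued > 0
  result = results.pop
  result[:error] ? errors << result : puts("job #{result[:id]} => #{result[:value]}")
  queued -= 1
end

jobs.close # a closed, empty queue makes pop return nil, so workers exit
workers.each(&:join)
warn "#{errors.size} of 10 jobs failed" if errors.any?

Because each pop corresponds to one finished job, results can be stored (and progress reported) as they trickle in, while the bounded pool size caps how many classification requests run at once.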