FIX: ensure replies are never double streamed (#879)

The custom field "discourse_ai_bypass_ai_reply" was added so
we can signal the post-created hook to skip replying even
when it would otherwise decide to reply.

Otherwise there are cases where we answer user questions twice,
leading to much confusion.

This also slightly refactors the code, making the controller smaller.
This commit is contained in:
Sam 2024-10-30 20:24:39 +11:00 committed by GitHub
parent be0b78cacd
commit 34a59b623e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 138 additions and 97 deletions

View File

@ -74,33 +74,6 @@ module DiscourseAi
end end
end end
# Class-level helpers providing a shared worker pool so AI replies can be
# streamed off the request thread.
class << self
# Upper bound on concurrently streaming reply threads.
POOL_SIZE = 10
# Lazily builds the shared pool. CachedThreadPool reclaims idle threads
# after 30 seconds, so we hold no threads when there is no streaming load.
def thread_pool
@thread_pool ||=
Concurrent::CachedThreadPool.new(min_threads: 0, max_threads: POOL_SIZE, idletime: 30)
end
# Runs +block+ on the pool, wrapped in a multisite DB connection checked
# out for the site that scheduled it. In test mode the block runs inline
# because cross-thread DB connections are unreliable under rails tests.
def schedule_block(&block)
# think about a better way to handle cross thread connections
if Rails.env.test?
block.call
return
end
# Capture the current site before hopping threads — the worker thread has
# no request context from which to derive it.
db = RailsMultisite::ConnectionManagement.current_db
thread_pool.post do
begin
RailsMultisite::ConnectionManagement.with_connection(db) { block.call }
rescue StandardError => e
# Log-and-continue: a failed stream must not take down the pool thread.
Discourse.warn_exception(e, message: "Discourse AI: Unable to stream reply")
end
end
end
end
CRLF = "\r\n"
def stream_reply def stream_reply
persona = persona =
AiPersona.find_by(name: params[:persona_name]) || AiPersona.find_by(name: params[:persona_name]) ||
@ -155,6 +128,9 @@ module DiscourseAi
topic_id: topic_id, topic_id: topic_id,
raw: params[:query], raw: params[:query],
skip_validations: true, skip_validations: true,
custom_fields: {
DiscourseAi::AiBot::Playground::BYPASS_AI_REPLY_CUSTOM_FIELD => true,
},
) )
else else
post = post =
@ -165,6 +141,9 @@ module DiscourseAi
archetype: Archetype.private_message, archetype: Archetype.private_message,
target_usernames: "#{user.username},#{persona.user.username}", target_usernames: "#{user.username},#{persona.user.username}",
skip_validations: true, skip_validations: true,
custom_fields: {
DiscourseAi::AiBot::Playground::BYPASS_AI_REPLY_CUSTOM_FIELD => true,
},
) )
topic = post.topic topic = post.topic
@ -175,81 +154,19 @@ module DiscourseAi
user = current_user user = current_user
self.class.queue_streamed_reply(io, persona, user, topic, post) DiscourseAi::AiBot::ResponseHttpStreamer.queue_streamed_reply(
io,
persona,
user,
topic,
post,
)
end end
private private
AI_STREAM_CONVERSATION_UNIQUE_ID = "ai-stream-conversation-unique-id" AI_STREAM_CONVERSATION_UNIQUE_ID = "ai-stream-conversation-unique-id"
# keeping this in a static method so we don't capture ENV and other bits
# this allows us to release memory earlier
# Streams an AI persona's reply to +post+ over +io+ using hand-written
# HTTP/1.1 chunked transfer encoding. Runs on the shared thread pool via
# schedule_block, so the web worker is released immediately.
# NOTE(review): io appears to be a hijacked rack socket (it responds to
# #done) — confirm against the caller.
def self.queue_streamed_reply(io, persona, user, topic, post)
schedule_block do
begin
# Raw HTTP response head: we bypass rails rendering entirely, so the
# status line and every header must be written by hand, CRLF-separated.
io.write "HTTP/1.1 200 OK"
io.write CRLF
io.write "Content-Type: text/plain; charset=utf-8"
io.write CRLF
io.write "Transfer-Encoding: chunked"
io.write CRLF
io.write "Cache-Control: no-cache, no-store, must-revalidate"
io.write CRLF
io.write "Connection: close"
io.write CRLF
# Disable nginx proxy buffering so partials reach the client immediately.
io.write "X-Accel-Buffering: no"
io.write CRLF
io.write "X-Content-Type-Options: nosniff"
io.write CRLF
# Blank line terminates the header section.
io.write CRLF
io.flush
persona_class =
DiscourseAi::AiBot::Personas::Persona.find_by(id: persona.id, user: user)
bot = DiscourseAi::AiBot::Bot.as(persona.user, persona: persona_class.new)
# First chunk: metadata envelope so the client knows which topic/bot/persona
# the stream belongs to. "\n\n" acts as a record separator for the client.
data =
{ topic_id: topic.id, bot_user_id: persona.user.id, persona_id: persona.id }.to_json +
"\n\n"
# Chunked encoding frame: hex byte length, CRLF, payload, CRLF.
io.write data.bytesize.to_s(16)
io.write CRLF
io.write data
io.write CRLF
DiscourseAi::AiBot::Playground
.new(bot)
.reply_to(post) do |partial|
next if partial.length == 0
data = { partial: partial }.to_json + "\n\n"
# bytesize must be measured on UTF-8 bytes, not characters.
data.force_encoding("UTF-8")
io.write data.bytesize.to_s(16)
io.write CRLF
io.write data
io.write CRLF
io.flush
end
# Zero-length chunk terminates a chunked response.
io.write "0"
io.write CRLF
io.write CRLF
io.flush
io.done
rescue StandardError => e
# make it a tiny bit easier to debug in dev, this is tricky
# multi-threaded code that exhibits various limitations in rails
p e if Rails.env.development?
Discourse.warn_exception(e, message: "Discourse AI: Unable to stream reply")
ensure
io.close
end
end
end
def stage_user def stage_user
unique_id = params[:user_unique_id].to_s unique_id = params[:user_unique_id].to_s
field = UserCustomField.find_by(name: AI_STREAM_CONVERSATION_UNIQUE_ID, value: unique_id) field = UserCustomField.find_by(name: AI_STREAM_CONVERSATION_UNIQUE_ID, value: unique_id)

View File

@ -3,6 +3,8 @@
module DiscourseAi module DiscourseAi
module AiBot module AiBot
class Playground class Playground
BYPASS_AI_REPLY_CUSTOM_FIELD = "discourse_ai_bypass_ai_reply"
attr_reader :bot attr_reader :bot
# An abstraction to manage the bot and topic interactions. # An abstraction to manage the bot and topic interactions.
@ -550,6 +552,7 @@ module DiscourseAi
return false if bot.bot_user.nil? return false if bot.bot_user.nil?
return false if post.topic.private_message? && post.post_type != Post.types[:regular] return false if post.topic.private_message? && post.post_type != Post.types[:regular]
return false if (SiteSetting.ai_bot_allowed_groups_map & post.user.group_ids).blank? return false if (SiteSetting.ai_bot_allowed_groups_map & post.user.group_ids).blank?
return false if post.custom_fields[BYPASS_AI_REPLY_CUSTOM_FIELD].present?
true true
end end

View File

@ -0,0 +1,105 @@
# frozen_string_literal: true
module DiscourseAi
module AiBot
# Streams AI bot replies to clients over raw HTTP/1.1 chunked transfer
# encoding, on a dedicated thread pool so web workers are freed while the
# LLM generates output. Extracted from the personas controller.
class ResponseHttpStreamer
CRLF = "\r\n"
# Upper bound on concurrently streaming reply threads.
POOL_SIZE = 10
class << self
# Lazily builds the shared pool. CachedThreadPool reclaims idle threads
# after 30 seconds, so no threads are held when there is no load.
def thread_pool
@thread_pool ||=
Concurrent::CachedThreadPool.new(min_threads: 0, max_threads: POOL_SIZE, idletime: 30)
end
# Runs +block+ on the pool with a multisite DB connection checked out for
# the scheduling site. Inline in test mode, where cross-thread DB
# connections are unreliable.
def schedule_block(&block)
# think about a better way to handle cross thread connections
if Rails.env.test?
block.call
return
end
# Capture the current site before hopping threads — the worker thread
# has no request context to derive it from.
db = RailsMultisite::ConnectionManagement.current_db
thread_pool.post do
begin
RailsMultisite::ConnectionManagement.with_connection(db) { block.call }
rescue StandardError => e
# Log-and-continue: a failed stream must not kill the pool thread.
Discourse.warn_exception(e, message: "Discourse AI: Unable to stream reply")
end
end
end
# keeping this in a static method so we don't capture ENV and other bits
# this allows us to release memory earlier
#
# Streams the persona's reply to +post+ over +io+ as a hand-written
# chunked HTTP response. NOTE(review): io appears to be a hijacked rack
# socket (it responds to #done) — confirm against the controller.
def queue_streamed_reply(io, persona, user, topic, post)
schedule_block do
begin
# Raw response head: rails rendering is bypassed, so the status line
# and every header are written by hand, CRLF-separated.
io.write "HTTP/1.1 200 OK"
io.write CRLF
io.write "Content-Type: text/plain; charset=utf-8"
io.write CRLF
io.write "Transfer-Encoding: chunked"
io.write CRLF
io.write "Cache-Control: no-cache, no-store, must-revalidate"
io.write CRLF
io.write "Connection: close"
io.write CRLF
# Disable nginx proxy buffering so partials reach the client live.
io.write "X-Accel-Buffering: no"
io.write CRLF
io.write "X-Content-Type-Options: nosniff"
io.write CRLF
# Blank line ends the header section.
io.write CRLF
io.flush
persona_class =
DiscourseAi::AiBot::Personas::Persona.find_by(id: persona.id, user: user)
bot = DiscourseAi::AiBot::Bot.as(persona.user, persona: persona_class.new)
# First chunk: metadata envelope identifying the topic/bot/persona.
# "\n\n" is the record separator the client splits on.
data =
{
topic_id: topic.id,
bot_user_id: persona.user.id,
persona_id: persona.id,
}.to_json + "\n\n"
# Chunked-encoding frame: hex byte length, CRLF, payload, CRLF.
io.write data.bytesize.to_s(16)
io.write CRLF
io.write data
io.write CRLF
DiscourseAi::AiBot::Playground
.new(bot)
.reply_to(post) do |partial|
next if partial.length == 0
data = { partial: partial }.to_json + "\n\n"
# bytesize must be measured on UTF-8 bytes, not characters.
data.force_encoding("UTF-8")
io.write data.bytesize.to_s(16)
io.write CRLF
io.write data
io.write CRLF
io.flush
end
# Zero-length chunk terminates a chunked response.
io.write "0"
io.write CRLF
io.write CRLF
io.flush
io.done
rescue StandardError => e
# make it a tiny bit easier to debug in dev, this is tricky
# multi-threaded code that exhibits various limitations in rails
p e if Rails.env.development?
Discourse.warn_exception(e, message: "Discourse AI: Unable to stream reply")
ensure
io.close
end
end
end
end
end
end
end

View File

@ -490,13 +490,16 @@ RSpec.describe DiscourseAi::Admin::AiPersonasController do
it "is able to create a new conversation" do it "is able to create a new conversation" do
Jobs.run_immediately! Jobs.run_immediately!
# trust level 0
SiteSetting.ai_bot_allowed_groups = "10"
fake_endpoint.fake_content = ["This is a test! Testing!", "An amazing title"] fake_endpoint.fake_content = ["This is a test! Testing!", "An amazing title"]
ai_persona.create_user! ai_persona.create_user!
ai_persona.update!( ai_persona.update!(
allowed_group_ids: [Group::AUTO_GROUPS[:staff]], allowed_group_ids: [Group::AUTO_GROUPS[:trust_level_0]],
default_llm: "custom:#{llm.id}", default_llm: "custom:#{llm.id}",
allow_personal_messages: true,
) )
io_out, io_in = IO.pipe io_out, io_in = IO.pipe
@ -530,6 +533,7 @@ RSpec.describe DiscourseAi::Admin::AiPersonasController do
expect(topic.topic_allowed_users.count).to eq(2) expect(topic.topic_allowed_users.count).to eq(2)
expect(topic.archetype).to eq(Archetype.private_message) expect(topic.archetype).to eq(Archetype.private_message)
expect(topic.title).to eq("An amazing title") expect(topic.title).to eq("An amazing title")
expect(topic.posts.count).to eq(2)
# now let's try to make a reply with a tool call # now let's try to make a reply with a tool call
function_call = <<~XML function_call = <<~XML
@ -546,6 +550,16 @@ RSpec.describe DiscourseAi::Admin::AiPersonasController do
ai_persona.update!(tools: ["Categories"]) ai_persona.update!(tools: ["Categories"])
# lets also unstage the user and add the user to tl0
# this will ensure there are no feedback loops
new_user = user_post.user
new_user.update!(staged: false)
Group.user_trust_level_change!(new_user.id, new_user.trust_level)
# double check this happened and user is in group
personas = AiPersona.allowed_modalities(user: new_user.reload, allow_personal_messages: true)
expect(personas.count).to eq(1)
io_out, io_in = IO.pipe io_out, io_in = IO.pipe
post "/admin/plugins/discourse-ai/ai-personas/stream-reply.json", post "/admin/plugins/discourse-ai/ai-personas/stream-reply.json",
@ -579,6 +593,8 @@ RSpec.describe DiscourseAi::Admin::AiPersonasController do
expect(user_post.user.custom_fields).to eq( expect(user_post.user.custom_fields).to eq(
{ "ai-stream-conversation-unique-id" => "site:test.com:user_id:1" }, { "ai-stream-conversation-unique-id" => "site:test.com:user_id:1" },
) )
expect(topic.posts.count).to eq(4)
end end
end end
end end