discourse-ai/lib/ai_bot/response_http_streamer.rb

133 lines
4.2 KiB
Ruby

# frozen_string_literal: true
module DiscourseAi
  module AiBot
    # Writes an AI bot reply to a raw client IO as a chunked HTTP/1.1 response.
    # The status line and headers are written by hand, so +io+ is expected to be
    # the bare connection rather than a Rack body.
    #
    # Body format: every chunk carries a JSON document followed by "\n\n".
    # The first chunk is {topic_id, bot_user_id, persona_id}. Each following
    # chunk is {partial: "..."} for one non-empty partial yielded by
    # Playground#reply_to. A zero-length chunk terminates the stream.
    class ResponseHttpStreamer
      CRLF = "\r\n"
      POOL_SIZE = 10

      # Status line plus headers, followed by the blank line that starts the
      # body. This is identical for every response, so it is built once.
      RESPONSE_HEAD = (
        [
          "HTTP/1.1 200 OK",
          "Content-Type: text/plain; charset=utf-8",
          "Transfer-Encoding: chunked",
          "Cache-Control: no-cache, no-store, must-revalidate",
          "Connection: close",
          "X-Accel-Buffering: no",
          "X-Content-Type-Options: nosniff",
        ].join(CRLF) + CRLF + CRLF
      ).freeze

      class << self
        # Lazily built, process-wide pool that runs streaming work off the
        # request thread.
        #
        # Concurrent::CachedThreadPool overrides any :max_threads option with
        # its own effectively unbounded default, so it cannot enforce POOL_SIZE.
        # ThreadPoolExecutor honours max_threads. With max_queue: 0 the queue is
        # unbounded, so work beyond POOL_SIZE concurrent streams waits instead of
        # being rejected. Idle threads are released after 30 seconds.
        #
        # NOTE(review): `||=` is not synchronized, so two threads racing on the
        # very first call could each build a pool.
        def thread_pool
          @thread_pool ||=
            Concurrent::ThreadPoolExecutor.new(
              min_threads: 0,
              max_threads: POOL_SIZE,
              max_queue: 0,
              idletime: 30,
            )
        end

        # Runs +block+ on the shared pool, inside the multisite DB connection
        # that was current when it was scheduled. StandardErrors escaping the
        # block are logged, not raised.
        #
        # In the test environment the block runs inline on the calling thread,
        # and its errors propagate.
        def schedule_block(&block)
          # TODO: think about a better way to handle cross thread connections
          if Rails.env.test?
            block.call
            return
          end

          db = RailsMultisite::ConnectionManagement.current_db
          thread_pool.post do
            begin
              RailsMultisite::ConnectionManagement.with_connection(db) { block.call }
            rescue StandardError => e
              Discourse.warn_exception(e, message: "Discourse AI: Unable to stream reply")
            end
          end
        end

        # Schedules the whole exchange:
        # 1. resolve the persona and bot,
        # 2. create the user's post from +query+,
        # 3. write the response head,
        # 4. stream the bot's reply as JSON chunks.
        #
        # Kept in a static method so the scheduled block does not capture the
        # request ENV and other caller state. This lets that memory be released
        # earlier.
        #
        # @param io raw client connection; it is always closed when done
        # @param persona persona record; #id and #user (#id, #username) are used
        # @param user author of the post created from +query+
        # @param topic topic to reply in; when nil, a new private message is
        #   started between +user+ and the persona's user
        # @param query raw text of the user's post
        # @param custom_instructions passed through to Playground#reply_to
        # @param current_user user passed to Persona.find_by when resolving the persona
        #
        # StandardErrors raised while streaming are logged via
        # Discourse.warn_exception and are not propagated.
        def queue_streamed_reply(
          io:,
          persona:,
          user:,
          topic:,
          query:,
          custom_instructions:,
          current_user:
        )
          schedule_block do
            begin
              # Resolve the persona first. If it is unavailable we stop here,
              # instead of failing after the post was created and a 200 had
              # already been sent.
              persona_class =
                DiscourseAi::AiBot::Personas::Persona.find_by(id: persona.id, user: current_user)
              if persona_class.nil?
                raise ArgumentError,
                      "persona #{persona.id} not found or not available to the current user"
              end
              bot = DiscourseAi::AiBot::Bot.as(persona.user, persona: persona_class.new)

              post =
                PostCreator.create!(
                  user,
                  build_post_params(persona: persona, user: user, topic: topic, query: query),
                )

              io.write RESPONSE_HEAD
              io.flush

              write_json_chunk(
                io,
                { topic_id: post.topic.id, bot_user_id: persona.user.id, persona_id: persona.id },
              )

              DiscourseAi::AiBot::Playground
                .new(bot)
                .reply_to(post, custom_instructions: custom_instructions) do |partial|
                  # nothing worth sending for an empty partial
                  next if partial.empty?
                  write_json_chunk(io, { partial: partial })
                end

              # zero-length chunk terminates the chunked body
              io.write "0#{CRLF}#{CRLF}"
              io.flush
              io.done if io.respond_to?(:done)
            rescue StandardError => e
              # make it a tiny bit easier to debug in dev, this is tricky
              # multi-threaded code that exhibits various limitations in rails
              p e if Rails.env.development? || Rails.env.test?
              Discourse.warn_exception(e, message: "Discourse AI: Unable to stream reply")
            ensure
              io.close
            end
          end
        end

        private

        # Builds the PostCreator params for the user's post. It replies into
        # +topic+ when one is given. Otherwise it opens a new private message
        # addressed to +user+ and the persona's user.
        def build_post_params(persona:, user:, topic:, query:)
          params = {
            raw: query,
            skip_validations: true,
            custom_fields: {
              DiscourseAi::AiBot::Playground::BYPASS_AI_REPLY_CUSTOM_FIELD => true,
            },
          }

          if topic
            params[:topic_id] = topic.id
          else
            params[:title] = I18n.t("discourse_ai.ai_bot.default_pm_prefix")
            params[:archetype] = Archetype.private_message
            params[:target_usernames] = "#{user.username},#{persona.user.username}"
          end

          params
        end

        # Writes +payload+ (JSON text plus "\n\n") as one chunk of the chunked
        # transfer coding: hex byte length, CRLF, data, CRLF. It then flushes
        # the IO. The length uses bytesize so multi-byte UTF-8 text is framed
        # correctly.
        def write_json_chunk(io, payload)
          data = payload.to_json + "\n\n"
          data.force_encoding("UTF-8")
          io.write data.bytesize.to_s(16)
          io.write CRLF
          io.write data
          io.write CRLF
          io.flush
        end
      end
    end
  end
end