discourse-ai/lib/ai_bot/chat_streamer.rb
Sam c34fcc8a95
FEATURE: forum researcher persona for deep research (#1313)
This commit introduces a new Forum Researcher persona specialized in deep forum content analysis along with comprehensive improvements to our AI infrastructure.

Key additions:

    New Forum Researcher persona with advanced filtering and analysis capabilities
    Robust filtering system supporting tags, categories, dates, users, and keywords
    LLM formatter to efficiently process and chunk research results

Infrastructure improvements:

    Implemented CancelManager class to centrally manage AI completion cancellations
    Replaced callback-based cancellation with a more robust pattern
    Added systematic cancellation monitoring with callbacks

Other improvements:

    Added configurable default_enabled flag to control which personas are enabled by default
    Updated translation strings for the new researcher functionality
    Added comprehensive specs for the new components

    Renames Researcher -> Web Researcher

This change makes our AI platform more stable while adding powerful research capabilities that can analyze forum trends and surface relevant content.
2025-05-14 12:36:16 +10:00

130 lines
3.2 KiB
Ruby

# frozen_string_literal: true
#
# Chat streaming APIs are a bit slow; this class buffers partial results on a
# worker thread so that they are streamed as quickly as possible.
module DiscourseAi
  module AiBot
    # Streams a bot reply into a chat channel.
    #
    # The first non-blank partial creates the chat message. Every later partial
    # is pushed onto a queue that a background worker thread drains: the worker
    # concatenates everything queued since its last write and sends it in a
    # single ChatSDK::Message.stream call.
    #
    # Usage, as implied by the public API:
    #
    #   streamer = ChatStreamer.new(message: ..., channel: ..., ...)
    #   streamer << "partial text"
    #   streamer.done
    class ChatStreamer
      # reply is nil until the first non-blank partial has been received.
      attr_reader :reply,
                  :guardian,
                  :thread_id,
                  :force_thread,
                  :in_reply_to_id,
                  :channel,
                  :cancel_manager

      # @param message the chat message being replied to; only its
      #   chat_channel_id is read by this class
      # @param channel the channel the reply is created in
      # @param guardian passed through to every ChatSDK call
      # @param thread_id passed to the reply-indicator and message-creation calls
      # @param in_reply_to_id passed to message creation
      # @param force_thread passed to message creation
      # @param cancel_manager optional object responding to cancel!; invoked when
      #   ChatSDK::Message.stream returns a falsy value
      def initialize(
        message:,
        channel:,
        guardian:,
        thread_id:,
        in_reply_to_id:,
        force_thread:,
        cancel_manager: nil
      )
        @message = message
        @channel = channel
        @guardian = guardian
        @thread_id = thread_id
        @force_thread = force_thread
        @in_reply_to_id = in_reply_to_id
        @cancel_manager = cancel_manager
        @queue = Queue.new

        # Make the fallible SDK call before spawning the worker: if it raises,
        # the constructor aborts without leaving a thread blocked forever on
        # an unreachable queue.
        # NOTE(review): presumably this shows the "is replying" indicator in
        # chat - confirm against ChatSDK::Channel.
        @client_id =
          ChatSDK::Channel.start_reply(
            channel_id: message.chat_channel_id,
            guardian: guardian,
            thread_id: thread_id,
          )

        db = RailsMultisite::ConnectionManagement.current_db
        @worker_thread =
          Thread.new { RailsMultisite::ConnectionManagement.with_connection(db) { run } }
      end

      # Appends a partial result to the reply.
      #
      # Empty partials are ignored. Whitespace-only partials are ignored until
      # the reply exists, so leading whitespace is dropped by design.
      #
      # @param partial [#to_s] the next piece of text
      def <<(partial)
        text = partial.to_s
        return if text.empty?
        # we throw away leading spaces prior to message creation for now
        # by design
        return if text.blank? && !@reply

        stop_reply_indicator

        # The string form is queued, so a non-String partial can never be
        # mistaken for the worker's :done sentinel.
        if @reply
          @queue << text
        else
          create_reply(text)
        end
      end

      # Creates the chat message from the first chunk and opens its stream.
      #
      # @param message [String] raw text of the first chunk
      def create_reply(message)
        @reply =
          ChatSDK::Message.create(
            raw: message,
            channel_id: channel.id,
            thread_id: thread_id,
            guardian: guardian,
            force_thread: force_thread,
            in_reply_to_id: in_reply_to_id,
            enforce_membership: !channel.direct_message_channel?,
          )

        ChatSDK::Message.start_stream(message_id: @reply.id, guardian: @guardian)

        # Re-queue the first chunk's trailing whitespace so it is streamed
        # after creation.
        # NOTE(review): presumably message creation strips it - confirm.
        # \s+ (not \s*) so that nothing is queued, and no empty stream call is
        # made, when there is no trailing whitespace.
        trailing = message[/\s+\z/]
        @queue << trailing if trailing
      end

      # Flushes everything still queued, waits for the worker to finish and
      # closes the stream.
      #
      # Clean-up runs even if the worker thread failed (Thread#join re-raises
      # its exception here): the reply indicator is cleared if no partial ever
      # cleared it, and the stream is stopped if a reply was created.
      #
      # @return the created reply, or nil if nothing was ever streamed
      def done
        @queue << :done
        @worker_thread.join
        @reply
      ensure
        stop_reply_indicator
        ChatSDK::Message.stop_stream(message_id: @reply.id, guardian: @guardian) if @reply
      end

      private

      # Worker loop: blocks for the next chunk, coalesces whatever else is
      # already queued, and streams the result in one call. Stops once the
      # :done sentinel has been consumed.
      def run
        loop do
          chunk = @queue.pop
          break if chunk == :done

          buffer = +""
          buffer << chunk
          finished = drain_pending_into(buffer)

          streaming =
            ChatSDK::Message.stream(message_id: reply.id, raw: buffer, guardian: guardian)
          # A falsy result is treated as "stop generating" and forwarded to
          # the cancel manager when one was supplied.
          @cancel_manager&.cancel! unless streaming

          break if finished
        end
      end

      # Appends every chunk that is already queued to buffer without blocking.
      #
      # @return [Boolean] true if the :done sentinel was consumed
      def drain_pending_into(buffer)
        loop do
          chunk = @queue.pop(true)
          return true if chunk == :done
          buffer << chunk
        end
      rescue ThreadError
        # A non-blocking pop raises ThreadError when the queue is empty.
        false
      end

      # Ends the reply indicator started in the constructor, at most once.
      def stop_reply_indicator
        return unless @client_id

        ChatSDK::Channel.stop_reply(
          channel_id: @message.chat_channel_id,
          client_id: @client_id,
          guardian: @guardian,
          thread_id: @thread_id,
        )
        @client_id = nil
      end
    end
  end
end