mirror of
https://github.com/discourse/discourse-ai.git
synced 2025-06-30 11:32:18 +00:00
This commit introduces a new Forum Researcher persona specialized in deep forum content analysis, along with comprehensive improvements to our AI infrastructure. Key additions: a new Forum Researcher persona with advanced filtering and analysis capabilities; a robust filtering system supporting tags, categories, dates, users, and keywords; an LLM formatter to efficiently process and chunk research results. Infrastructure improvements: implemented a CancelManager class to centrally manage AI completion cancellations; replaced callback-based cancellation with a more robust pattern; added systematic cancellation monitoring with callbacks. Other improvements: added a configurable default_enabled flag to control which personas are enabled by default; updated translation strings for the new researcher functionality; added comprehensive specs for the new components; renamed Researcher -> Web Researcher. This change makes our AI platform more stable while adding powerful research capabilities that can analyze forum trends and surface relevant content.
130 lines
3.2 KiB
Ruby
130 lines
3.2 KiB
Ruby
# frozen_string_literal: true
|
|
#
|
|
# Chat streaming APIs are a bit slow; this class buffers partial results
# and streams them out as quickly as possible.
|
|
|
|
module DiscourseAi
  module AiBot
    # Streams partial completion text into a chat message.
    #
    # The first visible partial creates the reply message; every later
    # partial is pushed onto an in-memory queue. A background worker thread
    # drains that queue, concatenating everything that is currently waiting
    # into a single buffer, so several small partials result in one
    # ChatSDK::Message.stream call.
    #
    # Usage: call `<<` for every partial, then `done` exactly once.
    class ChatStreamer
      attr_reader :reply,
                  :guardian,
                  :thread_id,
                  :force_thread,
                  :in_reply_to_id,
                  :channel,
                  :cancel_manager

      # @param message [#chat_channel_id] message being replied to; only its
      #   chat_channel_id is read here
      # @param channel [#id, #direct_message_channel?] channel the reply is created in
      # @param guardian [Object] passed through untouched to every ChatSDK call
      # @param thread_id [Object, nil] passed through to ChatSDK
      # @param in_reply_to_id [Object, nil] passed through to ChatSDK::Message.create
      # @param force_thread [Boolean] passed through to ChatSDK::Message.create
      # @param cancel_manager [#cancel!, nil] when given, `cancel!` is called
      #   whenever ChatSDK::Message.stream returns a falsy value
      def initialize(
        message:,
        channel:,
        guardian:,
        thread_id:,
        in_reply_to_id:,
        force_thread:,
        cancel_manager: nil
      )
        @message = message
        @channel = channel
        @guardian = guardian
        @thread_id = thread_id
        @force_thread = force_thread
        @in_reply_to_id = in_reply_to_id
        @cancel_manager = cancel_manager

        @queue = Queue.new

        @client_id =
          ChatSDK::Channel.start_reply(
            channel_id: message.chat_channel_id,
            guardian: guardian,
            thread_id: thread_id,
          )

        # The worker is started last: if anything above raises, no thread is
        # left behind blocked forever on an empty queue.
        db = RailsMultisite::ConnectionManagement.current_db
        @worker_thread =
          Thread.new { RailsMultisite::ConnectionManagement.with_connection(db) { run } }
      end

      # Appends a partial to the reply.
      #
      # Empty partials are ignored. Whitespace-only partials are ignored until
      # the reply message exists (leading whitespace is dropped by design).
      #
      # @param partial [#to_s] next piece of text
      def <<(partial)
        # Normalise once: String#<< treats an Integer as a codepoint, so a
        # non-String partial must not reach the worker's buffer as-is.
        text = partial.to_s
        return if text.empty?
        # we throw away leading spaces prior to message creation for now
        # by design
        return if text.blank? && !@reply

        stop_reply_indicator

        if @reply
          @queue << text
        else
          create_reply(text)
        end
      end

      # Creates the reply message from the first visible partial and switches
      # it into streaming mode.
      #
      # @param message [String] initial raw content of the reply
      def create_reply(message)
        @reply =
          ChatSDK::Message.create(
            raw: message,
            channel_id: channel.id,
            thread_id: thread_id,
            guardian: guardian,
            force_thread: force_thread,
            in_reply_to_id: in_reply_to_id,
            enforce_membership: !channel.direct_message_channel?,
          )

        ChatSDK::Message.start_stream(message_id: @reply.id, guardian: @guardian)

        # Re-queue trailing whitespace of the first partial so it is streamed
        # ahead of the next partial (presumably because message creation
        # strips it - TODO confirm). Nothing is queued when there is none,
        # which avoids a stream call with an empty payload.
        trailing = message[/\s+\z/]
        @queue << trailing if trailing
      end

      # Flushes everything still queued, waits for the worker thread to exit
      # and stops streaming on the reply.
      #
      # @return [Object, nil] the reply created by ChatSDK::Message.create, or
      #   nil when no visible partial was ever received
      def done
        @queue << :done
        @worker_thread.join
        # Covers the case where no visible partial ever arrived, in which
        # case `<<` never released the reply started in the constructor.
        stop_reply_indicator
        ChatSDK::Message.stop_stream(message_id: @reply.id, guardian: @guardian) if @reply
        @reply
      end

      private

      # Ends the reply started in the constructor. Safe to call repeatedly:
      # only the first call reaches ChatSDK.
      def stop_reply_indicator
        return unless @client_id

        ChatSDK::Channel.stop_reply(
          channel_id: @message.chat_channel_id,
          client_id: @client_id,
          guardian: @guardian,
          thread_id: @thread_id,
        )
        @client_id = nil
      end

      # Worker loop: blocks for the next chunk, then greedily drains whatever
      # else is already queued so that one stream call carries it all.
      def run
        finished = false

        until finished
          chunk = @queue.pop
          break if chunk == :done

          buffer = +""
          buffer << chunk

          begin
            loop do
              # Non-blocking pop raises ThreadError once the queue is empty.
              chunk = @queue.pop(true)
              if chunk == :done
                finished = true
                break
              end
              buffer << chunk
            end
          rescue ThreadError
            # Queue drained; send what has been collected so far.
          end

          streaming = ChatSDK::Message.stream(message_id: reply.id, raw: buffer, guardian: guardian)
          @cancel_manager&.cancel! unless streaming
        end
      end
    end
  end
end
|