FIX: under certain conditions we would get duplicate data from llm (#373)

Previously, endpoint/base would `+=` the decoded_chunk onto leftover.

This could lead to cases where the leftover buffer contained duplicates
of previously processed data.

The fix ensures we properly skip previously decoded data.
Sam 2023-12-21 04:28:05 +11:00 committed by GitHub
parent 09238d33fa
commit af2e692761
4 changed files with 145 additions and 16 deletions
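
In outline, the fix swaps the old "append the chunk to leftover" recovery for "re-parse the whole combined buffer, then skip the partials already consumed". Below is a minimal standalone sketch of that strategy, not the shipped code: `stream_partials` is a hypothetical driver, a "\n\n" split stands in for `partials_from`, and cancellation and tool-call handling are omitted.

require "json"

def stream_partials(chunks)
  leftover = ""
  prev_processed_partials = 0

  chunks.each do |decoded_chunk|
    # Re-parse everything we have not conclusively finished with.
    redo_chunk = leftover + decoded_chunk

    raw_partials = redo_chunk.split("\n\n") # stand-in for partials_from
    raw_partials = raw_partials[prev_processed_partials..-1] if prev_processed_partials > 0

    json_error = false

    raw_partials.each do |raw_partial|
      json_error = false
      prev_processed_partials += 1
      next if raw_partial.empty?

      begin
        yield JSON.parse(raw_partial.sub(/\Adata: /, ""))
      rescue JSON::ParserError
        # Keep the whole combined buffer; the slice above already skips
        # partials that were yielded, so nothing is emitted twice.
        leftover = redo_chunk
        json_error = true
      end
    end

    if json_error
      # The last partial was likely truncated mid-chunk; rewind by one so
      # it is re-attempted once more data arrives.
      prev_processed_partials -= 1
    else
      leftover = ""
    end

    prev_processed_partials = 0 if leftover.empty?
  end
end

Fed pipe-split chunks like the specs below use, this yields each delta exactly once, even when a `data:` row is cut mid-token.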


@@ -90,30 +90,41 @@ module DiscourseAi
          leftover = ""
          function_buffer = build_buffer # Nokogiri document
+         prev_processed_partials = 0

          response.read_body do |chunk|
            if cancelled
              http.finish
-             return
+             break
            end

            decoded_chunk = decode(chunk)
            response_raw << decoded_chunk

-           # Buffering for extremely slow streaming.
-           if (leftover + decoded_chunk).length < "data: [DONE]".length
-             leftover += decoded_chunk
+           redo_chunk = leftover + decoded_chunk
+
+           raw_partials = partials_from(redo_chunk)
+
+           raw_partials =
+             raw_partials[prev_processed_partials..-1] if prev_processed_partials > 0
+
+           if raw_partials.blank? || (raw_partials.size == 1 && raw_partials.first.blank?)
+             leftover = redo_chunk
              next
            end

-           partials_from(leftover + decoded_chunk).each do |raw_partial|
+           json_error = false
+
+           raw_partials.each do |raw_partial|
+             json_error = false
+             prev_processed_partials += 1
              next if cancelled
              next if raw_partial.blank?

              begin
                partial = extract_completion_from(raw_partial)
-               leftover = ""
+               next if partial.nil?

                if has_tool?(response_data, partial)
                  function_buffer = add_to_buffer(function_buffer, response_data, partial)
@@ -134,9 +145,17 @@ module DiscourseAi
                  yield partial, cancel if partial
                end
              rescue JSON::ParserError
-               leftover += decoded_chunk
+               leftover = redo_chunk
+               json_error = true
              end
            end
+
+           if json_error
+             prev_processed_partials -= 1
+           else
+             leftover = ""
+           end
+           prev_processed_partials = 0 if leftover.blank?
          end
        rescue IOError, StandardError
          raise if !cancelled
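
To make the duplication concrete, here is an illustrative trace of the rescue branch above with two hypothetical chunks:

chunk1 = %(data: {"content":"a"}\n\ndata: {"con)
chunk2 = %(tent":"b"}\n\n)

# Old code, pass 1: {"content":"a"} parses and is yielded; the truncated
# tail raises JSON::ParserError, and `leftover += decoded_chunk` stores all
# of chunk1, including the already yielded {"content":"a"}.
# Old code, pass 2: partials_from(leftover + chunk2) re-parses chunk1's
# first partial, so "a" is yielded a second time.
# New code: the same combined buffer (redo_chunk) is re-parsed, but the
# first prev_processed_partials entries are sliced off, so only the newly
# completed {"content":"b"} is yielded.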


@@ -45,7 +45,7 @@ module ::DiscourseAi
        end

        headers = { "Content-Type" => "application/json" }
-       if url.host.include? ("azure")
+       if url.host.include?("azure")
          headers["api-key"] = SiteSetting.ai_openai_api_key
        else
          headers["Authorization"] = "Bearer #{SiteSetting.ai_openai_api_key}"
@@ -131,7 +131,7 @@ module ::DiscourseAi
        response.read_body do |chunk|
          if cancelled
            http.finish
-           return
+           break
          end

          response_raw << chunk
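
The `return` to `break` swap here (and in endpoint/base above) matters because `return` inside the `read_body` block exits the enclosing method entirely, skipping whatever follows the streaming loop, while `break` only stops consuming the body. A small sketch with a hypothetical method:

def stream(response, cancelled)
  chunks = []

  response.read_body do |chunk|
    break if cancelled.call # stop reading, but keep executing below
    chunks << chunk
  end

  chunks # post-loop cleanup still runs after `break`; `return` would skip it
end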


@@ -114,4 +114,103 @@ RSpec.describe DiscourseAi::Completions::Endpoints::OpenAi do
  end

  it_behaves_like "an endpoint that can communicate with a completion service"
+
+  context "when chunked encoding returns partial chunks" do
+    # See: https://github.com/bblimke/webmock/issues/629
+    let(:mock_net_http) do
+      Class.new(Net::HTTP) do
+        def request(*)
+          super do |response|
+            response.instance_eval do
+              def read_body(*, &)
+                @body.each(&)
+              end
+            end
+
+            yield response if block_given?
+
+            response
+          end
+        end
+      end
+    end
+
+    let(:remove_original_net_http) { Net.send(:remove_const, :HTTP) }
+    let(:original_http) { remove_original_net_http }
+    let(:stub_net_http) { Net.send(:const_set, :HTTP, mock_net_http) }
+
+    let(:remove_stubbed_net_http) { Net.send(:remove_const, :HTTP) }
+    let(:restore_net_http) { Net.send(:const_set, :HTTP, original_http) }
+
+    before do
+      mock_net_http
+      remove_original_net_http
+      stub_net_http
+    end
+
+    after do
+      remove_stubbed_net_http
+      restore_net_http
+    end
+
+    it "will automatically recover from a bad payload" do
+      # this should not happen, but lets ensure nothing bad happens
+      # the row with test1 is invalid json
+      raw_data = <<~TEXT
+        d|a|t|a|:| |{|"choices":[{"delta":{"content":"test,"}}]}
+
+        data: {"choices":[{"delta":{"content":"test1,"}}]
+
+        data: {"choices":[{"delta":|{"content":"test2,"}}]}
+
+        data: {"choices":[{"delta":{"content":"test3,"}}]|}
+
+        data: {"choices":[{|"|d|elta":{"content":"test4"}}]|}
+
+        data: [D|ONE]
+      TEXT
+
+      chunks = raw_data.split("|")
+
+      stub_request(:post, "https://api.openai.com/v1/chat/completions").to_return(
+        status: 200,
+        body: chunks,
+      )
+
+      partials = []
+      llm = DiscourseAi::Completions::Llm.proxy("gpt-3.5-turbo")
+      llm.completion!({ insts: "test" }, Discourse.system_user) { |partial| partials << partial }
+
+      expect(partials.join).to eq("test,test2,test3,test4")
+    end
+
+    it "supports chunked encoding properly" do
+      raw_data = <<~TEXT
+        da|ta: {"choices":[{"delta":{"content":"test,"}}]}
+
+        data: {"choices":[{"delta":{"content":"test1,"}}]}
+
+        data: {"choices":[{"delta":|{"content":"test2,"}}]}
+
+        data: {"choices":[{"delta":{"content":"test3,"}}]|}
+
+        data: {"choices":[{|"|d|elta":{"content":"test4"}}]|}
+
+        data: [D|ONE]
+      TEXT
+
+      chunks = raw_data.split("|")
+
+      stub_request(:post, "https://api.openai.com/v1/chat/completions").to_return(
+        status: 200,
+        body: chunks,
+      )
+
+      partials = []
+      llm = DiscourseAi::Completions::Llm.proxy("gpt-3.5-turbo")
+      llm.completion!({ insts: "test" }, Discourse.system_user) { |partial| partials << partial }
+
+      expect(partials.join).to eq("test,test1,test2,test3,test4")
+    end
+  end
 end
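
Worth noting for anyone touching these specs: WebMock cannot stream a stubbed body chunk by chunk (the linked issue #629), so the setup above swaps Net::HTTP for a subclass whose responses redefine read_body to iterate the stubbed array (`@body.each(&)`). Passing an array as `body:` then delivers one element per yielded chunk, which is what lets these tests place chunk boundaries mid-token.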


@@ -304,18 +304,20 @@ describe DiscourseAi::Inference::OpenAiCompletions do
      restore_net_http
    end

-   it "supports extremely slow streaming under new interface" do
+   it "recovers from chunked payload issues" do
      raw_data = <<~TEXT
-       data: {"choices":[{"delta":{"content":"test"}}]}
+       da|ta: |{"choices":[{"delta":{"content"|:"test"}}]}

        data: {"choices":[{"delta":{"content":"test1"}}]}

-       data: {"choices":[{"delta":{"content":"test2"}}]}
+       data: {"choices":[{"delta":{"conte|nt":"test2"}}]|}
+
+       data: {"ch|oices":[{"delta|":{"content":"test3"}}]}

        data: [DONE]
      TEXT

-     chunks = raw_data.split("")
+     chunks = raw_data.split("|")

      stub_request(:post, "https://api.openai.com/v1/chat/completions").to_return(
        status: 200,
@@ -323,10 +325,19 @@
      )

      partials = []
-     llm = DiscourseAi::Completions::Llm.proxy("gpt-3.5-turbo")
-     llm.completion!({ insts: "test" }, user) { |partial| partials << partial }
+     DiscourseAi::Inference::OpenAiCompletions.perform!([], "gpt-3.5-turbo") do |partial, cancel|
+       partials << partial
+     end

-     expect(partials.join).to eq("testtest1test2")
+     expect(partials.length).to eq(4)
+     expect(partials).to eq(
+       [
+         { choices: [{ delta: { content: "test" } }] },
+         { choices: [{ delta: { content: "test1" } }] },
+         { choices: [{ delta: { content: "test2" } }] },
+         { choices: [{ delta: { content: "test3" } }] },
+       ],
+     )
    end

    it "support extremely slow streaming" do