diff --git a/lib/completions/endpoints/base.rb b/lib/completions/endpoints/base.rb
index ef92a162..633ae9a9 100644
--- a/lib/completions/endpoints/base.rb
+++ b/lib/completions/endpoints/base.rb
@@ -90,30 +90,41 @@ module DiscourseAi
 
           leftover = ""
           function_buffer = build_buffer # Nokogiri document
+          prev_processed_partials = 0
 
           response.read_body do |chunk|
             if cancelled
               http.finish
-              return
+              break
             end
 
             decoded_chunk = decode(chunk)
             response_raw << decoded_chunk
 
-            # Buffering for extremely slow streaming.
-            if (leftover + decoded_chunk).length < "data: [DONE]".length
-              leftover += decoded_chunk
+            redo_chunk = leftover + decoded_chunk
+
+            raw_partials = partials_from(redo_chunk)
+
+            raw_partials =
+              raw_partials[prev_processed_partials..-1] if prev_processed_partials > 0
+
+            if raw_partials.blank? || (raw_partials.size == 1 && raw_partials.first.blank?)
+              leftover = redo_chunk
               next
             end
 
-            partials_from(leftover + decoded_chunk).each do |raw_partial|
+            json_error = false
+
+            raw_partials.each do |raw_partial|
+              json_error = false
+              prev_processed_partials += 1
+
              next if cancelled
              next if raw_partial.blank?
 
              begin
                partial = extract_completion_from(raw_partial)
                next if partial.nil?
-                leftover = ""
 
                if has_tool?(response_data, partial)
                  function_buffer = add_to_buffer(function_buffer, response_data, partial)
@@ -134,9 +145,17 @@ module DiscourseAi
                  yield partial, cancel if partial
                end
              rescue JSON::ParserError
-              leftover += decoded_chunk
+              leftover = redo_chunk
+              json_error = true
              end
            end
+
+            if json_error
+              prev_processed_partials -= 1
+            else
+              leftover = ""
+            end
+            prev_processed_partials = 0 if leftover.blank?
          end
      rescue IOError, StandardError
        raise if !cancelled
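
The base.rb hunks above replace the old length heuristic ("wait until the buffer is at least as long as `data: [DONE]`") with full re-scanning: every chunk is prepended with `leftover`, all partials are re-derived from the combined buffer, and `prev_processed_partials` records how many were already handled so only a truncated tail gets retried. Below is a minimal, framework-free sketch of that bookkeeping, assuming SSE-style `data:` framing with blank-line separators; `SseReassembler` and `feed` are illustrative names, not code from this patch.

require "json"

# Illustrative reassembler mirroring the buffer/counter bookkeeping above.
class SseReassembler
  def initialize
    @leftover = ""
    @prev_processed = 0
  end

  # Feed one network chunk; yields each fully parsed JSON payload.
  def feed(chunk)
    buffer = @leftover + chunk
    partials = buffer.split("\n\n").map { |p| p.sub(/\Adata: /, "").strip }
    partials = partials[@prev_processed..] || [] if @prev_processed > 0

    if partials.empty?
      @leftover = buffer # nothing complete yet; wait for more bytes
      return
    end

    json_error = false

    partials.each do |raw|
      json_error = false
      @prev_processed += 1
      next if raw.empty? || raw == "[DONE]"

      begin
        yield JSON.parse(raw)
      rescue JSON::ParserError
        # A chunk boundary landed mid-token; keep the whole buffer around.
        @leftover = buffer
        json_error = true
      end
    end

    if json_error
      # The final partial was truncated: rewind its counter so the next
      # chunk retries it, while earlier partials stay marked as done.
      @prev_processed -= 1
    else
      @leftover = ""
    end

    @prev_processed = 0 if @leftover.empty?
  end
end

Note how this also drops a permanently bad partial in the middle of a buffer: `json_error` is reset on every iteration, so only a failure on the last partial keeps the buffer alive, matching the "recover from a bad payload" spec further down.
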
diff --git a/lib/inference/open_ai_completions.rb b/lib/inference/open_ai_completions.rb
index 6affab7d..71dd8bdc 100644
--- a/lib/inference/open_ai_completions.rb
+++ b/lib/inference/open_ai_completions.rb
@@ -45,7 +45,7 @@ module ::DiscourseAi
        end
 
        headers = { "Content-Type" => "application/json" }
-        if url.host.include? ("azure")
+        if url.host.include?("azure")
          headers["api-key"] = SiteSetting.ai_openai_api_key
        else
          headers["Authorization"] = "Bearer #{SiteSetting.ai_openai_api_key}"
@@ -131,7 +131,7 @@ module ::DiscourseAi
          response.read_body do |chunk|
            if cancelled
              http.finish
-              return
+              break
            end
 
            response_raw << chunk
diff --git a/spec/lib/completions/endpoints/open_ai_spec.rb b/spec/lib/completions/endpoints/open_ai_spec.rb
index 929c087e..2b72164f 100644
--- a/spec/lib/completions/endpoints/open_ai_spec.rb
+++ b/spec/lib/completions/endpoints/open_ai_spec.rb
@@ -114,4 +114,103 @@ RSpec.describe DiscourseAi::Completions::Endpoints::OpenAi do
   end
 
   it_behaves_like "an endpoint that can communicate with a completion service"
+
+  context "when chunked encoding returns partial chunks" do
+    # See: https://github.com/bblimke/webmock/issues/629
+    let(:mock_net_http) do
+      Class.new(Net::HTTP) do
+        def request(*)
+          super do |response|
+            response.instance_eval do
+              def read_body(*, &)
+                @body.each(&)
+              end
+            end
+
+            yield response if block_given?
+
+            response
+          end
+        end
+      end
+    end
+
+    let(:remove_original_net_http) { Net.send(:remove_const, :HTTP) }
+    let(:original_http) { remove_original_net_http }
+    let(:stub_net_http) { Net.send(:const_set, :HTTP, mock_net_http) }
+
+    let(:remove_stubbed_net_http) { Net.send(:remove_const, :HTTP) }
+    let(:restore_net_http) { Net.send(:const_set, :HTTP, original_http) }
+
+    before do
+      mock_net_http
+      remove_original_net_http
+      stub_net_http
+    end
+
+    after do
+      remove_stubbed_net_http
+      restore_net_http
+    end
+
+    it "will automatically recover from a bad payload" do
+      # this should not happen, but let's ensure nothing bad happens
+      # the line with test1 is invalid JSON
+      raw_data = <<~TEXT
+d|a|t|a|:| |{|"choices":[{"delta":{"content":"test,"}}]}
+
+data: {"choices":[{"delta":{"content":"test1,"}}]
+
+data: {"choices":[{"delta":|{"content":"test2,"}}]}
+
+data: {"choices":[{"delta":{"content":"test3,"}}]|}
+
+data: {"choices":[{|"|d|elta":{"content":"test4"}}]|}
+
+data: [D|ONE]
+      TEXT
+
+      chunks = raw_data.split("|")
+
+      stub_request(:post, "https://api.openai.com/v1/chat/completions").to_return(
+        status: 200,
+        body: chunks,
+      )
+
+      partials = []
+      llm = DiscourseAi::Completions::Llm.proxy("gpt-3.5-turbo")
+      llm.completion!({ insts: "test" }, Discourse.system_user) { |partial| partials << partial }
+
+      expect(partials.join).to eq("test,test2,test3,test4")
+    end
+
+    it "supports chunked encoding properly" do
+      raw_data = <<~TEXT
+da|ta: {"choices":[{"delta":{"content":"test,"}}]}
+
+data: {"choices":[{"delta":{"content":"test1,"}}]}
+
+data: {"choices":[{"delta":|{"content":"test2,"}}]}
+
+data: {"choices":[{"delta":{"content":"test3,"}}]|}
+
+data: {"choices":[{|"|d|elta":{"content":"test4"}}]|}
+
+data: [D|ONE]
+      TEXT
+
+      chunks = raw_data.split("|")
+
+      stub_request(:post, "https://api.openai.com/v1/chat/completions").to_return(
+        status: 200,
+        body: chunks,
+      )
+
+      partials = []
+      llm = DiscourseAi::Completions::Llm.proxy("gpt-3.5-turbo")
+      llm.completion!({ insts: "test" }, Discourse.system_user) { |partial| partials << partial }
+
+      expect(partials.join).to eq("test,test1,test2,test3,test4")
+    end
+  end
 end
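
Both spec files in this patch rely on the same testing workaround: per the WebMock issue linked above, `read_body` receives the stubbed body as one concatenated string, so an Array body would never reach the parser in pieces. The `Net::HTTP` subclass re-opens `read_body` to yield the Array element by element, which lets `split("|")` place chunk boundaries mid-token. As a hypothetical usage example (reusing the `SseReassembler` sketch above; none of these names are repo code), the same `|`-marker convention can drive the reassembler directly:

# Split the fixture on "|" to simulate arbitrary chunk boundaries, then
# confirm every payload survives reassembly.
raw = <<~TEXT
  da|ta: {"choices":[{"delta":{"content":"hi"}}]}

  data: {"choices":[{"delta":|{"content":"there"}}]}

  data: [D|ONE]
TEXT

contents = []
reassembler = SseReassembler.new
raw.split("|").each do |chunk|
  reassembler.feed(chunk) { |json| contents << json.dig("choices", 0, "delta", "content") }
end

contents # => ["hi", "there"]
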
diff --git a/spec/shared/inference/openai_completions_spec.rb b/spec/shared/inference/openai_completions_spec.rb
index ad90dd30..2c353d79 100644
--- a/spec/shared/inference/openai_completions_spec.rb
+++ b/spec/shared/inference/openai_completions_spec.rb
@@ -304,18 +304,20 @@ describe DiscourseAi::Inference::OpenAiCompletions do
     restore_net_http
   end
 
-  it "supports extremely slow streaming under new interface" do
+  it "recovers from chunked payload issues" do
     raw_data = <<~TEXT
-data: {"choices":[{"delta":{"content":"test"}}]}
+da|ta: |{"choices":[{"delta":{"content"|:"test"}}]}
 
 data: {"choices":[{"delta":{"content":"test1"}}]}
 
-data: {"choices":[{"delta":{"content":"test2"}}]}
+data: {"choices":[{"delta":{"conte|nt":"test2"}}]|}
+
+data: {"ch|oices":[{"delta|":{"content":"test3"}}]}
 
 data: [DONE]
     TEXT
 
-    chunks = raw_data.split("")
+    chunks = raw_data.split("|")
 
     stub_request(:post, "https://api.openai.com/v1/chat/completions").to_return(
       status: 200,
@@ -323,10 +325,19 @@
     )
 
     partials = []
-    llm = DiscourseAi::Completions::Llm.proxy("gpt-3.5-turbo")
-    llm.completion!({ insts: "test" }, user) { |partial| partials << partial }
+    DiscourseAi::Inference::OpenAiCompletions.perform!([], "gpt-3.5-turbo") do |partial, cancel|
+      partials << partial
+    end
 
-    expect(partials.join).to eq("testtest1test2")
+    expect(partials.length).to eq(4)
+    expect(partials).to eq(
+      [
+        { choices: [{ delta: { content: "test" } }] },
+        { choices: [{ delta: { content: "test1" } }] },
+        { choices: [{ delta: { content: "test2" } }] },
+        { choices: [{ delta: { content: "test3" } }] },
+      ],
+    )
   end
 
   it "support extremely slow streaming" do
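
A small change repeated in both streaming loops of this patch is `return` becoming `break` inside `response.read_body`. A plausible reading (an assumption; the patch itself does not say) is that `return` would exit the whole enclosing method from inside the iterator, skipping the post-loop bookkeeping added above, whereas `break` only stops the iteration. A minimal illustration with made-up names:

# `drain` and `:cancel` are demonstration-only names.
def drain(chunks)
  received = []
  chunks.each do |c|
    break if c == :cancel # stop iterating, but keep running this method
    received << c
  end
  received << :post_loop_cleanup # still reached after `break`;
                                 # a `return` above would have skipped it
end

drain([1, 2, :cancel, 3]) # => [1, 2, :post_loop_cleanup]
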