Fixes #2616 - Trailers preventing client from processing all the data. (#2623)

* Fixes #2616 - Trailers preventing client from processing all the data.

Trailer handling was erroneously firing the response success event
before all the response content events happened.

Now the trailer handling uses a poison-pill DATA frame to make sure that
all response content events happen before the response success event.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2018-06-05 18:55:55 +02:00 committed by GitHub
parent f6be906caf
commit e5554831e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 74 additions and 5 deletions

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.http2.client.http;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.List;
@ -43,6 +44,7 @@ import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.Retainable;
@ -101,11 +103,11 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listen
}
}
}
else
else // Response trailers.
{
HttpFields trailers = metaData.getFields();
trailers.forEach(httpResponse::trailer);
responseSuccess(exchange);
notifyContent(exchange, new DataFrame(stream.getId(), BufferUtil.EMPTY_BUFFER, true), Callback.NOOP);
}
}
@ -153,8 +155,7 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listen
}
else
{
contentNotifier.offer(new DataInfo(exchange, frame, callback));
contentNotifier.iterate();
notifyContent(exchange, frame, callback);
}
}
@ -177,6 +178,12 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listen
return true;
}
private void notifyContent(HttpExchange exchange, DataFrame frame, Callback callback)
{
contentNotifier.offer(new DataInfo(exchange, frame, callback));
contentNotifier.iterate();
}
private class ContentNotifier extends IteratingCallback implements Retainable
{
private final Queue<DataInfo> queue = new ArrayDeque<>();
@ -208,7 +215,11 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listen
}
this.dataInfo = dataInfo;
responseContent(dataInfo.exchange, dataInfo.frame.getData(), this);
ByteBuffer buffer = dataInfo.frame.getData();
if (buffer.hasRemaining())
responseContent(dataInfo.exchange, buffer, this);
else
succeeded();
return Action.SCHEDULED;
}

View File

@ -18,8 +18,11 @@
package org.eclipse.jetty.http.client;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@ -32,6 +35,7 @@ import org.eclipse.jetty.client.HttpRequest;
import org.eclipse.jetty.client.HttpResponse;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.util.BytesContentProvider;
import org.eclipse.jetty.client.util.InputStreamResponseListener;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
@ -224,4 +228,58 @@ public class HttpTrailersTest extends AbstractTest
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
Assert.assertNull(failure.get());
}
@Test
public void testResponseTrailersWithLargeContent() throws Exception
{
byte[] content = new byte[1024 * 1024];
new Random().nextBytes(content);
String trailerName = "Trailer";
String trailerValue = "value";
start(new AbstractHandler.ErrorDispatchHandler()
{
@Override
protected void doNonErrorHandle(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
jettyRequest.setHandled(true);
HttpFields trailers = new HttpFields();
trailers.put(trailerName, trailerValue);
Response jettyResponse = (Response)response;
jettyResponse.setTrailers(() -> trailers);
// Write a large content
response.getOutputStream().write(content);
}
});
InputStreamResponseListener listener = new InputStreamResponseListener();
client.newRequest(newURI())
.timeout(15, TimeUnit.SECONDS)
.send(listener);
org.eclipse.jetty.client.api.Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
InputStream input = listener.getInputStream();
ByteArrayOutputStream output = new ByteArrayOutputStream();
// Read slowly.
while (true)
{
int read = input.read();
if (read < 0)
break;
output.write(read);
}
Assert.assertArrayEquals(content, output.toByteArray());
// Wait for the request/response cycle to complete.
listener.await(5, TimeUnit.SECONDS);
HttpResponse httpResponse = (HttpResponse)response;
HttpFields trailers = httpResponse.getTrailers();
Assert.assertNotNull(trailers);
Assert.assertEquals(trailerValue, trailers.get(trailerName));
}
}