Merged branch 'jetty-9.4.x' into 'jetty-10.0.x'.
This commit is contained in:
commit
790051556f
|
@ -22,12 +22,10 @@ import java.nio.ByteBuffer;
|
|||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.eclipse.jetty.client.api.ContentProvider;
|
||||
import org.eclipse.jetty.client.api.Request;
|
||||
import org.eclipse.jetty.client.api.Result;
|
||||
import org.eclipse.jetty.http.HttpFields;
|
||||
import org.eclipse.jetty.http.HttpHeader;
|
||||
import org.eclipse.jetty.http.HttpHeaderValue;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
|
@ -67,7 +65,6 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
|
|||
private final AtomicReference<SenderState> senderState = new AtomicReference<>(SenderState.IDLE);
|
||||
private final Callback commitCallback = new CommitCallback();
|
||||
private final IteratingCallback contentCallback = new ContentCallback();
|
||||
private final Callback trailersCallback = new TrailersCallback();
|
||||
private final Callback lastCallback = new LastCallback();
|
||||
private final HttpChannel channel;
|
||||
private HttpContent content;
|
||||
|
@ -444,15 +441,6 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
|
|||
*/
|
||||
protected abstract void sendContent(HttpExchange exchange, HttpContent content, Callback callback);
|
||||
|
||||
/**
|
||||
* Implementations should send the HTTP trailers and notify the given {@code callback} of the
|
||||
* result of this operation.
|
||||
*
|
||||
* @param exchange the exchange to send
|
||||
* @param callback the callback to notify
|
||||
*/
|
||||
protected abstract void sendTrailers(HttpExchange exchange, Callback callback);
|
||||
|
||||
protected void reset()
|
||||
{
|
||||
HttpContent content = this.content;
|
||||
|
@ -745,24 +733,14 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
|
|||
if (content == null)
|
||||
return;
|
||||
|
||||
HttpRequest request = exchange.getRequest();
|
||||
Supplier<HttpFields> trailers = request.getTrailers();
|
||||
boolean hasContent = content.hasContent();
|
||||
if (!hasContent)
|
||||
if (!content.hasContent())
|
||||
{
|
||||
if (trailers == null)
|
||||
{
|
||||
// No trailers or content to send, we are done.
|
||||
someToSuccess(exchange);
|
||||
}
|
||||
else
|
||||
{
|
||||
sendTrailers(exchange, lastCallback);
|
||||
}
|
||||
// No content to send, we are done.
|
||||
someToSuccess(exchange);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Was any content sent while committing ?
|
||||
// Was any content sent while committing?
|
||||
ByteBuffer contentBuffer = content.getContent();
|
||||
if (contentBuffer != null)
|
||||
{
|
||||
|
@ -859,9 +837,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
|
|||
|
||||
if (lastContent)
|
||||
{
|
||||
HttpRequest request = exchange.getRequest();
|
||||
Supplier<HttpFields> trailers = request.getTrailers();
|
||||
sendContent(exchange, content, trailers == null ? lastCallback : trailersCallback);
|
||||
sendContent(exchange, content, lastCallback);
|
||||
return Action.IDLE;
|
||||
}
|
||||
|
||||
|
@ -925,28 +901,6 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
|
|||
}
|
||||
}
|
||||
|
||||
private class TrailersCallback implements Callback
|
||||
{
|
||||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
HttpExchange exchange = getHttpExchange();
|
||||
if (exchange == null)
|
||||
return;
|
||||
sendTrailers(exchange, lastCallback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
HttpContent content = HttpSender.this.content;
|
||||
if (content == null)
|
||||
return;
|
||||
content.failed(x);
|
||||
anyToFailure(x);
|
||||
}
|
||||
}
|
||||
|
||||
private class LastCallback implements Callback
|
||||
{
|
||||
@Override
|
||||
|
|
|
@ -59,7 +59,7 @@ public class HttpSenderOverHTTP extends HttpSender
|
|||
{
|
||||
try
|
||||
{
|
||||
new HeadersCallback(exchange, content, callback, getHttpChannel().getHttpConnection()).iterate();
|
||||
new HeadersCallback(exchange, content, callback).iterate();
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
|
@ -83,8 +83,8 @@ public class HttpSenderOverHTTP extends HttpSender
|
|||
HttpGenerator.Result result = generator.generateRequest(null, null, chunk, contentBuffer, lastContent);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Generated content ({} bytes) - {}/{}",
|
||||
contentBuffer == null ? -1 : contentBuffer.remaining(),
|
||||
result, generator);
|
||||
contentBuffer == null ? -1 : contentBuffer.remaining(),
|
||||
result, generator);
|
||||
switch (result)
|
||||
{
|
||||
case NEED_CHUNK:
|
||||
|
@ -94,8 +94,8 @@ public class HttpSenderOverHTTP extends HttpSender
|
|||
}
|
||||
case NEED_CHUNK_TRAILER:
|
||||
{
|
||||
callback.succeeded();
|
||||
return;
|
||||
chunk = bufferPool.acquire(httpClient.getRequestBufferSize(), false);
|
||||
break;
|
||||
}
|
||||
case FLUSH:
|
||||
{
|
||||
|
@ -138,21 +138,6 @@ public class HttpSenderOverHTTP extends HttpSender
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void sendTrailers(HttpExchange exchange, Callback callback)
|
||||
{
|
||||
try
|
||||
{
|
||||
new TrailersCallback(callback).iterate();
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug(x);
|
||||
callback.failed(x);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void reset()
|
||||
{
|
||||
|
@ -191,19 +176,17 @@ public class HttpSenderOverHTTP extends HttpSender
|
|||
private final HttpExchange exchange;
|
||||
private final Callback callback;
|
||||
private final MetaData.Request metaData;
|
||||
private final HttpConnectionOverHTTP httpConnectionOverHTTP;
|
||||
private ByteBuffer headerBuffer;
|
||||
private ByteBuffer chunkBuffer;
|
||||
private ByteBuffer contentBuffer;
|
||||
private boolean lastContent;
|
||||
private boolean generated;
|
||||
|
||||
public HeadersCallback(HttpExchange exchange, HttpContent content, Callback callback, HttpConnectionOverHTTP httpConnectionOverHTTP)
|
||||
public HeadersCallback(HttpExchange exchange, HttpContent content, Callback callback)
|
||||
{
|
||||
super(false);
|
||||
this.exchange = exchange;
|
||||
this.callback = callback;
|
||||
this.httpConnectionOverHTTP = httpConnectionOverHTTP;
|
||||
|
||||
HttpRequest request = exchange.getRequest();
|
||||
ContentProvider requestContent = request.getContent();
|
||||
|
@ -231,10 +214,10 @@ public class HttpSenderOverHTTP extends HttpSender
|
|||
HttpGenerator.Result result = generator.generateRequest(metaData, headerBuffer, chunkBuffer, contentBuffer, lastContent);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Generated headers ({} bytes), chunk ({} bytes), content ({} bytes) - {}/{}",
|
||||
headerBuffer == null ? -1 : headerBuffer.remaining(),
|
||||
chunkBuffer == null ? -1 : chunkBuffer.remaining(),
|
||||
contentBuffer == null ? -1 : contentBuffer.remaining(),
|
||||
result, generator);
|
||||
headerBuffer == null ? -1 : headerBuffer.remaining(),
|
||||
chunkBuffer == null ? -1 : chunkBuffer.remaining(),
|
||||
contentBuffer == null ? -1 : contentBuffer.remaining(),
|
||||
result, generator);
|
||||
switch (result)
|
||||
{
|
||||
case NEED_HEADER:
|
||||
|
@ -249,7 +232,8 @@ public class HttpSenderOverHTTP extends HttpSender
|
|||
}
|
||||
case NEED_CHUNK_TRAILER:
|
||||
{
|
||||
return Action.SUCCEEDED;
|
||||
chunkBuffer = httpClient.getByteBufferPool().acquire(httpClient.getRequestBufferSize(), false);
|
||||
break;
|
||||
}
|
||||
case FLUSH:
|
||||
{
|
||||
|
@ -260,11 +244,8 @@ public class HttpSenderOverHTTP extends HttpSender
|
|||
chunkBuffer = BufferUtil.EMPTY_BUFFER;
|
||||
if (contentBuffer == null)
|
||||
contentBuffer = BufferUtil.EMPTY_BUFFER;
|
||||
|
||||
httpConnectionOverHTTP.addBytesOut( BufferUtil.length(headerBuffer)
|
||||
+ BufferUtil.length(chunkBuffer)
|
||||
+ BufferUtil.length(contentBuffer));
|
||||
|
||||
long bytes = headerBuffer.remaining() + chunkBuffer.remaining() + contentBuffer.remaining();
|
||||
getHttpChannel().getHttpConnection().addBytesOut(bytes);
|
||||
endPoint.write(this, headerBuffer, chunkBuffer, contentBuffer);
|
||||
generated = true;
|
||||
return Action.SCHEDULED;
|
||||
|
@ -331,83 +312,6 @@ public class HttpSenderOverHTTP extends HttpSender
|
|||
}
|
||||
}
|
||||
|
||||
private class TrailersCallback extends IteratingCallback
|
||||
{
|
||||
private final Callback callback;
|
||||
private ByteBuffer chunkBuffer;
|
||||
|
||||
public TrailersCallback(Callback callback)
|
||||
{
|
||||
this.callback = callback;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Action process() throws Throwable
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
HttpGenerator.Result result = generator.generateRequest(null, null, chunkBuffer, null, true);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Generated trailers {}/{}", result, generator);
|
||||
switch (result)
|
||||
{
|
||||
case NEED_CHUNK_TRAILER:
|
||||
{
|
||||
chunkBuffer = httpClient.getByteBufferPool().acquire(httpClient.getRequestBufferSize(), false);
|
||||
break;
|
||||
}
|
||||
case FLUSH:
|
||||
{
|
||||
EndPoint endPoint = getHttpChannel().getHttpConnection().getEndPoint();
|
||||
endPoint.write(this, chunkBuffer);
|
||||
return Action.SCHEDULED;
|
||||
}
|
||||
case SHUTDOWN_OUT:
|
||||
{
|
||||
shutdownOutput();
|
||||
return Action.SUCCEEDED;
|
||||
}
|
||||
case DONE:
|
||||
{
|
||||
return Action.SUCCEEDED;
|
||||
}
|
||||
default:
|
||||
{
|
||||
throw new IllegalStateException(result.toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
release();
|
||||
super.succeeded();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
release();
|
||||
callback.failed(x);
|
||||
super.failed(x);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onCompleteSuccess()
|
||||
{
|
||||
super.onCompleteSuccess();
|
||||
callback.succeeded();
|
||||
}
|
||||
|
||||
private void release()
|
||||
{
|
||||
httpClient.getByteBufferPool().release(chunkBuffer);
|
||||
chunkBuffer = null;
|
||||
}
|
||||
}
|
||||
|
||||
private class ByteBufferRecyclerCallback extends Callback.Nested
|
||||
{
|
||||
private final ByteBufferPool pool;
|
||||
|
@ -435,7 +339,9 @@ public class HttpSenderOverHTTP extends HttpSender
|
|||
public void failed(Throwable x)
|
||||
{
|
||||
for (ByteBuffer buffer : buffers)
|
||||
{
|
||||
pool.release(buffer);
|
||||
}
|
||||
super.failed(x);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -125,10 +125,4 @@ public class HttpSenderOverFCGI extends HttpSender
|
|||
getHttpChannel().flush(result);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void sendTrailers(HttpExchange exchange, Callback callback)
|
||||
{
|
||||
callback.succeeded();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import java.util.List;
|
|||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import javax.servlet.ServletInputStream;
|
||||
import javax.servlet.http.HttpServlet;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
|
@ -58,7 +59,6 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
|
|||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
|
||||
public class TrailersTest extends AbstractTest
|
||||
{
|
||||
@Test
|
||||
|
@ -286,7 +286,7 @@ public class TrailersTest extends AbstractTest
|
|||
|
||||
assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
assertTrue( frames.size()==3, frames.toString());
|
||||
assertEquals(3, frames.size(), frames.toString());
|
||||
|
||||
HeadersFrame headers = (HeadersFrame)frames.get(0);
|
||||
DataFrame data = (DataFrame)frames.get(1);
|
||||
|
@ -295,7 +295,7 @@ public class TrailersTest extends AbstractTest
|
|||
assertFalse(headers.isEndStream());
|
||||
assertFalse(data.isEndStream());
|
||||
assertTrue(trailers.isEndStream());
|
||||
assertTrue(trailers.getMetaData().getFields().get(trailerName).equals(trailerValue));
|
||||
assertEquals(trailers.getMetaData().getFields().get(trailerName), trailerValue);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -355,6 +355,5 @@ public class TrailersTest extends AbstractTest
|
|||
|
||||
assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -56,42 +56,57 @@ public class HttpSenderOverHTTP2 extends HttpSender
|
|||
String path = relativize(request.getPath());
|
||||
HttpURI uri = HttpURI.createHttpURI(request.getScheme(), request.getHost(), request.getPort(), path, null, request.getQuery(), null);
|
||||
MetaData.Request metaData = new MetaData.Request(request.getMethod(), uri, HttpVersion.HTTP_2, request.getHeaders());
|
||||
Supplier<HttpFields> trailers = request.getTrailers();
|
||||
metaData.setTrailerSupplier(trailers);
|
||||
HeadersFrame headersFrame = new HeadersFrame(metaData, null, trailers == null && !content.hasContent());
|
||||
HttpChannelOverHTTP2 channel = getHttpChannel();
|
||||
Promise<Stream> promise = new Promise<Stream>()
|
||||
{
|
||||
@Override
|
||||
public void succeeded(Stream stream)
|
||||
{
|
||||
channel.setStream(stream);
|
||||
((IStream)stream).setAttachment(channel);
|
||||
long idleTimeout = request.getIdleTimeout();
|
||||
if (idleTimeout >= 0)
|
||||
stream.setIdleTimeout(idleTimeout);
|
||||
Supplier<HttpFields> trailerSupplier = request.getTrailers();
|
||||
metaData.setTrailerSupplier(trailerSupplier);
|
||||
|
||||
if (content.hasContent() && !expects100Continue(request))
|
||||
HeadersFrame headersFrame;
|
||||
Promise<Stream> promise;
|
||||
if (content.hasContent())
|
||||
{
|
||||
headersFrame = new HeadersFrame(metaData, null, false);
|
||||
promise = new HeadersPromise(request, callback)
|
||||
{
|
||||
@Override
|
||||
public void succeeded(Stream stream)
|
||||
{
|
||||
boolean advanced = content.advance();
|
||||
boolean lastContent = trailers == null && content.isLast();
|
||||
if (advanced || lastContent)
|
||||
super.succeeded(stream);
|
||||
if (expects100Continue(request))
|
||||
{
|
||||
DataFrame dataFrame = new DataFrame(stream.getId(), content.getByteBuffer(), lastContent);
|
||||
stream.data(dataFrame, callback);
|
||||
return;
|
||||
// Don't send the content yet.
|
||||
callback.succeeded();
|
||||
}
|
||||
else
|
||||
{
|
||||
boolean advanced = content.advance();
|
||||
boolean lastContent = content.isLast();
|
||||
if (advanced || lastContent)
|
||||
sendContent(stream, content, trailerSupplier, callback);
|
||||
else
|
||||
callback.succeeded();
|
||||
}
|
||||
}
|
||||
callback.succeeded();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable failure)
|
||||
};
|
||||
}
|
||||
else
|
||||
{
|
||||
HttpFields trailers = trailerSupplier == null ? null : trailerSupplier.get();
|
||||
boolean endStream = trailers == null || trailers.size() == 0;
|
||||
headersFrame = new HeadersFrame(metaData, null, endStream);
|
||||
promise = new HeadersPromise(request, callback)
|
||||
{
|
||||
callback.failed(failure);
|
||||
}
|
||||
};
|
||||
@Override
|
||||
public void succeeded(Stream stream)
|
||||
{
|
||||
super.succeeded(stream);
|
||||
if (endStream)
|
||||
callback.succeeded();
|
||||
else
|
||||
sendTrailers(stream, trailers, callback);
|
||||
}
|
||||
};
|
||||
}
|
||||
// TODO optimize the send of HEADERS and DATA frames.
|
||||
HttpChannelOverHTTP2 channel = getHttpChannel();
|
||||
channel.getSession().newStream(headersFrame, promise, channel.getStreamListener());
|
||||
}
|
||||
|
||||
|
@ -118,24 +133,67 @@ public class HttpSenderOverHTTP2 extends HttpSender
|
|||
{
|
||||
if (content.isConsumed())
|
||||
{
|
||||
// The superclass calls sendContent() one more time after the last content.
|
||||
// This is necessary for HTTP/1.1 to generate the terminal chunk (with trailers),
|
||||
// but it's not necessary for HTTP/2 so we just succeed the callback.
|
||||
callback.succeeded();
|
||||
}
|
||||
else
|
||||
{
|
||||
Stream stream = getHttpChannel().getStream();
|
||||
Supplier<HttpFields> trailers = exchange.getRequest().getTrailers();
|
||||
DataFrame frame = new DataFrame(stream.getId(), content.getByteBuffer(), trailers == null && content.isLast());
|
||||
stream.data(frame, callback);
|
||||
Supplier<HttpFields> trailerSupplier = exchange.getRequest().getTrailers();
|
||||
sendContent(stream, content, trailerSupplier, callback);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void sendTrailers(HttpExchange exchange, Callback callback)
|
||||
private void sendContent(Stream stream, HttpContent content, Supplier<HttpFields> trailerSupplier, Callback callback)
|
||||
{
|
||||
Supplier<HttpFields> trailers = exchange.getRequest().getTrailers();
|
||||
MetaData metaData = new MetaData(HttpVersion.HTTP_2, trailers.get());
|
||||
Stream stream = getHttpChannel().getStream();
|
||||
boolean lastContent = content.isLast();
|
||||
HttpFields trailers = null;
|
||||
boolean endStream = false;
|
||||
if (lastContent)
|
||||
{
|
||||
trailers = trailerSupplier == null ? null : trailerSupplier.get();
|
||||
endStream = trailers == null || trailers.size() == 0;
|
||||
}
|
||||
DataFrame dataFrame = new DataFrame(stream.getId(), content.getByteBuffer(), endStream);
|
||||
HttpFields fTrailers = trailers;
|
||||
stream.data(dataFrame, endStream || !lastContent ? callback : Callback.from(() -> sendTrailers(stream, fTrailers, callback), callback::failed));
|
||||
}
|
||||
|
||||
private void sendTrailers(Stream stream, HttpFields trailers, Callback callback)
|
||||
{
|
||||
MetaData metaData = new MetaData(HttpVersion.HTTP_2, trailers);
|
||||
HeadersFrame trailersFrame = new HeadersFrame(stream.getId(), metaData, null, true);
|
||||
stream.headers(trailersFrame, callback);
|
||||
}
|
||||
|
||||
private class HeadersPromise implements Promise<Stream>
|
||||
{
|
||||
private final HttpRequest request;
|
||||
private final Callback callback;
|
||||
|
||||
private HeadersPromise(HttpRequest request, Callback callback)
|
||||
{
|
||||
this.request = request;
|
||||
this.callback = callback;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void succeeded(Stream stream)
|
||||
{
|
||||
HttpChannelOverHTTP2 channel = getHttpChannel();
|
||||
channel.setStream(stream);
|
||||
((IStream)stream).setAttachment(channel);
|
||||
long idleTimeout = request.getIdleTimeout();
|
||||
if (idleTimeout >= 0)
|
||||
stream.setIdleTimeout(idleTimeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
callback.failed(x);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,190 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.http2.client.http;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.eclipse.jetty.client.HttpRequest;
|
||||
import org.eclipse.jetty.client.api.ContentResponse;
|
||||
import org.eclipse.jetty.client.util.DeferredContentProvider;
|
||||
import org.eclipse.jetty.client.util.StringContentProvider;
|
||||
import org.eclipse.jetty.http.HttpFields;
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
import org.eclipse.jetty.http.HttpVersion;
|
||||
import org.eclipse.jetty.http.MetaData;
|
||||
import org.eclipse.jetty.http2.api.Stream;
|
||||
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
|
||||
import org.eclipse.jetty.http2.frames.DataFrame;
|
||||
import org.eclipse.jetty.http2.frames.HeadersFrame;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class RequestTrailersTest extends AbstractTest
|
||||
{
|
||||
@Test
|
||||
public void testEmptyTrailersWithoutContent() throws Exception
|
||||
{
|
||||
testEmptyTrailers(null);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEmptyTrailersWithEagerContent() throws Exception
|
||||
{
|
||||
testEmptyTrailers("eager_content");
|
||||
}
|
||||
|
||||
private void testEmptyTrailers(String content) throws Exception
|
||||
{
|
||||
CountDownLatch trailersLatch = new CountDownLatch(1);
|
||||
start(new ServerSessionListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
|
||||
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), response, null, true);
|
||||
stream.headers(responseFrame, Callback.NOOP);
|
||||
return new Stream.Listener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public void onHeaders(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
trailersLatch.countDown();
|
||||
}
|
||||
};
|
||||
}
|
||||
});
|
||||
|
||||
HttpRequest request = (HttpRequest)client.newRequest("localhost", connector.getLocalPort());
|
||||
HttpFields trailers = new HttpFields();
|
||||
request.trailers(() -> trailers);
|
||||
if (content != null)
|
||||
request.content(new StringContentProvider(content));
|
||||
|
||||
ContentResponse response = request.send();
|
||||
assertEquals(HttpStatus.OK_200, response.getStatus());
|
||||
|
||||
// The client must not send the trailers.
|
||||
assertFalse(trailersLatch.await(1, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEmptyTrailersWithDeferredContent() throws Exception
|
||||
{
|
||||
start(new ServerSessionListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
return new Stream.Listener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public void onData(Stream stream, DataFrame dataFrame, Callback callback)
|
||||
{
|
||||
callback.succeeded();
|
||||
// We should not receive an empty HEADERS frame for the
|
||||
// trailers, but instead a DATA frame with endStream=true.
|
||||
if (dataFrame.isEndStream())
|
||||
{
|
||||
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
|
||||
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), response, null, true);
|
||||
stream.headers(responseFrame, Callback.NOOP);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
});
|
||||
|
||||
HttpRequest request = (HttpRequest)client.newRequest("localhost", connector.getLocalPort());
|
||||
HttpFields trailers = new HttpFields();
|
||||
request.trailers(() -> trailers);
|
||||
DeferredContentProvider content = new DeferredContentProvider();
|
||||
request.content(content);
|
||||
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
request.send(result ->
|
||||
{
|
||||
assertTrue(result.isSucceeded());
|
||||
assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
|
||||
latch.countDown();
|
||||
});
|
||||
|
||||
// Send deferred content after a while.
|
||||
Thread.sleep(1000);
|
||||
content.offer(ByteBuffer.wrap("deferred_content".getBytes(StandardCharsets.UTF_8)));
|
||||
content.close();
|
||||
|
||||
assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEmptyTrailersWithEmptyDeferredContent() throws Exception
|
||||
{
|
||||
start(new ServerSessionListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
return new Stream.Listener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public void onData(Stream stream, DataFrame dataFrame, Callback callback)
|
||||
{
|
||||
callback.succeeded();
|
||||
// We should not receive an empty HEADERS frame for the
|
||||
// trailers, but instead a DATA frame with endStream=true.
|
||||
if (dataFrame.isEndStream())
|
||||
{
|
||||
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
|
||||
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), response, null, true);
|
||||
stream.headers(responseFrame, Callback.NOOP);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
});
|
||||
|
||||
HttpRequest request = (HttpRequest)client.newRequest("localhost", connector.getLocalPort());
|
||||
HttpFields trailers = new HttpFields();
|
||||
request.trailers(() -> trailers);
|
||||
DeferredContentProvider content = new DeferredContentProvider();
|
||||
request.content(content);
|
||||
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
request.send(result ->
|
||||
{
|
||||
assertTrue(result.isSucceeded());
|
||||
assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
|
||||
latch.countDown();
|
||||
});
|
||||
|
||||
// Send deferred content after a while.
|
||||
Thread.sleep(1000);
|
||||
content.close();
|
||||
|
||||
assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
}
|
|
@ -307,9 +307,36 @@ abstract public class WriteFlusher
|
|||
|
||||
private void fail(Callback callback, Throwable... suppressed)
|
||||
{
|
||||
FailedState failed = (FailedState)_state.get();
|
||||
Throwable cause;
|
||||
loop:
|
||||
while (true)
|
||||
{
|
||||
State state = _state.get();
|
||||
|
||||
switch (state.getType())
|
||||
{
|
||||
case FAILED:
|
||||
{
|
||||
FailedState failed = (FailedState)state;
|
||||
cause = failed.getCause();
|
||||
break loop;
|
||||
}
|
||||
|
||||
case IDLE:
|
||||
for (Throwable t : suppressed)
|
||||
LOG.warn(t);
|
||||
return;
|
||||
|
||||
default:
|
||||
Throwable t = new IllegalStateException();
|
||||
if (!_state.compareAndSet(state, new FailedState(t)))
|
||||
continue;
|
||||
|
||||
cause = t;
|
||||
break loop;
|
||||
}
|
||||
}
|
||||
|
||||
Throwable cause = failed.getCause();
|
||||
for (Throwable t : suppressed)
|
||||
{
|
||||
if (t != cause)
|
||||
|
|
|
@ -18,14 +18,6 @@
|
|||
|
||||
package org.eclipse.jetty.io;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.nio.ByteBuffer;
|
||||
|
@ -43,10 +35,18 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.FutureCallback;
|
||||
import org.eclipse.jetty.util.log.StacklessLogging;
|
||||
import org.hamcrest.Matchers;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class WriteFlusherTest
|
||||
{
|
||||
@Test
|
||||
|
@ -159,6 +159,43 @@ public class WriteFlusherTest
|
|||
assertTrue(flusher.isIdle());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCallbackThrows() throws Exception
|
||||
{
|
||||
ByteArrayEndPoint endPoint = new ByteArrayEndPoint(new byte[0], 100);
|
||||
|
||||
AtomicBoolean incompleteFlush = new AtomicBoolean(false);
|
||||
WriteFlusher flusher = new WriteFlusher(endPoint)
|
||||
{
|
||||
@Override
|
||||
protected void onIncompleteFlush()
|
||||
{
|
||||
incompleteFlush.set(true);
|
||||
}
|
||||
};
|
||||
|
||||
FutureCallback callback = new FutureCallback()
|
||||
{
|
||||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
super.succeeded();
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
};
|
||||
|
||||
try (StacklessLogging stacklessLogging = new StacklessLogging(WriteFlusher.class))
|
||||
{
|
||||
flusher.write(callback, BufferUtil.toBuffer("How now brown cow!"));
|
||||
callback.get(100, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
assertEquals("How now brown cow!", endPoint.takeOutputString());
|
||||
assertTrue(callback.isDone());
|
||||
assertFalse(incompleteFlush.get());
|
||||
assertTrue(flusher.isIdle());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCloseWhileBlocking() throws Exception
|
||||
{
|
||||
|
|
|
@ -499,7 +499,9 @@ public class WebAppClassLoader extends URLClassLoader implements ClassVisibility
|
|||
// Try the parent loader
|
||||
try
|
||||
{
|
||||
parent_class = _parent.loadClass(name);
|
||||
parent_class = _parent.loadClass(name);
|
||||
if (parent_class == null)
|
||||
throw new ClassNotFoundException("Bad ClassLoader: returned null for loadClass(" + name + ")");
|
||||
|
||||
// If the webapp is allowed to see this class
|
||||
if (Boolean.TRUE.equals(__loadServerClasses.get()) || !_context.isServerClass(parent_class))
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.eclipse.jetty.server.ServerConnector;
|
|||
import org.eclipse.jetty.servlet.ServletContextHandler;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
import org.eclipse.jetty.websocket.api.StatusCode;
|
||||
import org.eclipse.jetty.websocket.client.WebSocketClient;
|
||||
import org.eclipse.jetty.websocket.core.Frame;
|
||||
import org.eclipse.jetty.websocket.core.OpCode;
|
||||
|
@ -140,8 +141,11 @@ public class WebSocketStatsTest
|
|||
upgradeSentBytes = statistics.getSentBytes();
|
||||
upgradeReceivedBytes = statistics.getReceivedBytes();
|
||||
|
||||
for (int i=0; i<numMessages; i++)
|
||||
for (int i = 0; i < numMessages; i++)
|
||||
{
|
||||
session.getRemote().sendString(msgText);
|
||||
}
|
||||
session.close(StatusCode.NORMAL, null);
|
||||
}
|
||||
assertTrue(socket.closeLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(wsConnectionClosed.await(5, TimeUnit.SECONDS));
|
||||
|
|
11
pom.xml
11
pom.xml
|
@ -514,7 +514,7 @@
|
|||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-javadoc-plugin</artifactId>
|
||||
<version>3.0.1</version>
|
||||
<version>3.1.0</version>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.codehaus.plexus</groupId>
|
||||
|
@ -528,18 +528,13 @@
|
|||
<charset>UTF-8</charset>
|
||||
<docencoding>UTF-8</docencoding>
|
||||
<encoding>UTF-8</encoding>
|
||||
<additionalparam>-html5</additionalparam>
|
||||
<additionalOptions>-html5</additionalOptions>
|
||||
<docfilessubdirs>true</docfilessubdirs>
|
||||
<detectLinks>false</detectLinks>
|
||||
<detectJavaApiLink>false</detectJavaApiLink>
|
||||
<show>protected</show>
|
||||
<attach>true</attach>
|
||||
<excludePackageNames>com.acme,org.slf4j*,org.mortbay*,*.jmh*,org.eclipse.jetty.embedded*,org.eclipse.jetty.example.asyncrest*,org.eclipse.jetty.test*</excludePackageNames>
|
||||
<!-- broken build with jdk 11.0.2
|
||||
<links>
|
||||
<link>https://docs.oracle.com/javase/8/docs/api/</link>
|
||||
</links>
|
||||
-->
|
||||
<excludePackageNames>com.*:org.slf4j*:org.mortbay*:*.jmh*:org.eclipse.jetty.embedded*:org.eclipse.jetty.example.asyncrest*:org.eclipse.jetty.test*</excludePackageNames>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
|
|
Loading…
Reference in New Issue