Merged branch 'jetty-9.4.x' into 'master'.

This commit is contained in:
Simone Bordet 2016-09-26 12:21:05 +02:00
commit 79889f05f5
17 changed files with 1036 additions and 506 deletions

View File

@ -64,6 +64,10 @@ public class ContinueProtocolHandler implements ProtocolHandler
return new ContinueListener();
}
protected void onContinue(Request request)
{
}
protected class ContinueListener extends BufferingResponseListener
{
@Override
@ -72,7 +76,8 @@ public class ContinueProtocolHandler implements ProtocolHandler
// Handling of success must be done here and not from onComplete(),
// since the onComplete() is not invoked because the request is not completed yet.
HttpConversation conversation = ((HttpRequest)response.getRequest()).getConversation();
Request request = response.getRequest();
HttpConversation conversation = ((HttpRequest)request).getConversation();
// Mark the 100 Continue response as handled
conversation.setAttribute(ATTRIBUTE, Boolean.TRUE);
@ -88,6 +93,7 @@ public class ContinueProtocolHandler implements ProtocolHandler
// All good, continue
exchange.resetResponse();
exchange.proceed(null);
onContinue(request);
break;
}
default:
@ -98,7 +104,7 @@ public class ContinueProtocolHandler implements ProtocolHandler
List<Response.ResponseListener> listeners = exchange.getResponseListeners();
HttpContentResponse contentResponse = new HttpContentResponse(response, getContent(), getMediaType(), getEncoding());
notifier.forwardSuccess(listeners, contentResponse);
exchange.proceed(new HttpRequestException("Expectation failed", exchange.getRequest()));
exchange.proceed(new HttpRequestException("Expectation failed", request));
break;
}
}

View File

@ -35,7 +35,6 @@ import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -68,21 +67,23 @@ public abstract class HttpConnection implements Connection
@Override
public void send(Request request, Response.CompleteListener listener)
{
ArrayList<Response.ResponseListener> listeners = new ArrayList<>(2);
if (request.getTimeout() > 0)
HttpRequest httpRequest = (HttpRequest)request;
ArrayList<Response.ResponseListener> listeners = new ArrayList<>(httpRequest.getResponseListeners());
if (httpRequest.getTimeout() > 0)
{
TimeoutCompleteListener timeoutListener = new TimeoutCompleteListener(request);
TimeoutCompleteListener timeoutListener = new TimeoutCompleteListener(httpRequest);
timeoutListener.schedule(getHttpClient().getScheduler());
listeners.add(timeoutListener);
}
if (listener != null)
listeners.add(listener);
HttpExchange exchange = new HttpExchange(getHttpDestination(), (HttpRequest)request, listeners);
HttpExchange exchange = new HttpExchange(getHttpDestination(), httpRequest, listeners);
SendFailure result = send(exchange);
if (result != null)
request.abort(result.failure);
httpRequest.abort(result.failure);
}
protected abstract SendFailure send(HttpExchange exchange);

View File

@ -693,6 +693,11 @@ public class HttpRequest implements Request
client.send(request, responseListeners);
}
protected List<Response.ResponseListener> getResponseListeners()
{
return responseListeners;
}
@Override
public boolean abort(Throwable cause)
{

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.client;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.api.Connection;
@ -27,7 +28,7 @@ import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
import org.eclipse.jetty.client.http.HttpDestinationOverHTTP;
import org.eclipse.jetty.client.util.FutureResponseListener;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.junit.Assert;
@ -65,7 +66,6 @@ public class HttpClientExplicitConnectionTest extends AbstractHttpClientServerTe
}
}
@Slow
@Test
public void testExplicitConnectionIsClosedOnRemoteClose() throws Exception
{
@ -98,4 +98,26 @@ public class HttpClientExplicitConnectionTest extends AbstractHttpClientServerTe
Assert.assertTrue(connectionPool.getActiveConnections().isEmpty());
Assert.assertTrue(connectionPool.getIdleConnections().isEmpty());
}
@Test
public void testExplicitConnectionResponseListeners() throws Exception
{
start(new EmptyServerHandler());
Destination destination = client.getDestination(scheme, "localhost", connector.getLocalPort());
FuturePromise<Connection> futureConnection = new FuturePromise<>();
destination.newConnection(futureConnection);
Connection connection = futureConnection.get(5, TimeUnit.SECONDS);
CountDownLatch responseLatch = new CountDownLatch(1);
Request request = client.newRequest(destination.getHost(), destination.getPort())
.scheme(scheme)
.onResponseSuccess(response -> responseLatch.countDown());
FutureResponseListener listener = new FutureResponseListener(request);
connection.send(request, listener);
ContentResponse response = listener.get(5, TimeUnit.SECONDS);
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
Assert.assertTrue(responseLatch.await(5, TimeUnit.SECONDS));
}
}

View File

@ -997,8 +997,8 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
protected void abort(Throwable failure)
{
terminate();
notifyFailure(this, failure);
terminate();
}
public boolean isDisconnected()

View File

@ -162,8 +162,14 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listen
{
dataInfo = queue.poll();
}
if (dataInfo == null)
{
DataInfo prevDataInfo = this.dataInfo;
if (prevDataInfo != null && prevDataInfo.last)
return Action.SUCCEEDED;
return Action.IDLE;
}
this.dataInfo = dataInfo;
responseContent(dataInfo.exchange, dataInfo.buffer, this);
@ -176,11 +182,15 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listen
ByteBufferPool byteBufferPool = getHttpDestination().getHttpClient().getByteBufferPool();
byteBufferPool.release(dataInfo.buffer);
dataInfo.callback.succeeded();
if (dataInfo.last)
responseSuccess(dataInfo.exchange);
super.succeeded();
}
@Override
protected void onCompleteSuccess()
{
responseSuccess(dataInfo.exchange);
}
@Override
protected void onCompleteFailure(Throwable failure)
{

View File

@ -133,6 +133,12 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF
getConnection().onSessionFailure(new IOException("HTTP/2 " + error + reason));
}
@Override
public void onFailure(Session session, Throwable failure)
{
getConnection().onSessionFailure(failure);
}
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{

View File

@ -44,7 +44,6 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
public class HttpChannelOverHTTP2 extends HttpChannel
{
@ -297,8 +296,10 @@ public class HttpChannelOverHTTP2 extends HttpChannel
public void onFailure(Throwable failure)
{
onEarlyEOF();
getState().asyncError(failure);
if (onEarlyEOF())
handle();
else
getState().asyncError(failure);
}
protected void consumeInput()

View File

@ -40,7 +40,9 @@ import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.ContinueProtocolHandler;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.ProtocolHandlers;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.http.HttpField;
@ -80,6 +82,7 @@ import org.eclipse.jetty.util.thread.QueuedThreadPool;
*/
public abstract class AbstractProxyServlet extends HttpServlet
{
protected static final String CLIENT_REQUEST_ATTRIBUTE = "org.eclipse.jetty.proxy.clientRequest";
protected static final Set<String> HOP_HEADERS;
static
{
@ -324,8 +327,10 @@ public abstract class AbstractProxyServlet extends HttpServlet
// Content must not be decoded, otherwise the client gets confused.
client.getContentDecoderFactories().clear();
// No protocol handlers, pass everything to the client.
client.getProtocolHandlers().clear();
// Pass traffic to the client, only intercept what's necessary.
ProtocolHandlers protocolHandlers = client.getProtocolHandlers();
protocolHandlers.clear();
protocolHandlers.put(new ProxyContinueProtocolHandler());
return client;
}
@ -427,6 +432,11 @@ public abstract class AbstractProxyServlet extends HttpServlet
clientRequest.getHeader(HttpHeader.TRANSFER_ENCODING.asString()) != null;
}
protected boolean expects100Continue(HttpServletRequest request)
{
return HttpHeaderValue.CONTINUE.asString().equals(request.getHeader(HttpHeader.EXPECT.asString()));
}
protected void copyRequestHeaders(HttpServletRequest clientRequest, Request proxyRequest)
{
// First clear possibly existing headers, as we are going to copy those from the client request.
@ -638,6 +648,9 @@ public abstract class AbstractProxyServlet extends HttpServlet
int status = failure instanceof TimeoutException ?
HttpStatus.GATEWAY_TIMEOUT_504 :
HttpStatus.BAD_GATEWAY_502;
int serverStatus = serverResponse == null ? status : serverResponse.getStatus();
if (expects100Continue(clientRequest) && serverStatus >= HttpStatus.OK_200)
status = serverStatus;
sendProxyResponseError(clientRequest, proxyResponse, status);
}
}
@ -655,6 +668,12 @@ public abstract class AbstractProxyServlet extends HttpServlet
clientRequest.getAsyncContext().complete();
}
protected void onContinue(HttpServletRequest clientRequest, Request proxyRequest)
{
if (_log.isDebugEnabled())
_log.debug("{} handling 100 Continue", getRequestId(clientRequest));
}
/**
* <p>Utility class that implement transparent proxy functionalities.</p>
* <p>Configuration parameters:</p>
@ -733,4 +752,14 @@ public abstract class AbstractProxyServlet extends HttpServlet
return rewrittenURI.toString();
}
}
class ProxyContinueProtocolHandler extends ContinueProtocolHandler
{
@Override
protected void onContinue(Request request)
{
HttpServletRequest clientRequest = (HttpServletRequest)request.getAttributes().get(CLIENT_REQUEST_ATTRIBUTE);
AbstractProxyServlet.this.onContinue(clientRequest, request);
}
}
}

View File

@ -41,7 +41,6 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.ContentDecoder;
import org.eclipse.jetty.client.GZIPContentDecoder;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
@ -66,9 +65,10 @@ import org.eclipse.jetty.util.component.Destroyable;
*/
public class AsyncMiddleManServlet extends AbstractProxyServlet
{
private static final String PROXY_REQUEST_COMMITTED = AsyncMiddleManServlet.class.getName() + ".proxyRequestCommitted";
private static final String CLIENT_TRANSFORMER = AsyncMiddleManServlet.class.getName() + ".clientTransformer";
private static final String SERVER_TRANSFORMER = AsyncMiddleManServlet.class.getName() + ".serverTransformer";
private static final String PROXY_REQUEST_CONTENT_COMMITTED_ATTRIBUTE = AsyncMiddleManServlet.class.getName() + ".proxyRequestContentCommitted";
private static final String CLIENT_TRANSFORMER_ATTRIBUTE = AsyncMiddleManServlet.class.getName() + ".clientTransformer";
private static final String SERVER_TRANSFORMER_ATTRIBUTE = AsyncMiddleManServlet.class.getName() + ".serverTransformer";
private static final String CONTINUE_ACTION_ATTRIBUTE = AsyncMiddleManServlet.class.getName() + ".continueAction";
@Override
protected void service(HttpServletRequest clientRequest, HttpServletResponse proxyResponse) throws ServletException, IOException
@ -91,8 +91,6 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
.method(clientRequest.getMethod())
.version(HttpVersion.fromString(clientRequest.getProtocol()));
boolean hasContent = hasContent(clientRequest);
copyRequestHeaders(clientRequest, proxyRequest);
addProxyHeaders(clientRequest, proxyRequest);
@ -105,16 +103,43 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
// If there is content, the send of the proxy request
// is delayed and performed when the content arrives,
// to allow optimization of the Content-Length header.
if (hasContent)
proxyRequest.content(newProxyContentProvider(clientRequest, proxyResponse, proxyRequest));
if (hasContent(clientRequest))
{
DeferredContentProvider provider = newProxyContentProvider(clientRequest, proxyResponse, proxyRequest);
proxyRequest.content(provider);
if (expects100Continue(clientRequest))
{
proxyRequest.attribute(CLIENT_REQUEST_ATTRIBUTE, clientRequest);
proxyRequest.attribute(CONTINUE_ACTION_ATTRIBUTE, (Runnable)() ->
{
try
{
ServletInputStream input = clientRequest.getInputStream();
input.setReadListener(newProxyReadListener(clientRequest, proxyResponse, proxyRequest, provider));
}
catch (Throwable failure)
{
onClientRequestFailure(clientRequest, proxyRequest, proxyResponse, failure);
}
});
sendProxyRequest(clientRequest, proxyResponse, proxyRequest);
}
else
{
ServletInputStream input = clientRequest.getInputStream();
input.setReadListener(newProxyReadListener(clientRequest, proxyResponse, proxyRequest, provider));
}
}
else
{
sendProxyRequest(clientRequest, proxyResponse, proxyRequest);
}
}
protected ContentProvider newProxyContentProvider(final HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Request proxyRequest) throws IOException
protected DeferredContentProvider newProxyContentProvider(final HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Request proxyRequest) throws IOException
{
ServletInputStream input = clientRequest.getInputStream();
DeferredContentProvider provider = new DeferredContentProvider()
return new DeferredContentProvider()
{
@Override
public boolean offer(ByteBuffer buffer, Callback callback)
@ -124,8 +149,6 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
return super.offer(buffer, callback);
}
};
input.setReadListener(newProxyReadListener(clientRequest, proxyResponse, proxyRequest, provider));
return provider;
}
protected ReadListener newProxyReadListener(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Request proxyRequest, DeferredContentProvider provider)
@ -154,6 +177,14 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
return ContentTransformer.IDENTITY;
}
@Override
protected void onContinue(HttpServletRequest clientRequest, Request proxyRequest)
{
super.onContinue(clientRequest, proxyRequest);
Runnable action = (Runnable)proxyRequest.getAttributes().get(CONTINUE_ACTION_ATTRIBUTE);
action.run();
}
private void transform(ContentTransformer transformer, ByteBuffer input, boolean finished, List<ByteBuffer> output) throws IOException
{
try
@ -197,10 +228,10 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
private void cleanup(HttpServletRequest clientRequest)
{
ContentTransformer clientTransformer = (ContentTransformer)clientRequest.getAttribute(CLIENT_TRANSFORMER);
ContentTransformer clientTransformer = (ContentTransformer)clientRequest.getAttribute(CLIENT_TRANSFORMER_ATTRIBUTE);
if (clientTransformer instanceof Destroyable)
((Destroyable)clientTransformer).destroy();
ContentTransformer serverTransformer = (ContentTransformer)clientRequest.getAttribute(SERVER_TRANSFORMER);
ContentTransformer serverTransformer = (ContentTransformer)clientRequest.getAttribute(SERVER_TRANSFORMER_ATTRIBUTE);
if (serverTransformer instanceof Destroyable)
((Destroyable)serverTransformer).destroy();
}
@ -237,6 +268,7 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
private final Request proxyRequest;
private final DeferredContentProvider provider;
private final int contentLength;
private final boolean expects100Continue;
private int length;
protected ProxyReader(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Request proxyRequest, DeferredContentProvider provider)
@ -246,6 +278,7 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
this.proxyRequest = proxyRequest;
this.provider = provider;
this.contentLength = clientRequest.getContentLength();
this.expects100Continue = expects100Continue(clientRequest);
}
@Override
@ -321,15 +354,13 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
private void process(ByteBuffer content, Callback callback, boolean finished) throws IOException
{
ContentTransformer transformer = (ContentTransformer)clientRequest.getAttribute(CLIENT_TRANSFORMER);
ContentTransformer transformer = (ContentTransformer)clientRequest.getAttribute(CLIENT_TRANSFORMER_ATTRIBUTE);
if (transformer == null)
{
transformer = newClientRequestContentTransformer(clientRequest, proxyRequest);
clientRequest.setAttribute(CLIENT_TRANSFORMER, transformer);
clientRequest.setAttribute(CLIENT_TRANSFORMER_ATTRIBUTE, transformer);
}
boolean committed = clientRequest.getAttribute(PROXY_REQUEST_COMMITTED) != null;
int contentBytes = content.remaining();
// Skip transformation for empty non-last buffers.
@ -361,11 +392,15 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
if (_log.isDebugEnabled())
_log.debug("{} upstream content transformation {} -> {} bytes", getRequestId(clientRequest), contentBytes, newContentBytes);
if (!committed && (size > 0 || finished))
boolean contentCommitted = clientRequest.getAttribute(PROXY_REQUEST_CONTENT_COMMITTED_ATTRIBUTE) != null;
if (!contentCommitted && (size > 0 || finished))
{
proxyRequest.header(HttpHeader.CONTENT_LENGTH, null);
clientRequest.setAttribute(PROXY_REQUEST_COMMITTED, true);
sendProxyRequest(clientRequest, proxyResponse, proxyRequest);
clientRequest.setAttribute(PROXY_REQUEST_CONTENT_COMMITTED_ATTRIBUTE, true);
if (!expects100Continue)
{
proxyRequest.header(HttpHeader.CONTENT_LENGTH, null);
sendProxyRequest(clientRequest, proxyResponse, proxyRequest);
}
}
if (size == 0)
@ -401,6 +436,7 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
@Override
public void onBegin(Response serverResponse)
{
response = serverResponse;
proxyResponse.setStatus(serverResponse.getStatus());
}
@ -430,11 +466,11 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
clientRequest.setAttribute(WRITE_LISTENER_ATTRIBUTE, proxyWriter);
}
ContentTransformer transformer = (ContentTransformer)clientRequest.getAttribute(SERVER_TRANSFORMER);
ContentTransformer transformer = (ContentTransformer)clientRequest.getAttribute(SERVER_TRANSFORMER_ATTRIBUTE);
if (transformer == null)
{
transformer = newServerResponseContentTransformer(clientRequest, proxyResponse, serverResponse);
clientRequest.setAttribute(SERVER_TRANSFORMER, transformer);
clientRequest.setAttribute(SERVER_TRANSFORMER_ATTRIBUTE, transformer);
}
length += contentBytes;
@ -502,7 +538,7 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
if (contentLength < 0)
{
ProxyWriter proxyWriter = (ProxyWriter)clientRequest.getAttribute(WRITE_LISTENER_ATTRIBUTE);
ContentTransformer transformer = (ContentTransformer)clientRequest.getAttribute(SERVER_TRANSFORMER);
ContentTransformer transformer = (ContentTransformer)clientRequest.getAttribute(SERVER_TRANSFORMER_ATTRIBUTE);
transform(transformer, BufferUtil.EMPTY_BUFFER, true, buffers);
@ -544,7 +580,6 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
@Override
public void onComplete(Result result)
{
response = result.getResponse();
if (result.isSucceeded())
complete.succeeded();
else

View File

@ -18,9 +18,12 @@
package org.eclipse.jetty.proxy;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import javax.servlet.AsyncContext;
@ -29,13 +32,16 @@ import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.AsyncContentProvider;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.DeferredContentProvider;
import org.eclipse.jetty.client.util.InputStreamContentProvider;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
/**
* <p>Servlet 3.0 asynchronous proxy servlet.</p>
@ -47,6 +53,8 @@ import org.eclipse.jetty.util.Callback;
*/
public class ProxyServlet extends AbstractProxyServlet
{
private static final String CONTINUE_ACTION_ATTRIBUTE = ProxyServlet.class.getName() + ".continueAction";
@Override
protected void service(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException
{
@ -83,7 +91,30 @@ public class ProxyServlet extends AbstractProxyServlet
proxyRequest.timeout(getTimeout(), TimeUnit.MILLISECONDS);
if (hasContent(request))
proxyRequest.content(proxyRequestContent(request, response, proxyRequest));
{
if (expects100Continue(request))
{
DeferredContentProvider deferred = new DeferredContentProvider();
proxyRequest.content(deferred);
proxyRequest.attribute(CLIENT_REQUEST_ATTRIBUTE, request);
proxyRequest.attribute(CONTINUE_ACTION_ATTRIBUTE, (Runnable)() ->
{
try
{
ContentProvider provider = proxyRequestContent(request, response, proxyRequest);
new DelegatingContentProvider(request, proxyRequest, response, provider, deferred).iterate();
}
catch (Throwable failure)
{
onClientRequestFailure(request, proxyRequest, response, failure);
}
});
}
else
{
proxyRequest.content(proxyRequestContent(request, response, proxyRequest));
}
}
sendProxyRequest(request, response, proxyRequest);
}
@ -114,6 +145,15 @@ public class ProxyServlet extends AbstractProxyServlet
}
}
@Override
protected void onContinue(HttpServletRequest clientRequest, Request proxyRequest)
{
super.onContinue(clientRequest, proxyRequest);
Runnable action = (Runnable)proxyRequest.getAttributes().get(CONTINUE_ACTION_ATTRIBUTE);
Executor executor = getHttpClient().getExecutor();
executor.execute(action);
}
/**
* <p>Convenience extension of {@link ProxyServlet} that offers transparent proxy functionalities.</p>
*
@ -240,4 +280,81 @@ public class ProxyServlet extends AbstractProxyServlet
onClientRequestFailure(request, proxyRequest, response, failure);
}
}
private class DelegatingContentProvider extends IteratingCallback implements AsyncContentProvider.Listener
{
private final HttpServletRequest clientRequest;
private final Request proxyRequest;
private final HttpServletResponse proxyResponse;
private final Iterator<ByteBuffer> iterator;
private final DeferredContentProvider deferred;
private DelegatingContentProvider(HttpServletRequest clientRequest, Request proxyRequest, HttpServletResponse proxyResponse, ContentProvider provider, DeferredContentProvider deferred)
{
this.clientRequest = clientRequest;
this.proxyRequest = proxyRequest;
this.proxyResponse = proxyResponse;
this.iterator = provider.iterator();
this.deferred = deferred;
if (provider instanceof AsyncContentProvider)
((AsyncContentProvider)provider).setListener(this);
}
@Override
protected Action process() throws Exception
{
if (!iterator.hasNext())
return Action.SUCCEEDED;
ByteBuffer buffer = iterator.next();
if (buffer == null)
return Action.IDLE;
deferred.offer(buffer, this);
return Action.SCHEDULED;
}
@Override
public void succeeded()
{
if (iterator instanceof Callback)
((Callback)iterator).succeeded();
super.succeeded();
}
@Override
protected void onCompleteSuccess()
{
try
{
if (iterator instanceof Closeable)
((Closeable)iterator).close();
deferred.close();
}
catch (Throwable x)
{
_log.ignore(x);
}
}
@Override
protected void onCompleteFailure(Throwable failure)
{
if (iterator instanceof Callback)
((Callback)iterator).failed(failure);
onClientRequestFailure(clientRequest, proxyRequest, proxyResponse, failure);
}
@Override
public InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}
@Override
public void onContent()
{
iterate();
}
}
}

View File

@ -53,6 +53,7 @@ import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.ServletOutputStream;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
@ -72,9 +73,12 @@ import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.http.HttpDestinationOverHTTP;
import org.eclipse.jetty.client.util.BufferingResponseListener;
import org.eclipse.jetty.client.util.BytesContentProvider;
import org.eclipse.jetty.client.util.DeferredContentProvider;
import org.eclipse.jetty.client.util.InputStreamResponseListener;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Server;
@ -187,9 +191,12 @@ public class ProxyServletTest
@After
public void dispose() throws Exception
{
client.stop();
proxy.stop();
server.stop();
if (client != null)
client.stop();
if (proxy != null)
proxy.stop();
if (server != null)
server.stop();
}
@Test
@ -1233,5 +1240,227 @@ public class ProxyServletTest
Assert.assertEquals(200, response.getStatus());
}
// TODO: test proxy authentication
@Test
public void testExpect100ContinueRespond100Continue() throws Exception
{
CountDownLatch serverLatch1 = new CountDownLatch(1);
CountDownLatch serverLatch2 = new CountDownLatch(1);
startServer(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
serverLatch1.countDown();
try
{
serverLatch2.await(5, TimeUnit.SECONDS);
}
catch (Throwable x)
{
throw new InterruptedIOException();
}
// Send the 100 Continue.
ServletInputStream input = request.getInputStream();
// Echo the content.
IO.copy(input, response.getOutputStream());
}
});
startProxy();
startClient();
byte[] content = new byte[1024];
CountDownLatch contentLatch = new CountDownLatch(1);
CountDownLatch clientLatch = new CountDownLatch(1);
client.newRequest("localhost", serverConnector.getLocalPort())
.header(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString())
.content(new BytesContentProvider(content))
.onRequestContent((request, buffer) -> contentLatch.countDown())
.send(new BufferingResponseListener()
{
@Override
public void onComplete(Result result)
{
if (result.isSucceeded())
{
if (result.getResponse().getStatus() == HttpStatus.OK_200)
{
if (Arrays.equals(content, getContent()))
clientLatch.countDown();
}
}
}
});
// Wait until we arrive on the server.
Assert.assertTrue(serverLatch1.await(5, TimeUnit.SECONDS));
// The client should not send the content yet.
Assert.assertFalse(contentLatch.await(1, TimeUnit.SECONDS));
// Make the server send the 100 Continue.
serverLatch2.countDown();
// The client has sent the content.
Assert.assertTrue(contentLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testExpect100ContinueRespond100ContinueDelayedRequestContent() throws Exception
{
startServer(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
// Send the 100 Continue.
ServletInputStream input = request.getInputStream();
// Echo the content.
IO.copy(input, response.getOutputStream());
}
});
startProxy();
startClient();
byte[] content = new byte[1024];
new Random().nextBytes(content);
int chunk1 = content.length / 2;
DeferredContentProvider contentProvider = new DeferredContentProvider();
contentProvider.offer(ByteBuffer.wrap(content, 0, chunk1));
CountDownLatch clientLatch = new CountDownLatch(1);
client.newRequest("localhost", serverConnector.getLocalPort())
.header(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString())
.content(contentProvider)
.send(new BufferingResponseListener()
{
@Override
public void onComplete(Result result)
{
if (result.isSucceeded())
{
if (result.getResponse().getStatus() == HttpStatus.OK_200)
{
if (Arrays.equals(content, getContent()))
clientLatch.countDown();
}
}
}
});
// Wait a while and then offer more content.
Thread.sleep(1000);
contentProvider.offer(ByteBuffer.wrap(content, chunk1, content.length - chunk1));
contentProvider.close();
Assert.assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testExpect100ContinueRespond100ContinueSomeRequestContentThenFailure() throws Exception
{
CountDownLatch serverLatch = new CountDownLatch(1);
startServer(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
// Send the 100 Continue.
ServletInputStream input = request.getInputStream();
try
{
// Echo the content.
IO.copy(input, response.getOutputStream());
}
catch (IOException x)
{
serverLatch.countDown();
}
}
});
startProxy();
startClient();
long idleTimeout = 1000;
client.setIdleTimeout(idleTimeout);
byte[] content = new byte[1024];
new Random().nextBytes(content);
int chunk1 = content.length / 2;
DeferredContentProvider contentProvider = new DeferredContentProvider();
contentProvider.offer(ByteBuffer.wrap(content, 0, chunk1));
CountDownLatch clientLatch = new CountDownLatch(1);
client.newRequest("localhost", serverConnector.getLocalPort())
.header(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString())
.content(contentProvider)
.send(result ->
{
if (result.isFailed())
clientLatch.countDown();
});
// Wait more than the idle timeout to break the connection.
Thread.sleep(2 * idleTimeout);
Assert.assertTrue(serverLatch.await(555, TimeUnit.SECONDS));
Assert.assertTrue(clientLatch.await(555, TimeUnit.SECONDS));
}
@Test
public void testExpect100ContinueRespond417ExpectationFailed() throws Exception
{
CountDownLatch serverLatch1 = new CountDownLatch(1);
CountDownLatch serverLatch2 = new CountDownLatch(1);
startServer(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
serverLatch1.countDown();
try
{
serverLatch2.await(5, TimeUnit.SECONDS);
}
catch (Throwable x)
{
throw new InterruptedIOException();
}
// Send the 417 Expectation Failed.
response.setStatus(HttpStatus.EXPECTATION_FAILED_417);
}
});
startProxy();
startClient();
byte[] content = new byte[1024];
CountDownLatch contentLatch = new CountDownLatch(1);
CountDownLatch clientLatch = new CountDownLatch(1);
client.newRequest("localhost", serverConnector.getLocalPort())
.header(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString())
.content(new BytesContentProvider(content))
.onRequestContent((request, buffer) -> contentLatch.countDown())
.send(result ->
{
if (result.isFailed())
{
if (result.getResponse().getStatus() == HttpStatus.EXPECTATION_FAILED_417)
clientLatch.countDown();
}
});
// Wait until we arrive on the server.
Assert.assertTrue(serverLatch1.await(5, TimeUnit.SECONDS));
// The client should not send the content yet.
Assert.assertFalse(contentLatch.await(1, TimeUnit.SECONDS));
// Make the server send the 417 Expectation Failed.
serverLatch2.countDown();
// The client should not send the content.
Assert.assertFalse(contentLatch.await(1, TimeUnit.SECONDS));
Assert.assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
}
}

View File

@ -219,8 +219,8 @@ public class HttpChannelOverHttp extends HttpChannel implements HttpParser.Reque
// If we have no request yet, just close
if (_metadata.getMethod() == null)
_httpConnection.close();
else
onEarlyEOF();
else if (onEarlyEOF())
handle();
}
@Override

View File

@ -75,7 +75,7 @@
<dependencies>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<artifactId>jetty-servlet</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

View File

@ -21,6 +21,8 @@ package org.eclipse.jetty.http.client;
import java.util.ArrayList;
import java.util.List;
import javax.servlet.http.HttpServlet;
import org.eclipse.jetty.alpn.server.ALPNServerConnectionFactory;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpClientTransport;
@ -40,6 +42,8 @@ import org.eclipse.jetty.server.SecureRequestCustomizer;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.util.SocketAddressResolver;
import org.eclipse.jetty.util.ssl.SslContextFactory;
@ -67,6 +71,8 @@ public abstract class AbstractTest
protected SslContextFactory sslContextFactory;
protected Server server;
protected ServerConnector connector;
protected ServletContextHandler context;
protected String servletPath = "/servlet";
protected HttpClient client;
public AbstractTest(Transport transport)
@ -81,6 +87,22 @@ public abstract class AbstractTest
startClient();
}
public void start(HttpServlet servlet) throws Exception
{
startServer(servlet);
startClient();
}
protected void startServer(HttpServlet servlet) throws Exception
{
context = new ServletContextHandler();
context.setContextPath("/");
ServletHolder holder = new ServletHolder(servlet);
holder.setAsyncSupported(true);
context.addServlet(holder, servletPath);
startServer(context);
}
protected void startServer(Handler handler) throws Exception
{
sslContextFactory = new SslContextFactory();
@ -228,9 +250,19 @@ public abstract class AbstractTest
@After
public void stop() throws Exception
{
stopClient();
stopServer();
}
protected void stopClient() throws Exception
{
if (client != null)
client.stop();
}
protected void stopServer() throws Exception
{
if (server != null)
server.stop();
}

View File

@ -488,6 +488,28 @@ public class HttpClientTest extends AbstractTest
Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testResponseWithContentCompleteListenerInvokedOnce() throws Exception
{
start(new EmptyServerHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
super.handle(target, baseRequest, request, response);
response.getWriter().write("Jetty");
}
});
AtomicInteger completes = new AtomicInteger();
client.newRequest(newURI())
.send(result -> completes.incrementAndGet());
sleep(1000);
Assert.assertEquals(1, completes.get());
}
private void sleep(long time) throws IOException
{
try