Merge branch 'jetty-9-client-100-continue-bis' into jetty-9
This commit is contained in:
commit
109381abdb
|
@ -18,7 +18,6 @@
|
|||
|
||||
package org.eclipse.jetty.client;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
@ -40,9 +39,9 @@ public class AuthenticationProtocolHandler implements ProtocolHandler
|
|||
public static final Logger LOG = Log.getLogger(AuthenticationProtocolHandler.class);
|
||||
private static final Pattern WWW_AUTHENTICATE_PATTERN = Pattern.compile("([^\\s]+)\\s+realm=\"([^\"]+)\".*", Pattern.CASE_INSENSITIVE);
|
||||
|
||||
private final ResponseNotifier notifier = new ResponseNotifier();
|
||||
private final HttpClient client;
|
||||
private final int maxContentLength;
|
||||
private final ResponseNotifier notifier;
|
||||
|
||||
public AuthenticationProtocolHandler(HttpClient client)
|
||||
{
|
||||
|
@ -53,6 +52,7 @@ public class AuthenticationProtocolHandler implements ProtocolHandler
|
|||
{
|
||||
this.client = client;
|
||||
this.maxContentLength = maxContentLength;
|
||||
this.notifier = new ResponseNotifier(client);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -64,6 +64,7 @@ public class AuthenticationProtocolHandler implements ProtocolHandler
|
|||
@Override
|
||||
public Response.Listener getResponseListener()
|
||||
{
|
||||
// Return new instances every time to keep track of the response content
|
||||
return new AuthenticationListener();
|
||||
}
|
||||
|
||||
|
@ -78,12 +79,14 @@ public class AuthenticationProtocolHandler implements ProtocolHandler
|
|||
public void onComplete(Result result)
|
||||
{
|
||||
Request request = result.getRequest();
|
||||
HttpConversation conversation = client.getConversation(request.conversation());
|
||||
Response.Listener listener = conversation.exchanges().peekFirst().listener();
|
||||
ContentResponse response = new HttpContentResponse(result.getResponse(), getContent(), getEncoding());
|
||||
if (result.isFailed())
|
||||
{
|
||||
Throwable failure = result.getFailure();
|
||||
LOG.debug("Authentication challenge failed {}", failure);
|
||||
forwardFailure(request, response, failure);
|
||||
notifier.forwardFailureComplete(listener, request, result.getRequestFailure(), response, result.getResponseFailure());
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -91,7 +94,7 @@ public class AuthenticationProtocolHandler implements ProtocolHandler
|
|||
if (wwwAuthenticates.isEmpty())
|
||||
{
|
||||
LOG.debug("Authentication challenge without WWW-Authenticate header");
|
||||
forwardFailure(request, response, new HttpResponseException("HTTP protocol violation: 401 without WWW-Authenticate header", response));
|
||||
notifier.forwardFailureComplete(listener, request, null, response, new HttpResponseException("HTTP protocol violation: 401 without WWW-Authenticate header", response));
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -110,16 +113,15 @@ public class AuthenticationProtocolHandler implements ProtocolHandler
|
|||
if (authentication == null)
|
||||
{
|
||||
LOG.debug("No authentication available for {}", request);
|
||||
forwardSuccess(request, response);
|
||||
notifier.forwardSuccessComplete(listener, request, response);
|
||||
return;
|
||||
}
|
||||
|
||||
HttpConversation conversation = client.getConversation(request);
|
||||
final Authentication.Result authnResult = authentication.authenticate(request, response, wwwAuthenticate.value, conversation);
|
||||
LOG.debug("Authentication result {}", authnResult);
|
||||
if (authnResult == null)
|
||||
{
|
||||
forwardSuccess(request, response);
|
||||
notifier.forwardSuccessComplete(listener, request, response);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -134,32 +136,6 @@ public class AuthenticationProtocolHandler implements ProtocolHandler
|
|||
});
|
||||
}
|
||||
|
||||
private void forwardFailure(Request request, Response response, Throwable failure)
|
||||
{
|
||||
HttpConversation conversation = client.getConversation(request);
|
||||
Response.Listener listener = conversation.exchanges().peekFirst().listener();
|
||||
notifier.notifyBegin(listener, response);
|
||||
notifier.notifyHeaders(listener, response);
|
||||
if (response instanceof ContentResponse)
|
||||
notifier.notifyContent(listener, response, ByteBuffer.wrap(((ContentResponse)response).content()));
|
||||
notifier.notifyFailure(listener, response, failure);
|
||||
conversation.complete();
|
||||
notifier.notifyComplete(listener, new Result(request, response, failure));
|
||||
}
|
||||
|
||||
private void forwardSuccess(Request request, Response response)
|
||||
{
|
||||
HttpConversation conversation = client.getConversation(request);
|
||||
Response.Listener listener = conversation.exchanges().peekFirst().listener();
|
||||
notifier.notifyBegin(listener, response);
|
||||
notifier.notifyHeaders(listener, response);
|
||||
if (response instanceof ContentResponse)
|
||||
notifier.notifyContent(listener, response, ByteBuffer.wrap(((ContentResponse)response).content()));
|
||||
notifier.notifySuccess(listener, response);
|
||||
conversation.complete();
|
||||
notifier.notifyComplete(listener, new Result(request, response));
|
||||
}
|
||||
|
||||
private List<WWWAuthenticate> parseWWWAuthenticate(Response response)
|
||||
{
|
||||
// TODO: these should be ordered by strength
|
||||
|
|
|
@ -0,0 +1,106 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.client;
|
||||
|
||||
import org.eclipse.jetty.client.api.Request;
|
||||
import org.eclipse.jetty.client.api.Response;
|
||||
import org.eclipse.jetty.client.util.BufferingResponseListener;
|
||||
import org.eclipse.jetty.http.HttpHeader;
|
||||
import org.eclipse.jetty.http.HttpHeaderValue;
|
||||
|
||||
public class ContinueProtocolHandler implements ProtocolHandler
|
||||
{
|
||||
private static final String ATTRIBUTE = ContinueProtocolHandler.class.getName() + ".100continue";
|
||||
|
||||
private final HttpClient client;
|
||||
private final ResponseNotifier notifier;
|
||||
|
||||
public ContinueProtocolHandler(HttpClient client)
|
||||
{
|
||||
this.client = client;
|
||||
this.notifier = new ResponseNotifier(client);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean accept(Request request, Response response)
|
||||
{
|
||||
boolean expect100 = request.headers().contains(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString());
|
||||
boolean handled100 = client.getConversation(request.conversation()).getAttribute(ATTRIBUTE) != null;
|
||||
return expect100 && !handled100;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Response.Listener getResponseListener()
|
||||
{
|
||||
// Return new instances every time to keep track of the response content
|
||||
return new ContinueListener();
|
||||
}
|
||||
|
||||
private class ContinueListener extends BufferingResponseListener
|
||||
{
|
||||
@Override
|
||||
public void onSuccess(Response response)
|
||||
{
|
||||
// Handling of success must be done here and not from onComplete(),
|
||||
// since the onComplete() is not invoked because the request is not completed yet.
|
||||
|
||||
HttpConversation conversation = client.getConversation(response.conversation());
|
||||
// Mark the 100 Continue response as handled
|
||||
conversation.setAttribute(ATTRIBUTE, Boolean.TRUE);
|
||||
|
||||
HttpExchange exchange = conversation.exchanges().peekLast();
|
||||
assert exchange.response() == response;
|
||||
Response.Listener listener = exchange.listener();
|
||||
switch (response.status())
|
||||
{
|
||||
case 100:
|
||||
{
|
||||
// All good, continue
|
||||
exchange.resetResponse(true);
|
||||
conversation.listener(listener);
|
||||
exchange.proceed(true);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
{
|
||||
// Server either does not support 100 Continue, or it does and wants to refuse the request content
|
||||
HttpContentResponse contentResponse = new HttpContentResponse(response, getContent(), getEncoding());
|
||||
notifier.forwardSuccess(listener, contentResponse);
|
||||
conversation.listener(listener);
|
||||
exchange.proceed(false);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Response response, Throwable failure)
|
||||
{
|
||||
HttpConversation conversation = client.getConversation(response.conversation());
|
||||
// Mark the 100 Continue response as handled
|
||||
conversation.setAttribute(ATTRIBUTE, Boolean.TRUE);
|
||||
|
||||
HttpExchange exchange = conversation.exchanges().peekLast();
|
||||
assert exchange.response() == response;
|
||||
Response.Listener listener = exchange.listener();
|
||||
HttpContentResponse contentResponse = new HttpContentResponse(response, getContent(), getEncoding());
|
||||
notifier.forwardFailureComplete(listener, exchange.request(), exchange.requestFailure(), contentResponse, failure);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -164,6 +164,7 @@ public class HttpClient extends ContainerLifeCycle
|
|||
selectorManager = newSelectorManager();
|
||||
addBean(selectorManager);
|
||||
|
||||
handlers.add(new ContinueProtocolHandler(this));
|
||||
handlers.add(new RedirectProtocolHandler(this));
|
||||
handlers.add(new AuthenticationProtocolHandler(this));
|
||||
|
||||
|
@ -353,9 +354,8 @@ public class HttpClient extends ContainerLifeCycle
|
|||
}
|
||||
}
|
||||
|
||||
protected HttpConversation getConversation(Request request)
|
||||
protected HttpConversation getConversation(long id)
|
||||
{
|
||||
long id = request.id();
|
||||
HttpConversation conversation = conversations.get(id);
|
||||
if (conversation == null)
|
||||
{
|
||||
|
@ -375,13 +375,17 @@ public class HttpClient extends ContainerLifeCycle
|
|||
LOG.debug("{} removed", conversation);
|
||||
}
|
||||
|
||||
// TODO: find a better method name
|
||||
protected Response.Listener lookup(Request request, Response response)
|
||||
protected List<ProtocolHandler> getProtocolHandlers()
|
||||
{
|
||||
for (ProtocolHandler handler : handlers)
|
||||
return handlers;
|
||||
}
|
||||
|
||||
protected ProtocolHandler findProtocolHandler(Request request, Response response)
|
||||
{
|
||||
for (ProtocolHandler handler : getProtocolHandlers())
|
||||
{
|
||||
if (handler.accept(request, response))
|
||||
return handler.getResponseListener();
|
||||
return handler;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
|
|
@ -111,7 +111,7 @@ public class HttpConnection extends AbstractConnection implements Connection
|
|||
idleTimeout = endPoint.getIdleTimeout();
|
||||
endPoint.setIdleTimeout(request.idleTimeout());
|
||||
|
||||
HttpConversation conversation = client.getConversation(request);
|
||||
HttpConversation conversation = client.getConversation(request.conversation());
|
||||
HttpExchange exchange = new HttpExchange(conversation, this, request, listener);
|
||||
setExchange(exchange);
|
||||
conversation.exchanges().offer(exchange);
|
||||
|
@ -348,6 +348,11 @@ public class HttpConnection extends AbstractConnection implements Connection
|
|||
receiver.fail(new HttpResponseException("Response aborted", response));
|
||||
}
|
||||
|
||||
public void proceed(boolean proceed)
|
||||
{
|
||||
sender.proceed(proceed);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
|
|
|
@ -39,6 +39,12 @@ public class HttpContentResponse implements ContentResponse
|
|||
this.encoding = encoding;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long conversation()
|
||||
{
|
||||
return response.conversation();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Listener listener()
|
||||
{
|
||||
|
@ -94,4 +100,15 @@ public class HttpContentResponse implements ContentResponse
|
|||
throw new UnsupportedCharsetException(encoding);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return String.format("%s[%s %d %s - %d bytes]",
|
||||
HttpContentResponse.class.getSimpleName(),
|
||||
version(),
|
||||
status(),
|
||||
reason(),
|
||||
content().length);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -63,16 +63,25 @@ public class HttpConversation implements Attributes
|
|||
this.listener = listener;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the exchange that has been identified as the last of this conversation
|
||||
* @see #last(HttpExchange)
|
||||
*/
|
||||
public HttpExchange last()
|
||||
{
|
||||
return last;
|
||||
}
|
||||
|
||||
/**
|
||||
* Remembers the given {@code exchange} as the last of this conversation.
|
||||
*
|
||||
* @param exchange the exchange that is the last of this conversation
|
||||
* @see #last()
|
||||
*/
|
||||
public void last(HttpExchange exchange)
|
||||
{
|
||||
if (last == null)
|
||||
|
||||
last = exchange;
|
||||
last = exchange;
|
||||
}
|
||||
|
||||
public void complete()
|
||||
|
|
|
@ -46,7 +46,6 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable
|
|||
private static final Logger LOG = Log.getLogger(HttpDestination.class);
|
||||
|
||||
private final AtomicInteger connectionCount = new AtomicInteger();
|
||||
private final ResponseNotifier responseNotifier = new ResponseNotifier();
|
||||
private final HttpClient client;
|
||||
private final String scheme;
|
||||
private final String host;
|
||||
|
@ -55,6 +54,7 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable
|
|||
private final BlockingQueue<Connection> idleConnections;
|
||||
private final BlockingQueue<Connection> activeConnections;
|
||||
private final RequestNotifier requestNotifier;
|
||||
private final ResponseNotifier responseNotifier;
|
||||
|
||||
public HttpDestination(HttpClient client, String scheme, String host, int port)
|
||||
{
|
||||
|
@ -66,6 +66,7 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable
|
|||
this.idleConnections = new ArrayBlockingQueue<>(client.getMaxConnectionsPerAddress());
|
||||
this.activeConnections = new ArrayBlockingQueue<>(client.getMaxConnectionsPerAddress());
|
||||
this.requestNotifier = new RequestNotifier(client);
|
||||
this.responseNotifier = new ResponseNotifier(client);
|
||||
}
|
||||
|
||||
protected BlockingQueue<Connection> getIdleConnections()
|
||||
|
|
|
@ -58,6 +58,11 @@ public class HttpExchange
|
|||
return request;
|
||||
}
|
||||
|
||||
public Throwable requestFailure()
|
||||
{
|
||||
return requestFailure;
|
||||
}
|
||||
|
||||
public Response.Listener listener()
|
||||
{
|
||||
return listener;
|
||||
|
@ -68,6 +73,11 @@ public class HttpExchange
|
|||
return response;
|
||||
}
|
||||
|
||||
public Throwable responseFailure()
|
||||
{
|
||||
return responseFailure;
|
||||
}
|
||||
|
||||
public void receive()
|
||||
{
|
||||
connection.receive();
|
||||
|
@ -84,9 +94,17 @@ public class HttpExchange
|
|||
public Result responseComplete(Throwable failure)
|
||||
{
|
||||
this.responseFailure = failure;
|
||||
int responseSuccess = 0b1100;
|
||||
int responseFailure = 0b0100;
|
||||
return complete(failure == null ? responseSuccess : responseFailure);
|
||||
if (failure == null)
|
||||
{
|
||||
int responseSuccess = 0b1100;
|
||||
return complete(responseSuccess);
|
||||
}
|
||||
else
|
||||
{
|
||||
proceed(false);
|
||||
int responseFailure = 0b0100;
|
||||
return complete(responseFailure);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -117,7 +135,7 @@ public class HttpExchange
|
|||
if (this == conversation.last())
|
||||
conversation.complete();
|
||||
connection.complete(this, success);
|
||||
return new Result(request, requestFailure, response, responseFailure);
|
||||
return new Result(request(), requestFailure(), response(), responseFailure());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
@ -128,6 +146,19 @@ public class HttpExchange
|
|||
connection.abort(response);
|
||||
}
|
||||
|
||||
public void resetResponse(boolean success)
|
||||
{
|
||||
int responseSuccess = 0b1100;
|
||||
int responseFailure = 0b0100;
|
||||
int code = success ? responseSuccess : responseFailure;
|
||||
complete.addAndGet(-code);
|
||||
}
|
||||
|
||||
public void proceed(boolean proceed)
|
||||
{
|
||||
connection.proceed(proceed);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
|
|
|
@ -42,14 +42,15 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
|
|||
private static final Logger LOG = Log.getLogger(HttpReceiver.class);
|
||||
|
||||
private final HttpParser parser = new HttpParser(this);
|
||||
private final ResponseNotifier notifier = new ResponseNotifier();
|
||||
private final HttpConnection connection;
|
||||
private final ResponseNotifier notifier;
|
||||
private ContentDecoder decoder;
|
||||
private State state = State.IDLE;
|
||||
|
||||
public HttpReceiver(HttpConnection connection)
|
||||
{
|
||||
this.connection = connection;
|
||||
this.notifier = new ResponseNotifier(connection.getHttpClient());
|
||||
}
|
||||
|
||||
public void receive()
|
||||
|
@ -115,7 +116,8 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
|
|||
Response.Listener currentListener = exchange.listener();
|
||||
Response.Listener initialListener = conversation.exchanges().peekFirst().listener();
|
||||
HttpClient client = connection.getHttpClient();
|
||||
Response.Listener handlerListener = client.lookup(exchange.request(), response);
|
||||
ProtocolHandler protocolHandler = client.findProtocolHandler(exchange.request(), response);
|
||||
Response.Listener handlerListener = protocolHandler == null ? null : protocolHandler.getResponseListener();
|
||||
if (handlerListener == null)
|
||||
{
|
||||
conversation.last(exchange);
|
||||
|
@ -126,6 +128,7 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
|
|||
}
|
||||
else
|
||||
{
|
||||
LOG.debug("Found protocol handler {}", protocolHandler);
|
||||
if (currentListener == initialListener)
|
||||
conversation.listener(handlerListener);
|
||||
else
|
||||
|
@ -298,7 +301,6 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
|
|||
|
||||
private class DoubleResponseListener implements Response.Listener
|
||||
{
|
||||
private final ResponseNotifier notifier = new ResponseNotifier();
|
||||
private final Response.Listener listener1;
|
||||
private final Response.Listener listener2;
|
||||
|
||||
|
|
|
@ -101,7 +101,7 @@ public class HttpRequest implements Request
|
|||
}
|
||||
|
||||
@Override
|
||||
public long id()
|
||||
public long conversation()
|
||||
{
|
||||
return id;
|
||||
}
|
||||
|
|
|
@ -77,6 +77,12 @@ public class HttpResponse implements Response
|
|||
return headers;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long conversation()
|
||||
{
|
||||
return exchange.request().conversation();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Listener listener()
|
||||
{
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.eclipse.jetty.client;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
|
@ -28,6 +29,8 @@ import org.eclipse.jetty.client.api.ContentProvider;
|
|||
import org.eclipse.jetty.client.api.Request;
|
||||
import org.eclipse.jetty.client.api.Result;
|
||||
import org.eclipse.jetty.http.HttpGenerator;
|
||||
import org.eclipse.jetty.http.HttpHeader;
|
||||
import org.eclipse.jetty.http.HttpHeaderValue;
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
|
@ -38,22 +41,22 @@ import org.eclipse.jetty.util.log.Logger;
|
|||
public class HttpSender
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(HttpSender.class);
|
||||
private static final String EXPECT_100_ATTRIBUTE = HttpSender.class.getName() + ".expect100";
|
||||
|
||||
private final HttpGenerator generator = new HttpGenerator();
|
||||
private final ResponseNotifier responseNotifier = new ResponseNotifier();
|
||||
private final HttpConnection connection;
|
||||
private final RequestNotifier requestNotifier;
|
||||
private long contentLength;
|
||||
private Iterator<ByteBuffer> contentChunks;
|
||||
private ByteBuffer header;
|
||||
private ByteBuffer chunk;
|
||||
private volatile boolean committed;
|
||||
private volatile boolean failed;
|
||||
private final ResponseNotifier responseNotifier;
|
||||
private Iterator<ByteBuffer> contentIterator;
|
||||
private ContentInfo expectedContent;
|
||||
private boolean committed;
|
||||
private boolean failed;
|
||||
|
||||
public HttpSender(HttpConnection connection)
|
||||
{
|
||||
this.connection = connection;
|
||||
this.requestNotifier = new RequestNotifier(connection.getHttpClient());
|
||||
this.responseNotifier = new ResponseNotifier(connection.getHttpClient());
|
||||
}
|
||||
|
||||
public void send(HttpExchange exchange)
|
||||
|
@ -68,42 +71,70 @@ public class HttpSender
|
|||
LOG.debug("Sending {}", request);
|
||||
requestNotifier.notifyBegin(request);
|
||||
ContentProvider content = request.content();
|
||||
this.contentLength = content == null ? -1 : content.length();
|
||||
this.contentChunks = content == null ? Collections.<ByteBuffer>emptyIterator() : content.iterator();
|
||||
this.contentIterator = content == null ? Collections.<ByteBuffer>emptyIterator() : content.iterator();
|
||||
send();
|
||||
}
|
||||
}
|
||||
|
||||
public void proceed(boolean proceed)
|
||||
{
|
||||
ContentInfo contentInfo = expectedContent;
|
||||
if (contentInfo != null)
|
||||
{
|
||||
contentInfo.await();
|
||||
if (proceed)
|
||||
send();
|
||||
else
|
||||
fail(new HttpRequestException("Expectation failed", connection.getExchange().request()));
|
||||
}
|
||||
}
|
||||
|
||||
private void send()
|
||||
{
|
||||
HttpClient client = connection.getHttpClient();
|
||||
ByteBufferPool bufferPool = client.getByteBufferPool();
|
||||
ByteBuffer header = null;
|
||||
ByteBuffer chunk = null;
|
||||
try
|
||||
{
|
||||
HttpClient client = connection.getHttpClient();
|
||||
EndPoint endPoint = connection.getEndPoint();
|
||||
HttpExchange exchange = connection.getExchange();
|
||||
ByteBufferPool byteBufferPool = client.getByteBufferPool();
|
||||
final Request request = exchange.request();
|
||||
HttpGenerator.RequestInfo info = null;
|
||||
ByteBuffer content = contentChunks.hasNext() ? contentChunks.next() : BufferUtil.EMPTY_BUFFER;
|
||||
boolean lastContent = !contentChunks.hasNext();
|
||||
HttpConversation conversation = client.getConversation(request.conversation());
|
||||
HttpGenerator.RequestInfo requestInfo = null;
|
||||
|
||||
boolean expect100 = request.headers().contains(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString());
|
||||
expect100 &= conversation.getAttribute(EXPECT_100_ATTRIBUTE) == null;
|
||||
if (expect100)
|
||||
conversation.setAttribute(EXPECT_100_ATTRIBUTE, Boolean.TRUE);
|
||||
|
||||
ContentInfo contentInfo = this.expectedContent;
|
||||
if (contentInfo == null)
|
||||
contentInfo = new ContentInfo(contentIterator);
|
||||
else
|
||||
expect100 = false;
|
||||
this.expectedContent = null;
|
||||
|
||||
while (true)
|
||||
{
|
||||
HttpGenerator.Result result = generator.generateRequest(info, header, chunk, content, lastContent);
|
||||
HttpGenerator.Result result = generator.generateRequest(requestInfo, header, chunk, contentInfo.content, contentInfo.lastContent);
|
||||
switch (result)
|
||||
{
|
||||
case NEED_INFO:
|
||||
{
|
||||
info = new HttpGenerator.RequestInfo(request.version(), request.headers(), contentLength, request.method().asString(), request.path());
|
||||
ContentProvider content = request.content();
|
||||
long contentLength = content == null ? -1 : content.length();
|
||||
requestInfo = new HttpGenerator.RequestInfo(request.version(), request.headers(), contentLength, request.method().asString(), request.path());
|
||||
break;
|
||||
}
|
||||
case NEED_HEADER:
|
||||
{
|
||||
header = byteBufferPool.acquire(client.getRequestBufferSize(), false);
|
||||
header = bufferPool.acquire(client.getRequestBufferSize(), false);
|
||||
break;
|
||||
}
|
||||
case NEED_CHUNK:
|
||||
{
|
||||
chunk = byteBufferPool.acquire(HttpGenerator.CHUNK_SIZE, false);
|
||||
chunk = bufferPool.acquire(HttpGenerator.CHUNK_SIZE, false);
|
||||
break;
|
||||
}
|
||||
case FLUSH:
|
||||
|
@ -119,9 +150,20 @@ public class HttpSender
|
|||
@Override
|
||||
protected void pendingCompleted()
|
||||
{
|
||||
LOG.debug("Write completed for {}", request);
|
||||
|
||||
if (!committed)
|
||||
committed(request);
|
||||
send();
|
||||
|
||||
if (expectedContent == null)
|
||||
{
|
||||
send();
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG.debug("Expecting 100 Continue for {}", request);
|
||||
expectedContent.ready();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -130,22 +172,37 @@ public class HttpSender
|
|||
fail(x);
|
||||
}
|
||||
};
|
||||
if (header == null)
|
||||
header = BufferUtil.EMPTY_BUFFER;
|
||||
if (chunk == null)
|
||||
chunk = BufferUtil.EMPTY_BUFFER;
|
||||
endPoint.write(null, callback, header, chunk, content);
|
||||
|
||||
if (expect100)
|
||||
{
|
||||
// Save the expected content waiting for the 100 Continue response
|
||||
expectedContent = contentInfo;
|
||||
}
|
||||
|
||||
write(callback, header, chunk, expect100 ? null : contentInfo.content);
|
||||
|
||||
if (callback.pending())
|
||||
{
|
||||
LOG.debug("Write pending for {}", request);
|
||||
return;
|
||||
}
|
||||
|
||||
if (callback.completed())
|
||||
{
|
||||
if (!committed)
|
||||
committed(request);
|
||||
|
||||
releaseBuffers();
|
||||
content = contentChunks.hasNext() ? contentChunks.next() : BufferUtil.EMPTY_BUFFER;
|
||||
lastContent = !contentChunks.hasNext();
|
||||
if (expect100)
|
||||
{
|
||||
LOG.debug("Expecting 100 Continue for {}", request);
|
||||
expectedContent.ready();
|
||||
return;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Send further content
|
||||
contentInfo = new ContentInfo(contentIterator);
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
@ -179,7 +236,49 @@ public class HttpSender
|
|||
}
|
||||
finally
|
||||
{
|
||||
releaseBuffers();
|
||||
releaseBuffers(bufferPool, header, chunk);
|
||||
}
|
||||
}
|
||||
|
||||
private void write(Callback<Void> callback, ByteBuffer header, ByteBuffer chunk, ByteBuffer content)
|
||||
{
|
||||
int mask = 0;
|
||||
if (header != null)
|
||||
mask += 1;
|
||||
if (chunk != null)
|
||||
mask += 2;
|
||||
if (content != null)
|
||||
mask += 4;
|
||||
|
||||
EndPoint endPoint = connection.getEndPoint();
|
||||
switch (mask)
|
||||
{
|
||||
case 0:
|
||||
endPoint.write(null, callback, BufferUtil.EMPTY_BUFFER);
|
||||
break;
|
||||
case 1:
|
||||
endPoint.write(null, callback, header);
|
||||
break;
|
||||
case 2:
|
||||
endPoint.write(null, callback, chunk);
|
||||
break;
|
||||
case 3:
|
||||
endPoint.write(null, callback, header, chunk);
|
||||
break;
|
||||
case 4:
|
||||
endPoint.write(null, callback, content);
|
||||
break;
|
||||
case 5:
|
||||
endPoint.write(null, callback, header, content);
|
||||
break;
|
||||
case 6:
|
||||
endPoint.write(null, callback, chunk, content);
|
||||
break;
|
||||
case 7:
|
||||
endPoint.write(null, callback, header, chunk, content);
|
||||
break;
|
||||
default:
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -216,9 +315,6 @@ public class HttpSender
|
|||
protected void fail(Throwable failure)
|
||||
{
|
||||
// Cleanup first
|
||||
BufferUtil.clear(header);
|
||||
BufferUtil.clear(chunk);
|
||||
releaseBuffers();
|
||||
generator.abort();
|
||||
failed = true;
|
||||
|
||||
|
@ -245,19 +341,12 @@ public class HttpSender
|
|||
}
|
||||
}
|
||||
|
||||
private void releaseBuffers()
|
||||
private void releaseBuffers(ByteBufferPool bufferPool, ByteBuffer header, ByteBuffer chunk)
|
||||
{
|
||||
ByteBufferPool bufferPool = connection.getHttpClient().getByteBufferPool();
|
||||
if (!BufferUtil.hasContent(header))
|
||||
{
|
||||
bufferPool.release(header);
|
||||
header = null;
|
||||
}
|
||||
if (!BufferUtil.hasContent(chunk))
|
||||
{
|
||||
bufferPool.release(chunk);
|
||||
chunk = null;
|
||||
}
|
||||
}
|
||||
|
||||
private static abstract class StatefulExecutorCallback implements Callback<Void>, Runnable
|
||||
|
@ -341,4 +430,34 @@ public class HttpSender
|
|||
INCOMPLETE, PENDING, COMPLETE, FAILED
|
||||
}
|
||||
}
|
||||
|
||||
private class ContentInfo
|
||||
{
|
||||
private final CountDownLatch latch = new CountDownLatch(1);
|
||||
public final boolean lastContent;
|
||||
public final ByteBuffer content;
|
||||
|
||||
public ContentInfo(Iterator<ByteBuffer> contentIterator)
|
||||
{
|
||||
lastContent = !contentIterator.hasNext();
|
||||
content = lastContent ? BufferUtil.EMPTY_BUFFER : contentIterator.next();
|
||||
}
|
||||
|
||||
public void ready()
|
||||
{
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
public void await()
|
||||
{
|
||||
try
|
||||
{
|
||||
latch.await();
|
||||
}
|
||||
catch (InterruptedException x)
|
||||
{
|
||||
throw new IllegalStateException(x);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,12 +28,13 @@ public class RedirectProtocolHandler extends Response.Listener.Empty implements
|
|||
{
|
||||
private static final String ATTRIBUTE = RedirectProtocolHandler.class.getName() + ".redirect";
|
||||
|
||||
private final ResponseNotifier notifier = new ResponseNotifier();
|
||||
private final HttpClient client;
|
||||
private final ResponseNotifier notifier;
|
||||
|
||||
public RedirectProtocolHandler(HttpClient client)
|
||||
{
|
||||
this.client = client;
|
||||
this.notifier = new ResponseNotifier(client);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -104,7 +105,7 @@ public class RedirectProtocolHandler extends Response.Listener.Empty implements
|
|||
private void redirect(Result result, HttpMethod method, String location)
|
||||
{
|
||||
Request request = result.getRequest();
|
||||
HttpConversation conversation = client.getConversation(request);
|
||||
HttpConversation conversation = client.getConversation(request.conversation());
|
||||
Integer redirects = (Integer)conversation.getAttribute(ATTRIBUTE);
|
||||
if (redirects == null)
|
||||
redirects = 0;
|
||||
|
@ -114,7 +115,7 @@ public class RedirectProtocolHandler extends Response.Listener.Empty implements
|
|||
++redirects;
|
||||
conversation.setAttribute(ATTRIBUTE, redirects);
|
||||
|
||||
Request redirect = client.newRequest(request.id(), location);
|
||||
Request redirect = client.newRequest(request.conversation(), location);
|
||||
|
||||
// Use given method
|
||||
redirect.method(method);
|
||||
|
@ -140,7 +141,7 @@ public class RedirectProtocolHandler extends Response.Listener.Empty implements
|
|||
{
|
||||
Request request = result.getRequest();
|
||||
Response response = result.getResponse();
|
||||
HttpConversation conversation = client.getConversation(request);
|
||||
HttpConversation conversation = client.getConversation(request.conversation());
|
||||
Response.Listener listener = conversation.exchanges().peekFirst().listener();
|
||||
// TODO: should we reply all event, or just the failure ?
|
||||
notifier.notifyFailure(listener, response, failure);
|
||||
|
|
|
@ -20,6 +20,8 @@ package org.eclipse.jetty.client;
|
|||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.eclipse.jetty.client.api.ContentResponse;
|
||||
import org.eclipse.jetty.client.api.Request;
|
||||
import org.eclipse.jetty.client.api.Response;
|
||||
import org.eclipse.jetty.client.api.Result;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
|
@ -28,6 +30,12 @@ import org.eclipse.jetty.util.log.Logger;
|
|||
public class ResponseNotifier
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(ResponseNotifier.class);
|
||||
private final HttpClient client;
|
||||
|
||||
public ResponseNotifier(HttpClient client)
|
||||
{
|
||||
this.client = client;
|
||||
}
|
||||
|
||||
public void notifyBegin(Response.Listener listener, Response response)
|
||||
{
|
||||
|
@ -106,4 +114,38 @@ public class ResponseNotifier
|
|||
LOG.info("Exception while notifying listener " + listener, x);
|
||||
}
|
||||
}
|
||||
|
||||
public void forwardSuccess(Response.Listener listener, Response response)
|
||||
{
|
||||
notifyBegin(listener, response);
|
||||
notifyHeaders(listener, response);
|
||||
if (response instanceof ContentResponse)
|
||||
notifyContent(listener, response, ByteBuffer.wrap(((ContentResponse)response).content()));
|
||||
notifySuccess(listener, response);
|
||||
}
|
||||
|
||||
public void forwardSuccessComplete(Response.Listener listener, Request request, Response response)
|
||||
{
|
||||
HttpConversation conversation = client.getConversation(request.conversation());
|
||||
forwardSuccess(listener, response);
|
||||
conversation.complete();
|
||||
notifyComplete(listener, new Result(request, response));
|
||||
}
|
||||
|
||||
public void forwardFailure(Response.Listener listener, Response response, Throwable failure)
|
||||
{
|
||||
notifyBegin(listener, response);
|
||||
notifyHeaders(listener, response);
|
||||
if (response instanceof ContentResponse)
|
||||
notifyContent(listener, response, ByteBuffer.wrap(((ContentResponse)response).content()));
|
||||
notifyFailure(listener, response, failure);
|
||||
}
|
||||
|
||||
public void forwardFailureComplete(Response.Listener listener, Request request, Throwable requestFailure, Response response, Throwable responseFailure)
|
||||
{
|
||||
HttpConversation conversation = client.getConversation(request.conversation());
|
||||
forwardFailure(listener, response, responseFailure);
|
||||
conversation.complete();
|
||||
notifyComplete(listener, new Result(request, requestFailure, response, responseFailure));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -44,7 +44,7 @@ public interface Request
|
|||
/**
|
||||
* @return the conversation id
|
||||
*/
|
||||
long id();
|
||||
long conversation();
|
||||
|
||||
/**
|
||||
* @return the scheme of this request, such as "http" or "https"
|
||||
|
|
|
@ -36,6 +36,11 @@ import org.eclipse.jetty.http.HttpVersion;
|
|||
*/
|
||||
public interface Response
|
||||
{
|
||||
/**
|
||||
* @return the conversation id
|
||||
*/
|
||||
long conversation();
|
||||
|
||||
/**
|
||||
* @return the response listener passed to {@link Request#send(Listener)}
|
||||
*/
|
||||
|
|
|
@ -0,0 +1,440 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.client;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.ServletInputStream;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.eclipse.jetty.client.api.ContentResponse;
|
||||
import org.eclipse.jetty.client.api.Response;
|
||||
import org.eclipse.jetty.client.api.Result;
|
||||
import org.eclipse.jetty.client.util.BufferingResponseListener;
|
||||
import org.eclipse.jetty.client.util.BytesContentProvider;
|
||||
import org.eclipse.jetty.http.HttpHeader;
|
||||
import org.eclipse.jetty.http.HttpHeaderValue;
|
||||
import org.eclipse.jetty.http.HttpMethod;
|
||||
import org.eclipse.jetty.server.Request;
|
||||
import org.eclipse.jetty.server.handler.AbstractHandler;
|
||||
import org.eclipse.jetty.toolchain.test.annotation.Slow;
|
||||
import org.eclipse.jetty.util.IO;
|
||||
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class HttpClientContinueTest extends AbstractHttpClientServerTest
|
||||
{
|
||||
public HttpClientContinueTest(SslContextFactory sslContextFactory)
|
||||
{
|
||||
super(sslContextFactory);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_Expect100Continue_WithOneContent_Respond100Continue() throws Exception
|
||||
{
|
||||
test_Expect100Continue_Respond100Continue("data1".getBytes("UTF-8"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_Expect100Continue_WithMultipleContents_Respond100Continue() throws Exception
|
||||
{
|
||||
test_Expect100Continue_Respond100Continue("data1".getBytes("UTF-8"), "data2".getBytes("UTF-8"), "data3".getBytes("UTF-8"));
|
||||
}
|
||||
|
||||
private void test_Expect100Continue_Respond100Continue(byte[]... contents) throws Exception
|
||||
{
|
||||
start(new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||
{
|
||||
baseRequest.setHandled(true);
|
||||
// Send 100-Continue and copy the content back
|
||||
IO.copy(request.getInputStream(), response.getOutputStream());
|
||||
}
|
||||
});
|
||||
|
||||
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.header(HttpHeader.EXPECT.asString(), HttpHeaderValue.CONTINUE.asString())
|
||||
.content(new BytesContentProvider(contents))
|
||||
.send()
|
||||
.get(5, TimeUnit.SECONDS);
|
||||
|
||||
Assert.assertNotNull(response);
|
||||
Assert.assertEquals(200, response.status());
|
||||
|
||||
int index = 0;
|
||||
byte[] responseContent = response.content();
|
||||
for (byte[] content : contents)
|
||||
{
|
||||
for (byte b : content)
|
||||
{
|
||||
Assert.assertEquals(b, responseContent[index++]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_Expect100Continue_WithChunkedContent_Respond100Continue() throws Exception
|
||||
{
|
||||
start(new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||
{
|
||||
baseRequest.setHandled(true);
|
||||
// Send 100-Continue and copy the content back
|
||||
ServletInputStream input = request.getInputStream();
|
||||
// Make sure we chunk the response too
|
||||
response.flushBuffer();
|
||||
IO.copy(input, response.getOutputStream());
|
||||
}
|
||||
});
|
||||
|
||||
byte[] content1 = new byte[10240];
|
||||
byte[] content2 = new byte[16384];
|
||||
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.header(HttpHeader.EXPECT.asString(), HttpHeaderValue.CONTINUE.asString())
|
||||
.content(new BytesContentProvider(content1, content2)
|
||||
{
|
||||
@Override
|
||||
public long length()
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
})
|
||||
.send()
|
||||
.get(5, TimeUnit.SECONDS);
|
||||
|
||||
Assert.assertNotNull(response);
|
||||
Assert.assertEquals(200, response.status());
|
||||
|
||||
int index = 0;
|
||||
byte[] responseContent = response.content();
|
||||
for (byte b : content1)
|
||||
Assert.assertEquals(b, responseContent[index++]);
|
||||
for (byte b : content2)
|
||||
Assert.assertEquals(b, responseContent[index++]);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_Expect100Continue_WithContent_Respond417ExpectationFailed() throws Exception
|
||||
{
|
||||
test_Expect100Continue_WithContent_RespondError(417);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_Expect100Continue_WithContent_Respond413RequestEntityTooLarge() throws Exception
|
||||
{
|
||||
test_Expect100Continue_WithContent_RespondError(413);
|
||||
}
|
||||
|
||||
private void test_Expect100Continue_WithContent_RespondError(final int error) throws Exception
|
||||
{
|
||||
start(new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||
{
|
||||
baseRequest.setHandled(true);
|
||||
response.sendError(error);
|
||||
}
|
||||
});
|
||||
|
||||
byte[] content1 = new byte[10240];
|
||||
byte[] content2 = new byte[16384];
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
client.newRequest("localhost", connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.header(HttpHeader.EXPECT.asString(), HttpHeaderValue.CONTINUE.asString())
|
||||
.content(new BytesContentProvider(content1, content2))
|
||||
.send(new BufferingResponseListener()
|
||||
{
|
||||
@Override
|
||||
public void onComplete(Result result)
|
||||
{
|
||||
Assert.assertTrue(result.isFailed());
|
||||
Assert.assertNotNull(result.getRequestFailure());
|
||||
Assert.assertNull(result.getResponseFailure());
|
||||
byte[] content = getContent();
|
||||
Assert.assertNotNull(content);
|
||||
Assert.assertTrue(content.length > 0);
|
||||
Assert.assertEquals(error, result.getResponse().status());
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_Expect100Continue_WithContent_WithRedirect() throws Exception
|
||||
{
|
||||
final String data = "success";
|
||||
start(new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||
{
|
||||
baseRequest.setHandled(true);
|
||||
if (request.getRequestURI().endsWith("/done"))
|
||||
{
|
||||
response.getOutputStream().print(data);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Send 100-Continue and consume the content
|
||||
IO.copy(request.getInputStream(), new ByteArrayOutputStream());
|
||||
// Send a redirect
|
||||
response.sendRedirect("/done");
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
byte[] content = new byte[10240];
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
client.newRequest("localhost", connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.method(HttpMethod.POST)
|
||||
.path("/continue")
|
||||
.header(HttpHeader.EXPECT.asString(), HttpHeaderValue.CONTINUE.asString())
|
||||
.content(new BytesContentProvider(content))
|
||||
.send(new BufferingResponseListener()
|
||||
{
|
||||
@Override
|
||||
public void onComplete(Result result)
|
||||
{
|
||||
Assert.assertFalse(result.isFailed());
|
||||
Assert.assertEquals(200, result.getResponse().status());
|
||||
Assert.assertEquals(data, getContentAsString());
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_Redirect_WithExpect100Continue_WithContent() throws Exception
|
||||
{
|
||||
// A request with Expect: 100-Continue cannot receive non-final responses like 3xx
|
||||
|
||||
final String data = "success";
|
||||
start(new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||
{
|
||||
baseRequest.setHandled(true);
|
||||
if (request.getRequestURI().endsWith("/done"))
|
||||
{
|
||||
// Send 100-Continue and consume the content
|
||||
IO.copy(request.getInputStream(), new ByteArrayOutputStream());
|
||||
response.getOutputStream().print(data);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Send a redirect
|
||||
response.sendRedirect("/done");
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
byte[] content = new byte[10240];
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
client.newRequest("localhost", connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.method(HttpMethod.POST)
|
||||
.path("/redirect")
|
||||
.header(HttpHeader.EXPECT.asString(), HttpHeaderValue.CONTINUE.asString())
|
||||
.content(new BytesContentProvider(content))
|
||||
.send(new BufferingResponseListener()
|
||||
{
|
||||
@Override
|
||||
public void onComplete(Result result)
|
||||
{
|
||||
Assert.assertTrue(result.isFailed());
|
||||
Assert.assertNotNull(result.getRequestFailure());
|
||||
Assert.assertNull(result.getResponseFailure());
|
||||
Assert.assertEquals(302, result.getResponse().status());
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Slow
|
||||
@Test
|
||||
public void test_Expect100Continue_WithContent_WithResponseFailure_Before100Continue() throws Exception
|
||||
{
|
||||
final long idleTimeout = 1000;
|
||||
start(new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||
{
|
||||
baseRequest.setHandled(true);
|
||||
try
|
||||
{
|
||||
TimeUnit.MILLISECONDS.sleep(2 * idleTimeout);
|
||||
}
|
||||
catch (InterruptedException x)
|
||||
{
|
||||
throw new ServletException(x);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
client.setIdleTimeout(idleTimeout);
|
||||
|
||||
byte[] content = new byte[1024];
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
client.newRequest("localhost", connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.header(HttpHeader.EXPECT.asString(), HttpHeaderValue.CONTINUE.asString())
|
||||
.content(new BytesContentProvider(content))
|
||||
.send(new BufferingResponseListener()
|
||||
{
|
||||
@Override
|
||||
public void onComplete(Result result)
|
||||
{
|
||||
Assert.assertTrue(result.isFailed());
|
||||
Assert.assertNotNull(result.getRequestFailure());
|
||||
Assert.assertNotNull(result.getResponseFailure());
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
Assert.assertTrue(latch.await(3 * idleTimeout, TimeUnit.MILLISECONDS));
|
||||
}
|
||||
|
||||
@Slow
|
||||
@Test
|
||||
public void test_Expect100Continue_WithContent_WithResponseFailure_After100Continue() throws Exception
|
||||
{
|
||||
final long idleTimeout = 1000;
|
||||
start(new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||
{
|
||||
baseRequest.setHandled(true);
|
||||
// Send 100-Continue and consume the content
|
||||
IO.copy(request.getInputStream(), new ByteArrayOutputStream());
|
||||
try
|
||||
{
|
||||
TimeUnit.MILLISECONDS.sleep(2 * idleTimeout);
|
||||
}
|
||||
catch (InterruptedException x)
|
||||
{
|
||||
throw new ServletException(x);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
client.setIdleTimeout(idleTimeout);
|
||||
|
||||
byte[] content = new byte[1024];
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
client.newRequest("localhost", connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.header(HttpHeader.EXPECT.asString(), HttpHeaderValue.CONTINUE.asString())
|
||||
.content(new BytesContentProvider(content))
|
||||
.send(new BufferingResponseListener()
|
||||
{
|
||||
@Override
|
||||
public void onComplete(Result result)
|
||||
{
|
||||
Assert.assertTrue(result.isFailed());
|
||||
Assert.assertNull(result.getRequestFailure());
|
||||
Assert.assertNotNull(result.getResponseFailure());
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
Assert.assertTrue(latch.await(3 * idleTimeout, TimeUnit.MILLISECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_Expect100Continue_WithContent_WithResponseFailure_During100Continue() throws Exception
|
||||
{
|
||||
start(new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||
{
|
||||
baseRequest.setHandled(true);
|
||||
// Send 100-Continue and consume the content
|
||||
IO.copy(request.getInputStream(), new ByteArrayOutputStream());
|
||||
}
|
||||
});
|
||||
|
||||
client.getProtocolHandlers().clear();
|
||||
client.getProtocolHandlers().add(new ContinueProtocolHandler(client)
|
||||
{
|
||||
@Override
|
||||
public Response.Listener getResponseListener()
|
||||
{
|
||||
final Response.Listener listener = super.getResponseListener();
|
||||
return new Response.Listener.Empty()
|
||||
{
|
||||
@Override
|
||||
public void onBegin(Response response)
|
||||
{
|
||||
response.abort();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Response response, Throwable failure)
|
||||
{
|
||||
listener.onFailure(response, failure);
|
||||
}
|
||||
};
|
||||
}
|
||||
});
|
||||
|
||||
byte[] content = new byte[1024];
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
client.newRequest("localhost", connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.header(HttpHeader.EXPECT.asString(), HttpHeaderValue.CONTINUE.asString())
|
||||
.content(new BytesContentProvider(content))
|
||||
.send(new BufferingResponseListener()
|
||||
{
|
||||
@Override
|
||||
public void onComplete(Result result)
|
||||
{
|
||||
Assert.assertTrue(result.isFailed());
|
||||
Assert.assertNotNull(result.getRequestFailure());
|
||||
Assert.assertNotNull(result.getResponseFailure());
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
}
|
|
@ -20,6 +20,7 @@ package org.eclipse.jetty.client;
|
|||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.EOFException;
|
||||
import java.net.URI;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
@ -67,7 +68,8 @@ public class HttpReceiverTest
|
|||
|
||||
protected HttpExchange newExchange(Response.Listener listener)
|
||||
{
|
||||
HttpExchange exchange = new HttpExchange(conversation, connection, null, listener);
|
||||
HttpRequest request = new HttpRequest(client, URI.create("http://localhost"));
|
||||
HttpExchange exchange = new HttpExchange(conversation, connection, request, listener);
|
||||
conversation.exchanges().offer(exchange);
|
||||
connection.setExchange(exchange);
|
||||
exchange.requestComplete(null);
|
||||
|
|
|
@ -198,15 +198,26 @@ public class HttpGenerator
|
|||
else
|
||||
generateHeaders(info,header,content,last);
|
||||
|
||||
// handle the content.
|
||||
int len = BufferUtil.length(content);
|
||||
if (len>0)
|
||||
boolean expect100 = info.getHttpFields().contains(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString());
|
||||
|
||||
if (expect100)
|
||||
{
|
||||
_contentPrepared+=len;
|
||||
if (isChunking())
|
||||
prepareChunk(header,len);
|
||||
_state = State.COMMITTED;
|
||||
}
|
||||
_state = last?State.COMPLETING:State.COMMITTED;
|
||||
else
|
||||
{
|
||||
// handle the content.
|
||||
int len = BufferUtil.length(content);
|
||||
if (len>0)
|
||||
{
|
||||
_contentPrepared+=len;
|
||||
if (isChunking())
|
||||
prepareChunk(header,len);
|
||||
}
|
||||
_state = last?State.COMPLETING:State.COMMITTED;
|
||||
}
|
||||
|
||||
return Result.FLUSH;
|
||||
}
|
||||
catch(Exception e)
|
||||
{
|
||||
|
@ -217,8 +228,6 @@ public class HttpGenerator
|
|||
{
|
||||
BufferUtil.flipToFlush(header,pos);
|
||||
}
|
||||
|
||||
return Result.FLUSH;
|
||||
}
|
||||
|
||||
case COMMITTED:
|
||||
|
|
|
@ -58,9 +58,8 @@ public class MappedByteBufferPool implements ByteBufferPool
|
|||
int capacity = bucket * factor;
|
||||
result = direct ? BufferUtil.allocateDirect(capacity) : BufferUtil.allocate(capacity);
|
||||
}
|
||||
else
|
||||
BufferUtil.clear(result);
|
||||
|
||||
BufferUtil.clear(result);
|
||||
return result;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue