jetty-9: HTTP client: implemented support for 100-Continue.

This commit is contained in:
Simone Bordet 2012-10-10 21:55:15 +02:00
parent e41ad27ca4
commit 402afc6092
19 changed files with 880 additions and 105 deletions

View File

@ -18,7 +18,6 @@
package org.eclipse.jetty.client;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -40,9 +39,9 @@ public class AuthenticationProtocolHandler implements ProtocolHandler
public static final Logger LOG = Log.getLogger(AuthenticationProtocolHandler.class);
private static final Pattern WWW_AUTHENTICATE_PATTERN = Pattern.compile("([^\\s]+)\\s+realm=\"([^\"]+)\".*", Pattern.CASE_INSENSITIVE);
private final ResponseNotifier notifier = new ResponseNotifier();
private final HttpClient client;
private final int maxContentLength;
private final ResponseNotifier notifier;
public AuthenticationProtocolHandler(HttpClient client)
{
@ -53,6 +52,7 @@ public class AuthenticationProtocolHandler implements ProtocolHandler
{
this.client = client;
this.maxContentLength = maxContentLength;
this.notifier = new ResponseNotifier(client);
}
@Override
@ -64,6 +64,7 @@ public class AuthenticationProtocolHandler implements ProtocolHandler
@Override
public Response.Listener getResponseListener()
{
// Return new instances every time to keep track of the response content
return new AuthenticationListener();
}
@ -78,12 +79,14 @@ public class AuthenticationProtocolHandler implements ProtocolHandler
public void onComplete(Result result)
{
Request request = result.getRequest();
HttpConversation conversation = client.getConversation(request.conversation());
Response.Listener listener = conversation.exchanges().peekFirst().listener();
ContentResponse response = new HttpContentResponse(result.getResponse(), getContent(), getEncoding());
if (result.isFailed())
{
Throwable failure = result.getFailure();
LOG.debug("Authentication challenge failed {}", failure);
forwardFailure(request, response, failure);
notifier.forwardFailureComplete(listener, request, result.getRequestFailure(), response, result.getResponseFailure());
return;
}
@ -91,7 +94,7 @@ public class AuthenticationProtocolHandler implements ProtocolHandler
if (wwwAuthenticates.isEmpty())
{
LOG.debug("Authentication challenge without WWW-Authenticate header");
forwardFailure(request, response, new HttpResponseException("HTTP protocol violation: 401 without WWW-Authenticate header", response));
notifier.forwardFailureComplete(listener, request, null, response, new HttpResponseException("HTTP protocol violation: 401 without WWW-Authenticate header", response));
return;
}
@ -110,16 +113,15 @@ public class AuthenticationProtocolHandler implements ProtocolHandler
if (authentication == null)
{
LOG.debug("No authentication available for {}", request);
forwardSuccess(request, response);
notifier.forwardSuccessComplete(listener, request, response);
return;
}
HttpConversation conversation = client.getConversation(request);
final Authentication.Result authnResult = authentication.authenticate(request, response, wwwAuthenticate.value, conversation);
LOG.debug("Authentication result {}", authnResult);
if (authnResult == null)
{
forwardSuccess(request, response);
notifier.forwardSuccessComplete(listener, request, response);
return;
}
@ -134,32 +136,6 @@ public class AuthenticationProtocolHandler implements ProtocolHandler
});
}
private void forwardFailure(Request request, Response response, Throwable failure)
{
HttpConversation conversation = client.getConversation(request);
Response.Listener listener = conversation.exchanges().peekFirst().listener();
notifier.notifyBegin(listener, response);
notifier.notifyHeaders(listener, response);
if (response instanceof ContentResponse)
notifier.notifyContent(listener, response, ByteBuffer.wrap(((ContentResponse)response).content()));
notifier.notifyFailure(listener, response, failure);
conversation.complete();
notifier.notifyComplete(listener, new Result(request, response, failure));
}
private void forwardSuccess(Request request, Response response)
{
HttpConversation conversation = client.getConversation(request);
Response.Listener listener = conversation.exchanges().peekFirst().listener();
notifier.notifyBegin(listener, response);
notifier.notifyHeaders(listener, response);
if (response instanceof ContentResponse)
notifier.notifyContent(listener, response, ByteBuffer.wrap(((ContentResponse)response).content()));
notifier.notifySuccess(listener, response);
conversation.complete();
notifier.notifyComplete(listener, new Result(request, response));
}
private List<WWWAuthenticate> parseWWWAuthenticate(Response response)
{
// TODO: these should be ordered by strength

View File

@ -0,0 +1,106 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.util.BufferingResponseListener;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
public class ContinueProtocolHandler implements ProtocolHandler
{
private static final String ATTRIBUTE = ContinueProtocolHandler.class.getName() + ".100continue";
private final HttpClient client;
private final ResponseNotifier notifier;
public ContinueProtocolHandler(HttpClient client)
{
this.client = client;
this.notifier = new ResponseNotifier(client);
}
@Override
public boolean accept(Request request, Response response)
{
boolean expect100 = request.headers().contains(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString());
boolean handled100 = client.getConversation(request.conversation()).getAttribute(ATTRIBUTE) != null;
return expect100 && !handled100;
}
@Override
public Response.Listener getResponseListener()
{
// Return new instances every time to keep track of the response content
return new ContinueListener();
}
private class ContinueListener extends BufferingResponseListener
{
@Override
public void onSuccess(Response response)
{
// Handling of success must be done here and not from onComplete(),
// since the onComplete() is not invoked because the request is not completed yet.
HttpConversation conversation = client.getConversation(response.conversation());
// Mark the 100 Continue response as handled
conversation.setAttribute(ATTRIBUTE, Boolean.TRUE);
HttpExchange exchange = conversation.exchanges().peekLast();
assert exchange.response() == response;
Response.Listener listener = exchange.listener();
switch (response.status())
{
case 100:
{
// All good, continue
exchange.resetResponse(true);
conversation.listener(listener);
exchange.proceed(true);
break;
}
default:
{
// Server either does not support 100 Continue, or it does and wants to refuse the request content
HttpContentResponse contentResponse = new HttpContentResponse(response, getContent(), getEncoding());
notifier.forwardSuccess(listener, contentResponse);
conversation.listener(listener);
exchange.proceed(false);
break;
}
}
}
@Override
public void onFailure(Response response, Throwable failure)
{
HttpConversation conversation = client.getConversation(response.conversation());
// Mark the 100 Continue response as handled
conversation.setAttribute(ATTRIBUTE, Boolean.TRUE);
HttpExchange exchange = conversation.exchanges().peekLast();
assert exchange.response() == response;
Response.Listener listener = exchange.listener();
HttpContentResponse contentResponse = new HttpContentResponse(response, getContent(), getEncoding());
notifier.forwardFailureComplete(listener, exchange.request(), exchange.requestFailure(), contentResponse, failure);
}
}
}

View File

@ -164,6 +164,7 @@ public class HttpClient extends ContainerLifeCycle
selectorManager = newSelectorManager();
addBean(selectorManager);
handlers.add(new ContinueProtocolHandler(this));
handlers.add(new RedirectProtocolHandler(this));
handlers.add(new AuthenticationProtocolHandler(this));
@ -353,9 +354,8 @@ public class HttpClient extends ContainerLifeCycle
}
}
protected HttpConversation getConversation(Request request)
protected HttpConversation getConversation(long id)
{
long id = request.id();
HttpConversation conversation = conversations.get(id);
if (conversation == null)
{
@ -375,13 +375,17 @@ public class HttpClient extends ContainerLifeCycle
LOG.debug("{} removed", conversation);
}
// TODO: find a better method name
protected Response.Listener lookup(Request request, Response response)
protected List<ProtocolHandler> getProtocolHandlers()
{
for (ProtocolHandler handler : handlers)
return handlers;
}
protected ProtocolHandler findProtocolHandler(Request request, Response response)
{
for (ProtocolHandler handler : getProtocolHandlers())
{
if (handler.accept(request, response))
return handler.getResponseListener();
return handler;
}
return null;
}

View File

@ -111,7 +111,7 @@ public class HttpConnection extends AbstractConnection implements Connection
idleTimeout = endPoint.getIdleTimeout();
endPoint.setIdleTimeout(request.idleTimeout());
HttpConversation conversation = client.getConversation(request);
HttpConversation conversation = client.getConversation(request.conversation());
HttpExchange exchange = new HttpExchange(conversation, this, request, listener);
setExchange(exchange);
conversation.exchanges().offer(exchange);
@ -348,6 +348,11 @@ public class HttpConnection extends AbstractConnection implements Connection
receiver.fail(new HttpResponseException("Response aborted", response));
}
public void proceed(boolean proceed)
{
sender.proceed(proceed);
}
@Override
public void close()
{

View File

@ -39,6 +39,12 @@ public class HttpContentResponse implements ContentResponse
this.encoding = encoding;
}
@Override
public long conversation()
{
return response.conversation();
}
@Override
public Listener listener()
{
@ -94,4 +100,15 @@ public class HttpContentResponse implements ContentResponse
throw new UnsupportedCharsetException(encoding);
}
}
@Override
public String toString()
{
return String.format("%s[%s %d %s - %d bytes]",
HttpContentResponse.class.getSimpleName(),
version(),
status(),
reason(),
content().length);
}
}

View File

@ -63,16 +63,25 @@ public class HttpConversation implements Attributes
this.listener = listener;
}
/**
* @return the exchange that has been identified as the last of this conversation
* @see #last(HttpExchange)
*/
public HttpExchange last()
{
return last;
}
/**
* Remembers the given {@code exchange} as the last of this conversation.
*
* @param exchange the exchange that is the last of this conversation
* @see #last()
*/
public void last(HttpExchange exchange)
{
if (last == null)
last = exchange;
last = exchange;
}
public void complete()

View File

@ -46,7 +46,6 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable
private static final Logger LOG = Log.getLogger(HttpDestination.class);
private final AtomicInteger connectionCount = new AtomicInteger();
private final ResponseNotifier responseNotifier = new ResponseNotifier();
private final HttpClient client;
private final String scheme;
private final String host;
@ -55,6 +54,7 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable
private final BlockingQueue<Connection> idleConnections;
private final BlockingQueue<Connection> activeConnections;
private final RequestNotifier requestNotifier;
private final ResponseNotifier responseNotifier;
public HttpDestination(HttpClient client, String scheme, String host, int port)
{
@ -66,6 +66,7 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable
this.idleConnections = new ArrayBlockingQueue<>(client.getMaxConnectionsPerAddress());
this.activeConnections = new ArrayBlockingQueue<>(client.getMaxConnectionsPerAddress());
this.requestNotifier = new RequestNotifier(client);
this.responseNotifier = new ResponseNotifier(client);
}
protected BlockingQueue<Connection> getIdleConnections()

View File

@ -58,6 +58,11 @@ public class HttpExchange
return request;
}
public Throwable requestFailure()
{
return requestFailure;
}
public Response.Listener listener()
{
return listener;
@ -68,6 +73,11 @@ public class HttpExchange
return response;
}
public Throwable responseFailure()
{
return responseFailure;
}
public void receive()
{
connection.receive();
@ -84,9 +94,17 @@ public class HttpExchange
public Result responseComplete(Throwable failure)
{
this.responseFailure = failure;
int responseSuccess = 0b1100;
int responseFailure = 0b0100;
return complete(failure == null ? responseSuccess : responseFailure);
if (failure == null)
{
int responseSuccess = 0b1100;
return complete(responseSuccess);
}
else
{
proceed(false);
int responseFailure = 0b0100;
return complete(responseFailure);
}
}
/**
@ -117,7 +135,7 @@ public class HttpExchange
if (this == conversation.last())
conversation.complete();
connection.complete(this, success);
return new Result(request, requestFailure, response, responseFailure);
return new Result(request(), requestFailure(), response(), responseFailure());
}
return null;
}
@ -128,6 +146,19 @@ public class HttpExchange
connection.abort(response);
}
public void resetResponse(boolean success)
{
int responseSuccess = 0b1100;
int responseFailure = 0b0100;
int code = success ? responseSuccess : responseFailure;
complete.addAndGet(-code);
}
public void proceed(boolean proceed)
{
connection.proceed(proceed);
}
@Override
public String toString()
{

View File

@ -42,14 +42,15 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
private static final Logger LOG = Log.getLogger(HttpReceiver.class);
private final HttpParser parser = new HttpParser(this);
private final ResponseNotifier notifier = new ResponseNotifier();
private final HttpConnection connection;
private final ResponseNotifier notifier;
private ContentDecoder decoder;
private State state = State.IDLE;
public HttpReceiver(HttpConnection connection)
{
this.connection = connection;
this.notifier = new ResponseNotifier(connection.getHttpClient());
}
public void receive()
@ -115,7 +116,8 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
Response.Listener currentListener = exchange.listener();
Response.Listener initialListener = conversation.exchanges().peekFirst().listener();
HttpClient client = connection.getHttpClient();
Response.Listener handlerListener = client.lookup(exchange.request(), response);
ProtocolHandler protocolHandler = client.findProtocolHandler(exchange.request(), response);
Response.Listener handlerListener = protocolHandler == null ? null : protocolHandler.getResponseListener();
if (handlerListener == null)
{
conversation.last(exchange);
@ -126,6 +128,7 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
}
else
{
LOG.debug("Found protocol handler {}", protocolHandler);
if (currentListener == initialListener)
conversation.listener(handlerListener);
else
@ -298,7 +301,6 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
private class DoubleResponseListener implements Response.Listener
{
private final ResponseNotifier notifier = new ResponseNotifier();
private final Response.Listener listener1;
private final Response.Listener listener2;

View File

@ -101,7 +101,7 @@ public class HttpRequest implements Request
}
@Override
public long id()
public long conversation()
{
return id;
}

View File

@ -77,6 +77,12 @@ public class HttpResponse implements Response
return headers;
}
@Override
public long conversation()
{
return exchange.request().conversation();
}
@Override
public Listener listener()
{

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.client;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
@ -28,6 +29,8 @@ import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http.HttpGenerator;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil;
@ -38,22 +41,22 @@ import org.eclipse.jetty.util.log.Logger;
public class HttpSender
{
private static final Logger LOG = Log.getLogger(HttpSender.class);
private static final String EXPECT_100_ATTRIBUTE = HttpSender.class.getName() + ".expect100";
private final HttpGenerator generator = new HttpGenerator();
private final ResponseNotifier responseNotifier = new ResponseNotifier();
private final HttpConnection connection;
private final RequestNotifier requestNotifier;
private long contentLength;
private Iterator<ByteBuffer> contentChunks;
private ByteBuffer header;
private ByteBuffer chunk;
private volatile boolean committed;
private volatile boolean failed;
private final ResponseNotifier responseNotifier;
private Iterator<ByteBuffer> contentIterator;
private ContentInfo expectedContent;
private boolean committed;
private boolean failed;
public HttpSender(HttpConnection connection)
{
this.connection = connection;
this.requestNotifier = new RequestNotifier(connection.getHttpClient());
this.responseNotifier = new ResponseNotifier(connection.getHttpClient());
}
public void send(HttpExchange exchange)
@ -68,42 +71,70 @@ public class HttpSender
LOG.debug("Sending {}", request);
requestNotifier.notifyBegin(request);
ContentProvider content = request.content();
this.contentLength = content == null ? -1 : content.length();
this.contentChunks = content == null ? Collections.<ByteBuffer>emptyIterator() : content.iterator();
this.contentIterator = content == null ? Collections.<ByteBuffer>emptyIterator() : content.iterator();
send();
}
}
public void proceed(boolean proceed)
{
ContentInfo contentInfo = expectedContent;
if (contentInfo != null)
{
contentInfo.await();
if (proceed)
send();
else
fail(new HttpRequestException("Expectation failed", connection.getExchange().request()));
}
}
private void send()
{
HttpClient client = connection.getHttpClient();
ByteBufferPool bufferPool = client.getByteBufferPool();
ByteBuffer header = null;
ByteBuffer chunk = null;
try
{
HttpClient client = connection.getHttpClient();
EndPoint endPoint = connection.getEndPoint();
HttpExchange exchange = connection.getExchange();
ByteBufferPool byteBufferPool = client.getByteBufferPool();
final Request request = exchange.request();
HttpGenerator.RequestInfo info = null;
ByteBuffer content = contentChunks.hasNext() ? contentChunks.next() : BufferUtil.EMPTY_BUFFER;
boolean lastContent = !contentChunks.hasNext();
HttpConversation conversation = client.getConversation(request.conversation());
HttpGenerator.RequestInfo requestInfo = null;
boolean expect100 = request.headers().contains(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString());
expect100 &= conversation.getAttribute(EXPECT_100_ATTRIBUTE) == null;
if (expect100)
conversation.setAttribute(EXPECT_100_ATTRIBUTE, Boolean.TRUE);
ContentInfo contentInfo = this.expectedContent;
if (contentInfo == null)
contentInfo = new ContentInfo(contentIterator);
else
expect100 = false;
this.expectedContent = null;
while (true)
{
HttpGenerator.Result result = generator.generateRequest(info, header, chunk, content, lastContent);
HttpGenerator.Result result = generator.generateRequest(requestInfo, header, chunk, contentInfo.content, contentInfo.lastContent);
switch (result)
{
case NEED_INFO:
{
info = new HttpGenerator.RequestInfo(request.version(), request.headers(), contentLength, request.method().asString(), request.path());
ContentProvider content = request.content();
long contentLength = content == null ? -1 : content.length();
requestInfo = new HttpGenerator.RequestInfo(request.version(), request.headers(), contentLength, request.method().asString(), request.path());
break;
}
case NEED_HEADER:
{
header = byteBufferPool.acquire(client.getRequestBufferSize(), false);
header = bufferPool.acquire(client.getRequestBufferSize(), false);
break;
}
case NEED_CHUNK:
{
chunk = byteBufferPool.acquire(HttpGenerator.CHUNK_SIZE, false);
chunk = bufferPool.acquire(HttpGenerator.CHUNK_SIZE, false);
break;
}
case FLUSH:
@ -119,9 +150,20 @@ public class HttpSender
@Override
protected void pendingCompleted()
{
LOG.debug("Write completed for {}", request);
if (!committed)
committed(request);
send();
if (expectedContent == null)
{
send();
}
else
{
LOG.debug("Expecting 100 Continue for {}", request);
expectedContent.ready();
}
}
@Override
@ -130,22 +172,37 @@ public class HttpSender
fail(x);
}
};
if (header == null)
header = BufferUtil.EMPTY_BUFFER;
if (chunk == null)
chunk = BufferUtil.EMPTY_BUFFER;
endPoint.write(null, callback, header, chunk, content);
if (expect100)
{
// Save the expected content waiting for the 100 Continue response
expectedContent = contentInfo;
}
write(callback, header, chunk, expect100 ? null : contentInfo.content);
if (callback.pending())
{
LOG.debug("Write pending for {}", request);
return;
}
if (callback.completed())
{
if (!committed)
committed(request);
releaseBuffers();
content = contentChunks.hasNext() ? contentChunks.next() : BufferUtil.EMPTY_BUFFER;
lastContent = !contentChunks.hasNext();
if (expect100)
{
LOG.debug("Expecting 100 Continue for {}", request);
expectedContent.ready();
return;
}
else
{
// Send further content
contentInfo = new ContentInfo(contentIterator);
}
}
}
break;
@ -179,7 +236,49 @@ public class HttpSender
}
finally
{
releaseBuffers();
releaseBuffers(bufferPool, header, chunk);
}
}
private void write(Callback<Void> callback, ByteBuffer header, ByteBuffer chunk, ByteBuffer content)
{
int mask = 0;
if (header != null)
mask += 1;
if (chunk != null)
mask += 2;
if (content != null)
mask += 4;
EndPoint endPoint = connection.getEndPoint();
switch (mask)
{
case 0:
endPoint.write(null, callback, BufferUtil.EMPTY_BUFFER);
break;
case 1:
endPoint.write(null, callback, header);
break;
case 2:
endPoint.write(null, callback, chunk);
break;
case 3:
endPoint.write(null, callback, header, chunk);
break;
case 4:
endPoint.write(null, callback, content);
break;
case 5:
endPoint.write(null, callback, header, content);
break;
case 6:
endPoint.write(null, callback, chunk, content);
break;
case 7:
endPoint.write(null, callback, header, chunk, content);
break;
default:
throw new IllegalStateException();
}
}
@ -216,9 +315,6 @@ public class HttpSender
protected void fail(Throwable failure)
{
// Cleanup first
BufferUtil.clear(header);
BufferUtil.clear(chunk);
releaseBuffers();
generator.abort();
failed = true;
@ -245,19 +341,12 @@ public class HttpSender
}
}
private void releaseBuffers()
private void releaseBuffers(ByteBufferPool bufferPool, ByteBuffer header, ByteBuffer chunk)
{
ByteBufferPool bufferPool = connection.getHttpClient().getByteBufferPool();
if (!BufferUtil.hasContent(header))
{
bufferPool.release(header);
header = null;
}
if (!BufferUtil.hasContent(chunk))
{
bufferPool.release(chunk);
chunk = null;
}
}
private static abstract class StatefulExecutorCallback implements Callback<Void>, Runnable
@ -341,4 +430,34 @@ public class HttpSender
INCOMPLETE, PENDING, COMPLETE, FAILED
}
}
private class ContentInfo
{
private final CountDownLatch latch = new CountDownLatch(1);
public final boolean lastContent;
public final ByteBuffer content;
public ContentInfo(Iterator<ByteBuffer> contentIterator)
{
lastContent = !contentIterator.hasNext();
content = lastContent ? BufferUtil.EMPTY_BUFFER : contentIterator.next();
}
public void ready()
{
latch.countDown();
}
public void await()
{
try
{
latch.await();
}
catch (InterruptedException x)
{
throw new IllegalStateException(x);
}
}
}
}

View File

@ -28,12 +28,13 @@ public class RedirectProtocolHandler extends Response.Listener.Empty implements
{
private static final String ATTRIBUTE = RedirectProtocolHandler.class.getName() + ".redirect";
private final ResponseNotifier notifier = new ResponseNotifier();
private final HttpClient client;
private final ResponseNotifier notifier;
public RedirectProtocolHandler(HttpClient client)
{
this.client = client;
this.notifier = new ResponseNotifier(client);
}
@Override
@ -104,7 +105,7 @@ public class RedirectProtocolHandler extends Response.Listener.Empty implements
private void redirect(Result result, HttpMethod method, String location)
{
Request request = result.getRequest();
HttpConversation conversation = client.getConversation(request);
HttpConversation conversation = client.getConversation(request.conversation());
Integer redirects = (Integer)conversation.getAttribute(ATTRIBUTE);
if (redirects == null)
redirects = 0;
@ -114,7 +115,7 @@ public class RedirectProtocolHandler extends Response.Listener.Empty implements
++redirects;
conversation.setAttribute(ATTRIBUTE, redirects);
Request redirect = client.newRequest(request.id(), location);
Request redirect = client.newRequest(request.conversation(), location);
// Use given method
redirect.method(method);
@ -140,7 +141,7 @@ public class RedirectProtocolHandler extends Response.Listener.Empty implements
{
Request request = result.getRequest();
Response response = result.getResponse();
HttpConversation conversation = client.getConversation(request);
HttpConversation conversation = client.getConversation(request.conversation());
Response.Listener listener = conversation.exchanges().peekFirst().listener();
// TODO: should we reply all event, or just the failure ?
notifier.notifyFailure(listener, response, failure);

View File

@ -20,6 +20,8 @@ package org.eclipse.jetty.client;
import java.nio.ByteBuffer;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.util.log.Log;
@ -28,6 +30,12 @@ import org.eclipse.jetty.util.log.Logger;
public class ResponseNotifier
{
private static final Logger LOG = Log.getLogger(ResponseNotifier.class);
private final HttpClient client;
public ResponseNotifier(HttpClient client)
{
this.client = client;
}
public void notifyBegin(Response.Listener listener, Response response)
{
@ -106,4 +114,38 @@ public class ResponseNotifier
LOG.info("Exception while notifying listener " + listener, x);
}
}
public void forwardSuccess(Response.Listener listener, Response response)
{
notifyBegin(listener, response);
notifyHeaders(listener, response);
if (response instanceof ContentResponse)
notifyContent(listener, response, ByteBuffer.wrap(((ContentResponse)response).content()));
notifySuccess(listener, response);
}
public void forwardSuccessComplete(Response.Listener listener, Request request, Response response)
{
HttpConversation conversation = client.getConversation(request.conversation());
forwardSuccess(listener, response);
conversation.complete();
notifyComplete(listener, new Result(request, response));
}
public void forwardFailure(Response.Listener listener, Response response, Throwable failure)
{
notifyBegin(listener, response);
notifyHeaders(listener, response);
if (response instanceof ContentResponse)
notifyContent(listener, response, ByteBuffer.wrap(((ContentResponse)response).content()));
notifyFailure(listener, response, failure);
}
public void forwardFailureComplete(Response.Listener listener, Request request, Throwable requestFailure, Response response, Throwable responseFailure)
{
HttpConversation conversation = client.getConversation(request.conversation());
forwardFailure(listener, response, responseFailure);
conversation.complete();
notifyComplete(listener, new Result(request, requestFailure, response, responseFailure));
}
}

View File

@ -44,7 +44,7 @@ public interface Request
/**
* @return the conversation id
*/
long id();
long conversation();
/**
* @return the scheme of this request, such as "http" or "https"

View File

@ -36,6 +36,11 @@ import org.eclipse.jetty.http.HttpVersion;
*/
public interface Response
{
/**
* @return the conversation id
*/
long conversation();
/**
* @return the response listener passed to {@link Request#send(Listener)}
*/

View File

@ -0,0 +1,440 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.BufferingResponseListener;
import org.eclipse.jetty.client.util.BytesContentProvider;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.junit.Assert;
import org.junit.Test;
public class HttpClientContinueTest extends AbstractHttpClientServerTest
{
public HttpClientContinueTest(SslContextFactory sslContextFactory)
{
super(sslContextFactory);
}
@Test
public void test_Expect100Continue_WithOneContent_Respond100Continue() throws Exception
{
test_Expect100Continue_Respond100Continue("data1".getBytes("UTF-8"));
}
@Test
public void test_Expect100Continue_WithMultipleContents_Respond100Continue() throws Exception
{
test_Expect100Continue_Respond100Continue("data1".getBytes("UTF-8"), "data2".getBytes("UTF-8"), "data3".getBytes("UTF-8"));
}
private void test_Expect100Continue_Respond100Continue(byte[]... contents) throws Exception
{
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
// Send 100-Continue and copy the content back
IO.copy(request.getInputStream(), response.getOutputStream());
}
});
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.header(HttpHeader.EXPECT.asString(), HttpHeaderValue.CONTINUE.asString())
.content(new BytesContentProvider(contents))
.send()
.get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(200, response.status());
int index = 0;
byte[] responseContent = response.content();
for (byte[] content : contents)
{
for (byte b : content)
{
Assert.assertEquals(b, responseContent[index++]);
}
}
}
@Test
public void test_Expect100Continue_WithChunkedContent_Respond100Continue() throws Exception
{
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
// Send 100-Continue and copy the content back
ServletInputStream input = request.getInputStream();
// Make sure we chunk the response too
response.flushBuffer();
IO.copy(input, response.getOutputStream());
}
});
byte[] content1 = new byte[10240];
byte[] content2 = new byte[16384];
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.header(HttpHeader.EXPECT.asString(), HttpHeaderValue.CONTINUE.asString())
.content(new BytesContentProvider(content1, content2)
{
@Override
public long length()
{
return -1;
}
})
.send()
.get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(200, response.status());
int index = 0;
byte[] responseContent = response.content();
for (byte b : content1)
Assert.assertEquals(b, responseContent[index++]);
for (byte b : content2)
Assert.assertEquals(b, responseContent[index++]);
}
@Test
public void test_Expect100Continue_WithContent_Respond417ExpectationFailed() throws Exception
{
test_Expect100Continue_WithContent_RespondError(417);
}
@Test
public void test_Expect100Continue_WithContent_Respond413RequestEntityTooLarge() throws Exception
{
test_Expect100Continue_WithContent_RespondError(413);
}
private void test_Expect100Continue_WithContent_RespondError(final int error) throws Exception
{
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
response.sendError(error);
}
});
byte[] content1 = new byte[10240];
byte[] content2 = new byte[16384];
final CountDownLatch latch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.header(HttpHeader.EXPECT.asString(), HttpHeaderValue.CONTINUE.asString())
.content(new BytesContentProvider(content1, content2))
.send(new BufferingResponseListener()
{
@Override
public void onComplete(Result result)
{
Assert.assertTrue(result.isFailed());
Assert.assertNotNull(result.getRequestFailure());
Assert.assertNull(result.getResponseFailure());
byte[] content = getContent();
Assert.assertNotNull(content);
Assert.assertTrue(content.length > 0);
Assert.assertEquals(error, result.getResponse().status());
latch.countDown();
}
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void test_Expect100Continue_WithContent_WithRedirect() throws Exception
{
final String data = "success";
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
if (request.getRequestURI().endsWith("/done"))
{
response.getOutputStream().print(data);
}
else
{
// Send 100-Continue and consume the content
IO.copy(request.getInputStream(), new ByteArrayOutputStream());
// Send a redirect
response.sendRedirect("/done");
}
}
});
byte[] content = new byte[10240];
final CountDownLatch latch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.method(HttpMethod.POST)
.path("/continue")
.header(HttpHeader.EXPECT.asString(), HttpHeaderValue.CONTINUE.asString())
.content(new BytesContentProvider(content))
.send(new BufferingResponseListener()
{
@Override
public void onComplete(Result result)
{
Assert.assertFalse(result.isFailed());
Assert.assertEquals(200, result.getResponse().status());
Assert.assertEquals(data, getContentAsString());
latch.countDown();
}
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void test_Redirect_WithExpect100Continue_WithContent() throws Exception
{
// A request with Expect: 100-Continue cannot receive non-final responses like 3xx
final String data = "success";
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
if (request.getRequestURI().endsWith("/done"))
{
// Send 100-Continue and consume the content
IO.copy(request.getInputStream(), new ByteArrayOutputStream());
response.getOutputStream().print(data);
}
else
{
// Send a redirect
response.sendRedirect("/done");
}
}
});
byte[] content = new byte[10240];
final CountDownLatch latch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.method(HttpMethod.POST)
.path("/redirect")
.header(HttpHeader.EXPECT.asString(), HttpHeaderValue.CONTINUE.asString())
.content(new BytesContentProvider(content))
.send(new BufferingResponseListener()
{
@Override
public void onComplete(Result result)
{
Assert.assertTrue(result.isFailed());
Assert.assertNotNull(result.getRequestFailure());
Assert.assertNull(result.getResponseFailure());
Assert.assertEquals(302, result.getResponse().status());
latch.countDown();
}
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Slow
@Test
public void test_Expect100Continue_WithContent_WithResponseFailure_Before100Continue() throws Exception
{
final long idleTimeout = 1000;
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
try
{
TimeUnit.MILLISECONDS.sleep(2 * idleTimeout);
}
catch (InterruptedException x)
{
throw new ServletException(x);
}
}
});
client.setIdleTimeout(idleTimeout);
byte[] content = new byte[1024];
final CountDownLatch latch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.header(HttpHeader.EXPECT.asString(), HttpHeaderValue.CONTINUE.asString())
.content(new BytesContentProvider(content))
.send(new BufferingResponseListener()
{
@Override
public void onComplete(Result result)
{
Assert.assertTrue(result.isFailed());
Assert.assertNotNull(result.getRequestFailure());
Assert.assertNotNull(result.getResponseFailure());
latch.countDown();
}
});
Assert.assertTrue(latch.await(3 * idleTimeout, TimeUnit.MILLISECONDS));
}
@Slow
@Test
public void test_Expect100Continue_WithContent_WithResponseFailure_After100Continue() throws Exception
{
final long idleTimeout = 1000;
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
// Send 100-Continue and consume the content
IO.copy(request.getInputStream(), new ByteArrayOutputStream());
try
{
TimeUnit.MILLISECONDS.sleep(2 * idleTimeout);
}
catch (InterruptedException x)
{
throw new ServletException(x);
}
}
});
client.setIdleTimeout(idleTimeout);
byte[] content = new byte[1024];
final CountDownLatch latch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.header(HttpHeader.EXPECT.asString(), HttpHeaderValue.CONTINUE.asString())
.content(new BytesContentProvider(content))
.send(new BufferingResponseListener()
{
@Override
public void onComplete(Result result)
{
Assert.assertTrue(result.isFailed());
Assert.assertNull(result.getRequestFailure());
Assert.assertNotNull(result.getResponseFailure());
latch.countDown();
}
});
Assert.assertTrue(latch.await(3 * idleTimeout, TimeUnit.MILLISECONDS));
}
@Test
public void test_Expect100Continue_WithContent_WithResponseFailure_During100Continue() throws Exception
{
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
// Send 100-Continue and consume the content
IO.copy(request.getInputStream(), new ByteArrayOutputStream());
}
});
client.getProtocolHandlers().clear();
client.getProtocolHandlers().add(new ContinueProtocolHandler(client)
{
@Override
public Response.Listener getResponseListener()
{
final Response.Listener listener = super.getResponseListener();
return new Response.Listener.Empty()
{
@Override
public void onBegin(Response response)
{
response.abort();
}
@Override
public void onFailure(Response response, Throwable failure)
{
listener.onFailure(response, failure);
}
};
}
});
byte[] content = new byte[1024];
final CountDownLatch latch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.header(HttpHeader.EXPECT.asString(), HttpHeaderValue.CONTINUE.asString())
.content(new BytesContentProvider(content))
.send(new BufferingResponseListener()
{
@Override
public void onComplete(Result result)
{
Assert.assertTrue(result.isFailed());
Assert.assertNotNull(result.getRequestFailure());
Assert.assertNotNull(result.getResponseFailure());
latch.countDown();
}
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
}

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.client;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@ -67,7 +68,8 @@ public class HttpReceiverTest
protected HttpExchange newExchange(Response.Listener listener)
{
HttpExchange exchange = new HttpExchange(conversation, connection, null, listener);
HttpRequest request = new HttpRequest(client, URI.create("http://localhost"));
HttpExchange exchange = new HttpExchange(conversation, connection, request, listener);
conversation.exchanges().offer(exchange);
connection.setExchange(exchange);
exchange.requestComplete(null);

View File

@ -198,15 +198,26 @@ public class HttpGenerator
else
generateHeaders(info,header,content,last);
// handle the content.
int len = BufferUtil.length(content);
if (len>0)
boolean expect100 = info.getHttpFields().contains(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString());
if (expect100)
{
_contentPrepared+=len;
if (isChunking())
prepareChunk(header,len);
_state = State.COMMITTED;
}
_state = last?State.COMPLETING:State.COMMITTED;
else
{
// handle the content.
int len = BufferUtil.length(content);
if (len>0)
{
_contentPrepared+=len;
if (isChunking())
prepareChunk(header,len);
}
_state = last?State.COMPLETING:State.COMMITTED;
}
return Result.FLUSH;
}
catch(Exception e)
{
@ -217,8 +228,6 @@ public class HttpGenerator
{
BufferUtil.flipToFlush(header,pos);
}
return Result.FLUSH;
}
case COMMITTED: