Merge remote-tracking branch 'origin/jetty-9.2.x'

Conflicts:
	jetty-http/src/main/java/org/eclipse/jetty/http/HttpParser.java
This commit is contained in:
Greg Wilkins 2015-03-05 20:17:20 +11:00
commit 48b6bec64c
10 changed files with 163 additions and 171 deletions

View File

@ -78,7 +78,7 @@ public class ContinueProtocolHandler implements ProtocolHandler
case 100:
{
// All good, continue
exchange.resetResponse(true);
exchange.resetResponse();
exchange.proceed(null);
break;
}

View File

@ -19,29 +19,30 @@
package org.eclipse.jetty.client;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.SpinLock;
public class HttpExchange
{
private static final Logger LOG = Log.getLogger(HttpExchange.class);
private final AtomicBoolean requestComplete = new AtomicBoolean();
private final AtomicBoolean responseComplete = new AtomicBoolean();
private final AtomicInteger complete = new AtomicInteger();
private final AtomicReference<HttpChannel> channel = new AtomicReference<>();
private final HttpDestination destination;
private final HttpRequest request;
private final List<Response.ResponseListener> listeners;
private final HttpResponse response;
private volatile Throwable requestFailure;
private volatile Throwable responseFailure;
enum State { PENDING, COMPLETED, TERMINATED } ;
private final SpinLock _lock = new SpinLock();
private State requestState=State.PENDING;
private State responseState=State.PENDING;
private Throwable requestFailure;
private Throwable responseFailure;
public HttpExchange(HttpDestination destination, HttpRequest request, List<Response.ResponseListener> listeners)
{
@ -66,7 +67,10 @@ public class HttpExchange
public Throwable getRequestFailure()
{
return requestFailure;
try(SpinLock.Lock lock = _lock.lock())
{
return requestFailure;
}
}
public List<Response.ResponseListener> getResponseListeners()
@ -81,7 +85,10 @@ public class HttpExchange
public Throwable getResponseFailure()
{
return responseFailure;
try(SpinLock.Lock lock = _lock.lock())
{
return responseFailure;
}
}
public void associate(HttpChannel channel)
@ -98,90 +105,51 @@ public class HttpExchange
public boolean requestComplete()
{
return requestComplete.compareAndSet(false, true);
try(SpinLock.Lock lock = _lock.lock())
{
if (requestState!=State.PENDING)
return false;
requestState=State.COMPLETED;
return true;
}
}
public boolean responseComplete()
{
return responseComplete.compareAndSet(false, true);
try(SpinLock.Lock lock = _lock.lock())
{
if (responseState!=State.PENDING)
return false;
responseState=State.COMPLETED;
return true;
}
}
public Result terminateRequest(Throwable failure)
{
int requestSuccess = 0b0011;
int requestFailure = 0b0001;
return terminate(failure == null ? requestSuccess : requestFailure, failure);
}
public Result terminateResponse(Throwable failure)
{
if (failure == null)
try(SpinLock.Lock lock = _lock.lock())
{
int responseSuccess = 0b1100;
return terminate(responseSuccess, null);
}
else
{
proceed(failure);
int responseFailure = 0b0100;
return terminate(responseFailure, failure);
}
}
/**
* This method needs to atomically compute whether this exchange is completed,
* that is both request and responses are completed (either with a success or
* a failure).
*
* Furthermore, this method needs to atomically compute whether the exchange
* has completed successfully (both request and response are successful) or not.
*
* To do this, we use 2 bits for the request (one to indicate completion, one
* to indicate success), and similarly for the response.
* By using {@link AtomicInteger} to atomically sum these codes we can know
* whether the exchange is completed and whether is successful.
*
* @return the {@link Result} - if any - associated with the status
*/
private Result terminate(int code, Throwable failure)
{
int current = update(code, failure);
int terminated = 0b0101;
if ((current & terminated) == terminated)
{
// Request and response terminated
if (LOG.isDebugEnabled())
LOG.debug("{} terminated", this);
return new Result(getRequest(), getRequestFailure(), getResponse(), getResponseFailure());
requestState=State.TERMINATED;
requestFailure=failure;
if (State.TERMINATED.equals(responseState))
return new Result(getRequest(), requestFailure, getResponse(), responseFailure);
}
return null;
}
private int update(int code, Throwable failure)
public Result terminateResponse(Throwable failure)
{
int current;
while (true)
try(SpinLock.Lock lock = _lock.lock())
{
current = complete.get();
boolean updateable = (current & code) == 0;
if (updateable)
{
int candidate = current | code;
if (!complete.compareAndSet(current, candidate))
continue;
current = candidate;
if ((code & 0b01) == 0b01)
requestFailure = failure;
if ((code & 0b0100) == 0b0100)
responseFailure = failure;
if (LOG.isDebugEnabled())
LOG.debug("{} updated", this);
}
break;
responseState=State.TERMINATED;
responseFailure=failure;
if (State.TERMINATED.equals(requestState))
return new Result(getRequest(), requestFailure, getResponse(), responseFailure);
}
return current;
return null;
}
public boolean abort(Throwable cause)
{
if (destination.remove(this))
@ -205,7 +173,24 @@ public class HttpExchange
private boolean fail(Throwable cause)
{
if (update(0b0101, cause) == 0b0101)
boolean notify=false;
try(SpinLock.Lock lock = _lock.lock())
{
if (!Boolean.TRUE.equals(requestState))
{
requestState=State.TERMINATED;
notify=true;
requestFailure=cause;
}
if (!Boolean.TRUE.equals(responseState))
{
responseState=State.TERMINATED;
notify=true;
responseFailure=cause;
}
}
if (notify)
{
if (LOG.isDebugEnabled())
LOG.debug("Failing {}: {}", this, cause);
@ -222,13 +207,13 @@ public class HttpExchange
}
}
public void resetResponse(boolean success)
public void resetResponse()
{
responseComplete.set(false);
int responseSuccess = 0b1100;
int responseFailure = 0b0100;
int code = success ? responseSuccess : responseFailure;
complete.addAndGet(-code);
try(SpinLock.Lock lock = _lock.lock())
{
responseState=State.PENDING;
responseFailure=null;
}
}
public void proceed(Throwable failure)
@ -238,20 +223,16 @@ public class HttpExchange
channel.proceed(this, failure);
}
private String toString(int code)
{
String padding = "0000";
String status = Integer.toBinaryString(code);
return String.format("%s@%x status=%s%s",
HttpExchange.class.getSimpleName(),
hashCode(),
padding.substring(status.length()),
status);
}
@Override
public String toString()
{
return toString(complete.get());
try(SpinLock.Lock lock = _lock.lock())
{
return String.format("%s@%x req=%s/%s res=%s/%s",
HttpExchange.class.getSimpleName(),
hashCode(),
requestState,requestFailure,
responseState,responseFailure);
}
}
}

View File

@ -429,16 +429,7 @@ public abstract class HttpReceiver
dispose();
// Mark atomically the response as terminated and failed,
// with respect to concurrency between request and response.
Result result = exchange.terminateResponse(failure);
HttpResponse response = exchange.getResponse();
if (LOG.isDebugEnabled())
LOG.debug("Response failure {} {} on {}: {}", response, exchange, getHttpChannel(), failure);
List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
notifier.notifyFailure(listeners, response, failure);
Result result = failResponse(exchange, failure);
if (fail)
{
@ -453,9 +444,25 @@ public abstract class HttpReceiver
return true;
}
private Result failResponse(HttpExchange exchange, Throwable failure)
{
// Mark atomically the response as terminated and failed,
// with respect to concurrency between request and response.
Result result = exchange.terminateResponse(failure);
HttpResponse response = exchange.getResponse();
if (LOG.isDebugEnabled())
LOG.debug("Response failure {} {} on {}: {}", response, exchange, getHttpChannel(), failure);
List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
notifier.notifyFailure(listeners, response, failure);
return result;
}
private void terminateResponse(HttpExchange exchange, Throwable failure)
{
Result result = exchange.terminateResponse(failure);
Result result = failResponse(exchange, failure);
terminateResponse(exchange, result);
}
@ -472,7 +479,7 @@ public abstract class HttpReceiver
if (!ordered)
channel.exchangeTerminated(result);
if (LOG.isDebugEnabled())
LOG.debug("Request/Response {} {}", failure == null ? "succeeded" : "failed", response);
LOG.debug("Request/Response {}: {}", failure == null ? "succeeded" : "failed", result);
List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
notifier.notifyComplete(listeners, result);

View File

@ -321,15 +321,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
dispose();
// Mark atomically the request as terminated and failed,
// with respect to concurrency between request and response.
Result result = exchange.terminateRequest(failure);
Request request = exchange.getRequest();
if (LOG.isDebugEnabled())
LOG.debug("Request failure {} {} on {}: {}", request, exchange, getHttpChannel(), failure);
HttpDestination destination = getHttpChannel().getHttpDestination();
destination.getRequestNotifier().notifyFailure(request, failure);
Result result = failRequest(exchange, failure);
if (fail)
{
@ -344,11 +336,26 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
return true;
}
private Result failRequest(HttpExchange exchange, Throwable failure)
{
// Mark atomically the request as terminated and failed,
// with respect to concurrency between request and response.
Result result = exchange.terminateRequest(failure);
Request request = exchange.getRequest();
if (LOG.isDebugEnabled())
LOG.debug("Request failure {} {} on {}: {}", request, exchange, getHttpChannel(), failure);
HttpDestination destination = getHttpChannel().getHttpDestination();
destination.getRequestNotifier().notifyFailure(request, failure);
return result;
}
private void terminateRequest(HttpExchange exchange, Throwable failure)
{
if (exchange != null)
{
Result result = exchange.terminateRequest(failure);
Result result = failRequest(exchange, failure);
terminateRequest(exchange, failure, result);
}
}
@ -376,7 +383,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
if (!ordered)
channel.exchangeTerminated(result);
if (LOG.isDebugEnabled())
LOG.debug("Request/Response {} {}", failure == null ? "succeeded" : "failed", request);
LOG.debug("Request/Response {}: {}", failure == null ? "succeeded" : "failed", result);
HttpConversation conversation = exchange.getConversation();
destination.getResponseNotifier().notifyComplete(conversation.getResponseListeners(), result);
if (ordered)

View File

@ -152,7 +152,7 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
return closed.compareAndSet(false, true);
}
private boolean abort(Throwable failure)
protected boolean abort(Throwable failure)
{
HttpExchange exchange = channel.getHttpExchange();
return exchange != null && exchange.getRequest().abort(failure);

View File

@ -64,27 +64,31 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
public void receive()
{
buffer = acquireBuffer();
process(buffer);
if (buffer==null)
acquireBuffer();
process();
}
private ByteBuffer acquireBuffer()
private void acquireBuffer()
{
HttpClient client = getHttpDestination().getHttpClient();
ByteBufferPool bufferPool = client.getByteBufferPool();
return bufferPool.acquire(client.getResponseBufferSize(), true);
buffer = bufferPool.acquire(client.getResponseBufferSize(), true);
}
private void releaseBuffer(ByteBuffer buffer)
private void releaseBuffer()
{
assert this.buffer == buffer;
if (buffer==null)
throw new IllegalStateException();
if (BufferUtil.hasContent(buffer))
throw new IllegalStateException();
HttpClient client = getHttpDestination().getHttpClient();
ByteBufferPool bufferPool = client.getByteBufferPool();
bufferPool.release(buffer);
this.buffer = null;
buffer = null;
}
private void process(ByteBuffer buffer)
private void process()
{
try
{
@ -97,11 +101,11 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
{
if (LOG.isDebugEnabled())
LOG.debug("{} closed", connection);
releaseBuffer(buffer);
releaseBuffer();
return;
}
if (!parse(buffer))
if (parse())
return;
int read = endPoint.fill(buffer);
@ -110,18 +114,18 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
if (read > 0)
{
if (!parse(buffer))
if (parse())
return;
}
else if (read == 0)
{
releaseBuffer(buffer);
releaseBuffer();
fillInterested();
return;
}
else
{
releaseBuffer(buffer);
releaseBuffer();
shutdown();
return;
}
@ -131,19 +135,19 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
{
if (LOG.isDebugEnabled())
LOG.debug(x);
releaseBuffer(buffer);
BufferUtil.clear(buffer);
if (buffer!=null)
releaseBuffer();
failAndClose(x);
}
}
/**
* Parses a HTTP response from the given {@code buffer}.
* Parses a HTTP response in the receivers buffer.
*
* @param buffer the response bytes
* @return true to indicate that the parsing may proceed (for example with another response),
* false to indicate that the parsing should be interrupted (and will be resumed by another thread).
* @return true to indicate that parsing should be interrupted (and will be resumed by another thread).
*/
private boolean parse(ByteBuffer buffer)
private boolean parse()
{
// Must parse even if the buffer is fully consumed, to allow the
// parser to advance from asynchronous content to response complete.
@ -151,13 +155,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
if (LOG.isDebugEnabled())
LOG.debug("Parsed {}, remaining {} {}", handle, buffer.remaining(), parser);
if (!handle)
return true;
// If the parser returns true, we need to differentiate two cases:
// A) the response is completed, so the parser is in START state;
// B) the content is handled asynchronously, so the parser is in CONTENT state.
return parser.isStart();
return handle;
}
protected void fillInterested()
@ -241,7 +239,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
{
if (LOG.isDebugEnabled())
LOG.debug("Content consumed asynchronously, resuming processing");
process(getResponseBuffer());
process();
}
public void abort(Throwable x)
@ -257,11 +255,9 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
public boolean messageComplete()
{
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return false;
responseSuccess(exchange);
return true;
if (exchange != null)
responseSuccess(exchange);
return false;
}
@Override

View File

@ -939,25 +939,24 @@ public class HttpParser
case EOF_CONTENT:
setState(State.EOF_CONTENT);
handle=_handler.headerComplete()||handle;
break;
return handle;
case CHUNKED_CONTENT:
setState(State.CHUNKED_CONTENT);
handle=_handler.headerComplete()||handle;
break;
return handle;
case NO_CONTENT:
handle=_handler.headerComplete()||handle;
setState(State.END);
handle=_handler.messageComplete()||handle;
break;
return handle;
default:
setState(State.CONTENT);
handle=_handler.headerComplete()||handle;
break;
return handle;
}
break;
}
default:
@ -1191,8 +1190,7 @@ public class HttpParser
if (_responseStatus>0 && _headResponse)
{
setState(State.END);
if (_handler.messageComplete())
return true;
return _handler.messageComplete();
}
else
{
@ -1332,8 +1330,7 @@ public class HttpParser
if (content == 0)
{
setState(State.END);
if (_handler.messageComplete())
return true;
return _handler.messageComplete();
}
}
@ -1357,8 +1354,7 @@ public class HttpParser
if (content == 0)
{
setState(State.END);
if (_handler.messageComplete())
return true;
return _handler.messageComplete();
}
else
{
@ -1381,8 +1377,7 @@ public class HttpParser
if(_contentPosition == _contentLength)
{
setState(State.END);
if (_handler.messageComplete())
return true;
return _handler.messageComplete();
}
}
break;
@ -1411,8 +1406,7 @@ public class HttpParser
if (_chunkLength == 0)
{
setState(State.END);
if (_handler.messageComplete())
return true;
return _handler.messageComplete();
}
else
setState(State.CHUNK);
@ -1432,8 +1426,7 @@ public class HttpParser
if (_chunkLength == 0)
{
setState(State.END);
if (_handler.messageComplete())
return true;
return _handler.messageComplete();
}
else
setState(State.CHUNK);

View File

@ -53,7 +53,7 @@ import org.junit.Test;
public class SslConnectionTest
{
private static SslContextFactory __sslCtxFactory=new SslContextFactory();
private static ByteBufferPool __byteBufferPool = new MappedByteBufferPool();
private static ByteBufferPool __byteBufferPool = new LeakTrackingByteBufferPool(new MappedByteBufferPool());
protected volatile EndPoint _lastEndp;
private volatile boolean _testFill=true;

View File

@ -28,17 +28,24 @@ import java.net.SocketException;
import java.net.URI;
import java.security.KeyStore;
import java.util.Arrays;
import java.util.concurrent.Executor;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.LeakTrackingByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.io.ssl.SslConnection;
import org.eclipse.jetty.server.AbstractConnectionFactory;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.HttpServerTestBase;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.toolchain.test.OS;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.Scheduler;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Ignore;
@ -114,8 +121,10 @@ public class SelectChannelServerSslTest extends HttpServerTestBase
sslContextFactory.setKeyManagerPassword("keypwd");
sslContextFactory.setTrustStorePath(keystorePath);
sslContextFactory.setTrustStorePassword("storepwd");
ServerConnector connector = new ServerConnector(_server, 1, 1, sslContextFactory);
ByteBufferPool pool = new LeakTrackingByteBufferPool(new MappedByteBufferPool());
ServerConnector connector = new ServerConnector(_server,(Executor)null,(Scheduler)null,pool, 1, 1, AbstractConnectionFactory.getFactories(sslContextFactory,new HttpConnectionFactory()));
startServer(connector);
KeyStore keystore = KeyStore.getInstance(KeyStore.getDefaultType());

View File

@ -18,7 +18,6 @@
package org.eclipse.jetty.util.thread;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;