jetty-9 work in progress on HttpConnection to HttpChannel split

This commit is contained in:
Greg Wilkins 2012-03-23 16:29:38 +11:00
parent dc29d73d0b
commit e77f4344ac
20 changed files with 595 additions and 1590 deletions

View File

@ -137,7 +137,13 @@ public class HttpGenerator
{
return _state == state;
}
/* ------------------------------------------------------------ */
public boolean isIdle()
{
return _state == State.START;
}
/* ------------------------------------------------------------ */
public boolean isComplete()
{

View File

@ -134,6 +134,12 @@ public class HttpParser
{
return _state.ordinal() < State.END.ordinal();
}
/* ------------------------------------------------------------------------------- */
public boolean isInContent()
{
return _content!=Content.NO_CONTENT && _content!=Content.UNKNOWN_CONTENT;
}
/* ------------------------------------------------------------------------------- */
public boolean isChunking()
@ -200,7 +206,7 @@ public class HttpParser
/* ------------------------------------------------------------------------------- */
/**
* Parse until next Event.
* @return an indication of progress
* @return True if an {@link RequestHandler} method was called and it returned true;
*/
public boolean parseNext(ByteBuffer buffer) throws IOException
{
@ -981,5 +987,4 @@ public class HttpParser
}
}

View File

@ -15,6 +15,7 @@ package org.eclipse.jetty.io;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import org.eclipse.jetty.util.BufferUtil;
@ -215,23 +216,24 @@ public class ByteArrayEndPoint implements ConnectedEndPoint
/*
* @see org.eclipse.io.EndPoint#flush(org.eclipse.io.Buffer, org.eclipse.io.Buffer, org.eclipse.io.Buffer)
*/
public int flush(ByteBuffer header, ByteBuffer buffer) throws IOException
public int gather(ByteBuffer... buffers) throws IOException
{
if (_closed)
throw new IOException("CLOSED");
int flushed=0;
if (header!=null && header.remaining()>0)
flushed=flush(header);
if (header==null || header.remaining()==0)
int len=0;
for (ByteBuffer b : buffers)
{
if (buffer!=null && buffer.remaining()>0)
flushed+=flush(buffer);
if (b.hasRemaining())
{
int l=flush(b);
if (l>0)
len+=l;
else
break;
}
}
return flushed;
return len;
}
/* ------------------------------------------------------------ */

View File

@ -45,7 +45,7 @@ public interface Connection
boolean isIdle();
boolean isSuspended();
boolean isReadInterested();
/**
* Called when the connection is closed

View File

@ -72,11 +72,10 @@ public interface EndPoint
* are taken from the header/buffer position up until the buffer limit. The header/buffers position
* is updated to indicate how many bytes have been consumed.
*
* @param buffer The buffer to flush. This buffers position is updated if it is not read only.
* @return the number of bytes written
* @throws EofException If the endpoint is closed or output is shutdown.
*/
int flush(ByteBuffer header, ByteBuffer buffer) throws IOException;
int gather(ByteBuffer... buffer) throws IOException;
/* ------------------------------------------------------------ */

View File

@ -245,23 +245,27 @@ public class ChannelEndPoint implements EndPoint
/* (non-Javadoc)
* @see org.eclipse.io.EndPoint#flush(org.eclipse.io.Buffer, org.eclipse.io.Buffer, org.eclipse.io.Buffer)
*/
public int flush(ByteBuffer header, ByteBuffer buffer) throws IOException
public int gather(ByteBuffer... buffers) throws IOException
{
int len;
if (buffer==null||buffer.remaining()==0)
len=flush(header);
else if (header==null||header.remaining()==0)
len=flush(buffer);
else if (_channel instanceof GatheringByteChannel)
len= (int)((GatheringByteChannel)_channel).write(new ByteBuffer[]{header,buffer},0,2);
int len=0;
if (_channel instanceof GatheringByteChannel)
{
len= (int)((GatheringByteChannel)_channel).write(buffers,0,2);
}
else
{
len=flush(header);
if (header.remaining()==0)
len+=flush(buffer);
for (ByteBuffer b : buffers)
{
if (b.hasRemaining())
{
int l=_channel.write(b);
if (l>0)
len+=l;
else
break;
}
}
}
return len;
}

View File

@ -53,14 +53,20 @@ public class NetworkTrafficSelectChannelEndPoint extends SelectChannelEndPoint
}
@Override
public int flush(ByteBuffer header, ByteBuffer buffer) throws IOException
public int gather(ByteBuffer... buffers) throws IOException
{
int headerPosition = header.position();
int headerLength = header.remaining();
int bufferPosition = buffer.position();
int written = super.flush(header, buffer);
notifyOutgoing(header, headerPosition, written > headerLength ? headerLength : written);
notifyOutgoing(buffer, bufferPosition, written > headerLength ? written - headerLength : 0);
int written=0;
for (ByteBuffer b : buffers)
{
if (b.hasRemaining())
{
int l = flush(b);
if (l==0)
break;
else
written+=l;
}
}
return written;
}

View File

@ -314,17 +314,22 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
/* ------------------------------------------------------------ */
@Override
public int flush(ByteBuffer header, ByteBuffer buffer) throws IOException
public int gather(ByteBuffer... buffers) throws IOException
{
int l = super.flush(header, buffer);
int l = super.gather(buffers);
// If there was something to write and it wasn't written, then we are not writable.
if (l==0 && ( header!=null && header.remaining()>0 || buffer!=null && buffer.remaining()>0))
if (l==0)
{
synchronized (this)
for (ByteBuffer b: buffers)
{
if (_dispatched)
_writable=false;
if (b.hasRemaining())
{
synchronized (this)
{
if (_dispatched)
_writable=false;
}
}
}
}
else if (l>0)
@ -495,7 +500,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
int current_ops=-1;
if (getChannel().isOpen())
{
boolean read_interest = _readBlocked || (!_dispatched && !_connection.isSuspended());
boolean read_interest = _readBlocked || (!_dispatched && _connection.isReadInterested());
boolean write_interest= _writeBlocked || (!_dispatched && !_writable);
_interestOps =

View File

@ -229,18 +229,19 @@ public class SslConnection extends AbstractConnection implements AsyncConnection
/* ------------------------------------------------------------ */
public boolean isIdle()
{
return false;
return _connection.isIdle();
}
/* ------------------------------------------------------------ */
public boolean isSuspended()
public boolean isReadInterested()
{
return false;
return _connection.isReadInterested();
}
/* ------------------------------------------------------------ */
public void onClose()
{
_connection.onClose();
}
/* ------------------------------------------------------------ */
@ -622,13 +623,21 @@ public class SslConnection extends AbstractConnection implements AsyncConnection
return size-buffer.remaining();
}
public int flush(ByteBuffer header, ByteBuffer buffer) throws IOException
public int gather(ByteBuffer... buffers) throws IOException
{
if (BufferUtil.hasContent(header))
return flush(header);
if (BufferUtil.hasContent(buffer))
return flush(buffer);
return 0;
int len=0;
for (ByteBuffer b : buffers)
{
if (b.hasRemaining())
{
int l=flush(b);
if (l>0)
len+=l;
else
break;
}
}
return len;
}
public boolean blockReadable(long millisecs) throws IOException

View File

@ -157,9 +157,10 @@ public class SelectChannelEndPointTest
return false;
}
public boolean isSuspended()
@Override
public boolean isReadInterested()
{
return false;
return true;
}
public void onClose()

View File

@ -77,7 +77,7 @@ public class AsyncContinuation implements AsyncContext, Continuation
};
/* ------------------------------------------------------------ */
protected ServerConnection _connection;
protected HttpChannel _connection;
private List<AsyncListener> _lastAsyncListeners;
private List<AsyncListener> _asyncListeners;
private List<ContinuationListener> _continuationListeners;
@ -101,7 +101,7 @@ public class AsyncContinuation implements AsyncContext, Continuation
}
/* ------------------------------------------------------------ */
protected void setConnection(final ServerConnection connection)
protected void setConnection(final HttpChannel connection)
{
synchronized(this)
{

View File

@ -1,296 +0,0 @@
// ========================================================================
// Copyright (c) 2006-2011 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.server;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.http.HttpException;
import org.eclipse.jetty.http.HttpGenerator;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpGenerator.Action;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.nio.AsyncConnection;
import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/* ------------------------------------------------------------ */
/** Asychronous Server HTTP connection
*
*/
public class AsyncHttpConnection extends AbstractHttpConnection implements AsyncConnection
{
private final static int NO_PROGRESS_INFO = Integer.getInteger("org.mortbay.jetty.NO_PROGRESS_INFO",100);
private final static int NO_PROGRESS_CLOSE = Integer.getInteger("org.mortbay.jetty.NO_PROGRESS_CLOSE",200);
private static final Logger LOG = Log.getLogger(AsyncHttpConnection.class);
private int _total_no_progress;
private final AsyncEndPoint _asyncEndp;
public AsyncHttpConnection(Connector connector, EndPoint endpoint, Server server)
{
super(connector,endpoint,server);
_asyncEndp=(AsyncEndPoint)endpoint;
}
@Override
public Connection handle() throws IOException
{
Connection connection = this;
boolean some_progress=false;
boolean progress=true;
try
{
setCurrentConnection(this);
// don't check for idle while dispatched (unless blocking IO is done).
_asyncEndp.setCheckForIdle(false);
// While progress and the connection has not changed
while (progress && connection==this)
{
progress=false;
try
{
// Handle resumed request
if (_request._async.isAsync())
{
if (_request._async.isDispatchable())
handleRequest();
}
// else Parse more input
else if (!_parser.isComplete() && _parser.parseAvailable())
progress=true;
// Generate more output
if (_generator.isCommitted() && !_generator.isComplete() && !_endp.isOutputShutdown())
if (_generator.flushBuffer()>0)
progress=true;
// Flush output
_endp.flush();
// Has any IO been done by the endpoint itself since last loop
if (_asyncEndp.hasProgressed())
progress=true;
}
catch (HttpException e)
{
if (LOG.isDebugEnabled())
{
LOG.debug("uri="+_uri);
LOG.debug("fields="+_requestFields);
LOG.debug(e);
}
progress=true;
_generator.sendError(e.getStatus(), e.getReason(), null, true);
}
finally
{
some_progress|=progress;
// Is this request/response round complete and are fully flushed?
if (_parser.isComplete() && _generator.isComplete())
{
// Reset the parser/generator
progress=true;
// look for a switched connection instance?
if (_response.getStatus()==HttpStatus.SWITCHING_PROTOCOLS_101)
{
Connection switched=(Connection)_request.getAttribute("org.eclipse.jetty.io.Connection");
if (switched!=null)
connection=switched;
}
reset();
// TODO Is this still required?
if (!_generator.isPersistent() && !_endp.isOutputShutdown())
{
LOG.warn("Safety net oshut!!! IF YOU SEE THIS, PLEASE RAISE BUGZILLA");
_endp.shutdownOutput();
}
}
else if (_request.getAsyncContinuation().isAsyncStarted())
{
// The request is suspended, so even though progress has been made,
// exit the while loop by setting progress to false
LOG.debug("suspended {}",this);
progress=false;
}
}
}
}
finally
{
setCurrentConnection(null);
// If we are not suspended
if (!_request.getAsyncContinuation().isAsyncStarted())
{
// return buffers
_parser.returnBuffers();
_generator.returnBuffers();
// reenable idle checking unless request is suspended
_asyncEndp.setCheckForIdle(true);
}
// Safety net to catch spinning
if (some_progress)
_total_no_progress=0;
else
{
_total_no_progress++;
if (NO_PROGRESS_INFO>0 && _total_no_progress%NO_PROGRESS_INFO==0 && (NO_PROGRESS_CLOSE<=0 || _total_no_progress< NO_PROGRESS_CLOSE))
LOG.info("EndPoint making no progress: "+_total_no_progress+" "+_endp+" "+this);
if (NO_PROGRESS_CLOSE>0 && _total_no_progress==NO_PROGRESS_CLOSE)
{
LOG.warn("Closing EndPoint making no progress: "+_total_no_progress+" "+_endp+" "+this);
if (_endp instanceof SelectChannelEndPoint)
((SelectChannelEndPoint)_endp).getChannel().close();
}
}
}
return connection;
}
public void onInputShutdown() throws IOException
{
// If we don't have a committed response and we are not suspended
if (_generator.isIdle() && !_request.getAsyncContinuation().isSuspended())
{
// then no more can happen, so close.
_endp.close();
}
// Make idle parser seek EOF
if (_parser.isIdle())
_parser.setPersistent(false);
}
ByteBuffer header=null;
ByteBuffer chunk=null;
ByteBuffer buffer=null;
/* ------------------------------------------------------------ */
private void write(ByteBuffer content) throws IOException
{
if (!_generator.isComplete())
throw new EofException();
try
{
while(BufferUtil.hasContent(content))
{
// Generate
Action action=BufferUtil.hasContent(content)?null:Action.COMPLETE;
/* System.err.printf("generate(%s,%s,%s,%s,%s)@%s%n",
BufferUtil.toSummaryString(header),
BufferUtil.toSummaryString(chunk),
BufferUtil.toSummaryString(buffer),
BufferUtil.toSummaryString(content),
action,gen.getState());*/
HttpGenerator.Result result=_generator.generate(header,chunk,buffer,content,action);
/*System.err.printf("%s (%s,%s,%s,%s,%s)@%s%n",
result,
BufferUtil.toSummaryString(header),
BufferUtil.toSummaryString(chunk),
BufferUtil.toSummaryString(buffer),
BufferUtil.toSummaryString(content),
action,gen.getState());*/
switch(result)
{
case NEED_HEADER:
header=BufferUtil.allocate(2048);
break;
case NEED_BUFFER:
buffer=BufferUtil.allocate(8192);
break;
case NEED_CHUNK:
header=null;
chunk=BufferUtil.allocate(HttpGenerator.CHUNK_SIZE);
break;
case FLUSH:
{
Future<Integer> future = getEndPoint().flush(header,chunk,buffer);
future.get(getMaxIdleTime(),TimeUnit.MILLISECONDS);
break;
}
case FLUSH_CONTENT:
{
Future<Integer> future = getEndPoint().flush(header,chunk,content);
future.get(getMaxIdleTime(),TimeUnit.MILLISECONDS);
break;
}
case OK:
break;
case SHUTDOWN_OUT:
getEndPoint().shutdownOutput();
break;
}
}
}
catch(final TimeoutException e)
{
throw new InterruptedIOException(e.toString())
{
{
this.initCause(e);
}
};
}
catch (final InterruptedException e)
{
throw new InterruptedIOException(e.toString())
{
{
this.initCause(e);
}
};
}
catch (final ExecutionException e)
{
throw new IOException(e.toString())
{
{
this.initCause(e);
}
};
}
}
}

View File

@ -36,6 +36,7 @@ import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.http.HttpParser.RequestHandler;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.http.HttpVersion;
@ -62,11 +63,11 @@ import org.eclipse.jetty.util.thread.Timeout.Task;
/**
*
*/
public abstract class ServerConnection
public class HttpChannel
{
private static final Logger LOG = Log.getLogger(ServerConnection.class);
private static final Logger LOG = Log.getLogger(HttpChannel.class);
private static final ThreadLocal<ServerConnection> __currentConnection = new ThreadLocal<ServerConnection>();
private static final ThreadLocal<HttpChannel> __currentConnection = new ThreadLocal<HttpChannel>();
private int _requests;
@ -86,8 +87,6 @@ public abstract class ServerConnection
int _include;
private Object _associatedObject; // associated object
private HttpVersion _version = HttpVersion.HTTP_1_1;
private boolean _expect = false;
@ -96,13 +95,13 @@ public abstract class ServerConnection
private boolean _host = false;
/* ------------------------------------------------------------ */
public static ServerConnection getCurrentConnection()
public static HttpChannel getCurrentConnection()
{
return __currentConnection.get();
}
/* ------------------------------------------------------------ */
protected static void setCurrentConnection(ServerConnection connection)
protected static void setCurrentConnection(HttpChannel connection)
{
__currentConnection.set(connection);
}
@ -111,7 +110,7 @@ public abstract class ServerConnection
/** Constructor
*
*/
public ServerConnection(Server server)
public HttpChannel(Server server)
{
_uri = new HttpURI(URIUtil.__CHARSET);
_requestFields = new HttpFields();
@ -136,23 +135,11 @@ public abstract class ServerConnection
{
return _server;
}
/* ------------------------------------------------------------ */
/**
* @return Returns the associatedObject.
*/
public Object getAssociatedObject()
public AsyncContinuation getAsyncContinuation()
{
return _associatedObject;
}
/* ------------------------------------------------------------ */
/**
* @param associatedObject The associatedObject to set.
*/
public void setAssociatedObject(Object associatedObject)
{
_associatedObject = associatedObject;
return _async;
}
/* ------------------------------------------------------------ */
@ -219,7 +206,7 @@ public abstract class ServerConnection
}
if (_in == null)
_in = new HttpInput(ServerConnection.this);
_in = new HttpInput(HttpChannel.this);
return _in;
}
@ -382,7 +369,6 @@ public abstract class ServerConnection
if (_async.isUncompleted())
{
_async.doComplete(async_exception);
if (_expect100Continue)
@ -397,16 +383,16 @@ public abstract class ServerConnection
setPersistent(false);
}
if (error)
setPersistent(false);
else if (!_response.isCommitted() && !_request.isHandled())
_response.sendError(HttpServletResponse.SC_NOT_FOUND);
if (error)
setPersistent(false);
else if (!_response.isCommitted() && !_request.isHandled())
_response.sendError(HttpServletResponse.SC_NOT_FOUND);
_response.complete();
if (isPersistent())
persist();
_response.complete();
if (isPersistent())
persist();
_request.setHandled(true);
_request.setHandled(true);
}
}
}
@ -661,7 +647,7 @@ public abstract class ServerConnection
{
Output()
{
super(ServerConnection.this);
super(HttpChannel.this);
}
/* ------------------------------------------------------------ */
@ -743,7 +729,7 @@ public abstract class ServerConnection
{
OutputWriter()
{
super(ServerConnection.this._out);
super(HttpChannel.this._out);
}
}
@ -920,4 +906,55 @@ public abstract class ServerConnection
return false;
}
public HttpParser.RequestHandler getRequestHandler()
{
return _handler;
}
public HttpGenerator.ResponseInfo getResponseInfo()
{
return _info;
}
private final RequestHandler _handler = new RequestHandler();
private final HttpGenerator.ResponseInfo _info = new HttpGenerator.ResponseInfo()
{
@Override
public HttpVersion getHttpVersion()
{
return getRequest().getHttpVersion();
}
@Override
public HttpFields getHttpFields()
{
return _responseFields;
}
@Override
public long getContentLength()
{
return _response.getLongContentLength();
}
@Override
public boolean isHead()
{
return getRequest().isHead();
}
@Override
public int getStatus()
{
return _response.getStatus();
}
@Override
public String getReason()
{
return _response.getReason();
}
};
}

View File

@ -0,0 +1,411 @@
// ========================================================================
// Copyright (c) 2004-2011 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.server;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.PrintWriter;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.servlet.DispatcherType;
import javax.servlet.ServletInputStream;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.continuation.ContinuationThrowable;
import org.eclipse.jetty.http.HttpBuffers;
import org.eclipse.jetty.http.HttpContent;
import org.eclipse.jetty.http.HttpException;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpGenerator;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MimeTypes;
import org.eclipse.jetty.http.HttpGenerator.Action;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Buffers;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.io.UncheckedPrintWriter;
import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
import org.eclipse.jetty.server.nio.NIOConnector;
import org.eclipse.jetty.server.ssl.SslConnector;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.QuotedStringTokenizer;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.URIUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.resource.Resource;
/**
*/
public abstract class HttpConnection extends AbstractConnection
{
private static final Logger LOG = Log.getLogger(HttpConnection.class);
private static final ThreadLocal<HttpConnection> __currentConnection = new ThreadLocal<HttpConnection>();
private final AsyncEndPoint _asyncEndp;
private final Server _server;
private final Connector _connector;
private final HttpParser _parser;
private final HttpGenerator _generator;
private final HttpChannel _channel;
ByteBuffer _requestBuffer=null;
ByteBuffer _responseHeader=null;
ByteBuffer _chunk=null;
ByteBuffer _responseBuffer=null;
/* ------------------------------------------------------------ */
public static HttpConnection getCurrentConnection()
{
return __currentConnection.get();
}
/* ------------------------------------------------------------ */
protected static void setCurrentConnection(HttpConnection connection)
{
__currentConnection.set(connection);
}
/* ------------------------------------------------------------ */
/** Constructor
*
*/
public HttpConnection(Connector connector, EndPoint endpoint, Server server)
{
super(endpoint);
_connector = connector;
_server = server;
_asyncEndp=(AsyncEndPoint)endpoint;
_channel = new HttpChannel(server);
_parser = new HttpParser(_channel.getRequestHandler());
_generator = new HttpGenerator(_channel.getResponseInfo());
}
/* ------------------------------------------------------------ */
/**
* @return the parser used by this connection
*/
public HttpParser getParser()
{
return _parser;
}
/* ------------------------------------------------------------ */
public Server getServer()
{
return _server;
}
/* ------------------------------------------------------------ */
/**
* @return Returns the connector.
*/
public Connector getConnector()
{
return _connector;
}
/* ------------------------------------------------------------ */
public void reset()
{
_parser.reset();
_generator.reset();
_channel.reset();
if (_requestBuffer!=null)
_connector.getResponseBuffers().returnBuffer(_requestBuffer);
_requestBuffer=null;
if (_responseHeader!=null)
_connector.getResponseBuffers().returnBuffer(_responseHeader);
_responseHeader=null;
if (_responseBuffer!=null)
_connector.getResponseBuffers().returnBuffer(_responseBuffer);
_responseBuffer=null;
if (_chunk!=null)
_connector.getResponseBuffers().returnBuffer(_chunk);
_chunk=null;
}
/* ------------------------------------------------------------ */
public HttpGenerator getGenerator()
{
return _generator;
}
/* ------------------------------------------------------------ */
public boolean isIdle()
{
return _parser.isIdle() && _generator.isIdle();
}
/* ------------------------------------------------------------ */
public boolean isReadInterested()
{
return !_channel.getAsyncContinuation().isSuspended() && !_parser.isComplete();
}
/* ------------------------------------------------------------ */
public int getMaxIdleTime()
{
if (_connector.isLowResources() && _endp.getMaxIdleTime()==_connector.getMaxIdleTime())
return _connector.getLowResourceMaxIdleTime();
if (_endp.getMaxIdleTime()>0)
return _endp.getMaxIdleTime();
return _connector.getMaxIdleTime();
}
/* ------------------------------------------------------------ */
public String toString()
{
return String.format("%s,g=%s,p=%s",
super.toString(),
_generator,
_parser);
}
/* ------------------------------------------------------------ */
@Override
public Connection handle() throws IOException
{
Connection connection = this;
boolean progress=true;
try
{
setCurrentConnection(this);
// don't check for idle while dispatched (unless blocking IO is done).
_asyncEndp.setCheckForIdle(false);
// While progress and the connection has not changed
while (progress && connection==this)
{
progress=false;
try
{
// Shall we try some reading
if (isReadInterested())
{
// We will need a buffer to read into
if (_requestBuffer==null)
_requestBuffer=_parser.isInContent()
?_connector.getRequestBuffers().getBuffer()
:_connector.getRequestBuffers().getHeader();
}
// If we parse to an event, call the connection
if (BufferUtil.hasContent(_requestBuffer) && _parser.parseNext(_requestBuffer))
_channel.handleRequest();
}
catch (HttpException e)
{
progress=true;
_channel.sendError(e.getStatus(), e.getReason(), null, true);
}
finally
{
// Return empty request buffer
if (_requestBuffer!=null && !_requestBuffer.hasRemaining())
{
_connector.getRequestBuffers().returnBuffer(_requestBuffer);
_requestBuffer=null;
}
// Is this request/response round complete and are fully flushed?
if (_parser.isComplete() && _generator.isComplete())
{
// look for a switched connection instance?
if (_channel.getResponse().getStatus()==HttpStatus.SWITCHING_PROTOCOLS_101)
{
Connection switched=(Connection)_channel.getRequest().getAttribute("org.eclipse.jetty.io.Connection");
if (switched!=null)
connection=switched;
}
// Reset the parser/generator
reset();
progress=true;
}
else if (_channel.getRequest().getAsyncContinuation().isAsyncStarted())
{
// The request is suspended, so even though progress has been made,
// exit the while loop by setting progress to false
LOG.debug("suspended {}",this);
progress=false;
}
}
}
}
finally
{
setCurrentConnection(null);
// If we are not suspended
if (!_channel.getRequest().getAsyncContinuation().isAsyncStarted())
{
// reenable idle checking unless request is suspended
_asyncEndp.setCheckForIdle(true);
}
}
return connection;
}
/* ------------------------------------------------------------ */
private void write(ByteBuffer content, Action action) throws IOException
{
if (!_generator.isComplete())
throw new EofException();
try
{
while(BufferUtil.hasContent(content))
{
/* System.err.printf("generate(%s,%s,%s,%s,%s)@%s%n",
BufferUtil.toSummaryString(header),
BufferUtil.toSummaryString(chunk),
BufferUtil.toSummaryString(buffer),
BufferUtil.toSummaryString(content),
action,gen.getState());*/
HttpGenerator.Result result=_generator.generate(_responseHeader,_chunk,_responseBuffer,content,action);
/*System.err.printf("%s (%s,%s,%s,%s,%s)@%s%n",
result,
BufferUtil.toSummaryString(header),
BufferUtil.toSummaryString(chunk),
BufferUtil.toSummaryString(buffer),
BufferUtil.toSummaryString(content),
action,gen.getState());*/
switch(result)
{
case NEED_HEADER:
_responseHeader=_connector.getResponseBuffers().getHeader();
break;
case NEED_BUFFER:
_responseBuffer=_connector.getResponseBuffers().getBuffer();
_responseBuffer=BufferUtil.allocate(8192);
break;
case NEED_CHUNK:
_responseHeader=null;
_chunk=_connector.getResponseBuffers().getBuffer(HttpGenerator.CHUNK_SIZE);
break;
case FLUSH:
{
Future<Integer> future = getEndPoint().flush(_responseHeader,_chunk,_responseBuffer);
future.get(getMaxIdleTime(),TimeUnit.MILLISECONDS);
break;
}
case FLUSH_CONTENT:
{
Future<Integer> future = getEndPoint().flush(_responseHeader,_chunk,content);
future.get(getMaxIdleTime(),TimeUnit.MILLISECONDS);
break;
}
case OK:
break;
case SHUTDOWN_OUT:
getEndPoint().shutdownOutput();
break;
}
switch(action)
{
case COMPLETE: action=Action.PREPARE; break;
case FLUSH: action=Action.FLUSH; break;
case PREPARE: action=Action.PREPARE; break;
}
}
}
catch(final TimeoutException e)
{
throw new InterruptedIOException(e.toString())
{
{
this.initCause(e);
}
};
}
catch (final InterruptedException e)
{
throw new InterruptedIOException(e.toString())
{
{
this.initCause(e);
}
};
}
catch (final ExecutionException e)
{
throw new IOException(e.toString())
{
{
this.initCause(e);
}
};
}
}
/* ------------------------------------------------------------ */
public void onClose()
{
_channel.onClose();
}
/* ------------------------------------------------------------ */
public void onInputShutdown() throws IOException
{
// If we don't have a committed response and we are not suspended
if (_generator.isIdle() && !_channel.getRequest().getAsyncContinuation().isSuspended())
{
// then no more can happen, so close.
_endp.close();
}
// Make idle parser seek EOF
if (_parser.isIdle())
_parser.setPersistent(false);
}
}

View File

@ -24,11 +24,11 @@ import org.eclipse.jetty.util.BufferUtil;
public class HttpInput extends ServletInputStream
{
protected final ServerConnection _connection;
protected final HttpChannel _connection;
private ByteBuffer _content;
/* ------------------------------------------------------------ */
public HttpInput(ServerConnection connection)
public HttpInput(HttpChannel connection)
{
_connection=connection;
}

View File

@ -44,7 +44,7 @@ import org.eclipse.jetty.util.ByteArrayOutputStream2;
*/
public class HttpOutput extends ServletOutputStream
{
private final ServerConnection _connection;
private final HttpChannel _connection;
private boolean _closed;
// These are held here for reuse by Writer
@ -54,7 +54,7 @@ public class HttpOutput extends ServletOutputStream
ByteArrayOutputStream2 _bytes;
/* ------------------------------------------------------------ */
public HttpOutput(ServerConnection connection)
public HttpOutput(HttpChannel connection)
{
_connection=connection;
}

View File

@ -126,7 +126,7 @@ public class Request implements HttpServletRequest
private static final Collection<Locale> __defaultLocale = Collections.singleton(Locale.getDefault());
private static final int __NONE = 0, _STREAM = 1, __READER = 2;
private ServerConnection _connection;
private HttpChannel _connection;
private HttpFields _fields;
private final AsyncContinuation _async = new AsyncContinuation();
@ -176,7 +176,7 @@ public class Request implements HttpServletRequest
/* ------------------------------------------------------------ */
public Request(ServerConnection connection)
public Request(HttpChannel connection)
{
_connection = connection;
_fields=_connection.getRequestFields();
@ -400,7 +400,7 @@ public class Request implements HttpServletRequest
/**
* @return Returns the connection.
*/
public ServerConnection getConnection()
public HttpChannel getConnection()
{
return _connection;
}

View File

@ -68,7 +68,7 @@ public class Response implements HttpServletResponse
*/
public final static String HTTP_ONLY_COMMENT="__HTTP_ONLY__";
private final ServerConnection _connection;
private final HttpChannel _connection;
private final HttpFields _fields;
private int _status=SC_OK;
private String _reason;
@ -80,53 +80,13 @@ public class Response implements HttpServletResponse
private PrintWriter _writer;
private long _contentLength;
private final HttpGenerator.ResponseInfo _info = new HttpGenerator.ResponseInfo()
{
@Override
public HttpVersion getHttpVersion()
{
return _connection.getRequest().getHttpVersion();
}
@Override
public HttpFields getHttpFields()
{
return _fields;
}
@Override
public long getContentLength()
{
return _contentLength;
}
@Override
public boolean isHead()
{
return _connection.getRequest().isHead();
}
@Override
public int getStatus()
{
return _status;
}
@Override
public String getReason()
{
return _reason;
}
};
/* ------------------------------------------------------------ */
/**
*
*/
public Response(ServerConnection connection)
public Response(HttpChannel connection)
{
_connection=connection;
_fields=connection.getResponseFields();
@ -804,6 +764,12 @@ public class Response implements HttpServletResponse
}
}
/* ------------------------------------------------------------ */
public long getLongContentLength()
{
return _contentLength;
}
/* ------------------------------------------------------------ */
/*
* @see javax.servlet.ServletResponse#setContentLength(int)
@ -1100,5 +1066,4 @@ public class Response implements HttpServletResponse
}
}
}

View File

@ -335,7 +335,7 @@ public class Server extends HandlerWrapper implements Attributes
* or after the entire request has been received (for short requests of known length), or
* on the dispatch of an async request.
*/
public void handle(ServerConnection connection) throws IOException, ServletException
public void handle(HttpChannel connection) throws IOException, ServletException
{
final String target=connection.getRequest().getPathInfo();
final Request request=connection.getRequest();
@ -357,7 +357,7 @@ public class Server extends HandlerWrapper implements Attributes
* or after the entire request has been received (for short requests of known length), or
* on the dispatch of an async request.
*/
public void handleAsync(ServerConnection connection) throws IOException, ServletException
public void handleAsync(HttpChannel connection) throws IOException, ServletException
{
final AsyncContinuation async = connection.getRequest().getAsyncContinuation();
final AsyncContinuation.AsyncEventState state = async.getAsyncEventState();