Jetty9 - Small cleanups.

This commit is contained in:
Simone Bordet 2012-08-22 11:52:50 +02:00
parent 783a296bc5
commit c6094c2398
5 changed files with 49 additions and 127 deletions

View File

@ -295,89 +295,14 @@ public class HttpChannel implements HttpParser.RequestHandler
}
catch (IOException x)
{
x.printStackTrace(); // TODO
// We cannot write the response, so there is no point in calling
// response.sendError() since that writes, and we already know we cannot write.
LOG.debug("Could not write response", x);
}
return true;
}
// TODO: remove this method
protected void completed()
{
/*
// This method is called by handle() when it knows that its handling of the request/response cycle
// is complete.
// This may happen in the original thread dispatched to the connection that has called handle(),
// or it may be from a thread dispatched to call handle() as the result of a resumed suspended request.
LOG.debug("{} complete", this);
// Handle connection upgrades
if (_response.getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101)
{
Connection connection = (Connection)getRequest().getAttribute(HttpConnection.UPGRADE_CONNECTION_ATTRIBUTE);
if (connection != null)
{
LOG.debug("Upgrade from {} to {}", this, connection);
getEndPoint().setConnection(connection);
// HttpConnection.this.reset(); // TODO: this should be done by the connection privately when handle returns
return;
}
}
// Reset everything for the next cycle.
// HttpConnection.this.reset(); // TODO: this should be done by the connection privately when handle returns
// are called from non connection thread (ie dispatched from a resume)
if (getCurrentConnection()!=HttpConnection.this)
{
if (_parser.isStart())
{
// it wants to eat more
if (_requestBuffer==null)
fillInterested();
else if (getConnector().isStarted())
{
LOG.debug("{} pipelined",this);
try
{
execute(this);
}
catch(RejectedExecutionException e)
{
if (getConnector().isStarted())
LOG.warn(e);
else
LOG.ignore(e);
getEndPoint().close();
}
}
else
getEndPoint().close();
}
if (_parser.isClosed()&&!getEndPoint().isOutputShutdown())
{
// TODO This is a catch all indicating some protocol handling failure
// Currently needed for requests saying they are HTTP/2.0.
// This should be removed once better error handling is in place
LOG.warn("Endpoint output not shutdown when seeking EOF");
getEndPoint().shutdownOutput();
}
}
// make sure that an oshut connection is driven towards close
// TODO this is a little ugly
if (getEndPoint().isOpen() && getEndPoint().isOutputShutdown())
{
fillInterested();
}
*/
}
/**
* <p>Sends an error 500, performing a special logic to detect whether the request is suspended,
* to avoid concurrent writes from the application.</p>
@ -485,7 +410,7 @@ public class HttpChannel implements HttpParser.RequestHandler
// TODO: is this needed ?
if (_state.isIdle())
_state.complete();
_request.getHttpInput().shutdownInput();
_request.getHttpInput().shutdown();
}
catch (IOException x)
{
@ -694,14 +619,14 @@ public class HttpChannel implements HttpParser.RequestHandler
@Override
public boolean messageComplete(long contentLength)
{
_request.getHttpInput().shutdownInput();
_request.getHttpInput().shutdown();
return true;
}
@Override
public boolean earlyEOF()
{
_request.getHttpInput().shutdownInput();
_request.getHttpInput().shutdown();
return false;
}

View File

@ -36,7 +36,7 @@ import org.eclipse.jetty.util.log.Logger;
/**
* <p>A {@link Connection} that handles the HTTP protocol.</p>
*/
public class HttpConnection extends AbstractConnection
public class HttpConnection extends AbstractConnection implements Runnable
{
public static final String UPGRADE_CONNECTION_ATTRIBUTE = "org.eclispe.jetty.server.HttpConnection.UPGRADE";
private static final Logger LOG = Log.getLogger(HttpConnection.class);
@ -228,15 +228,7 @@ public class HttpConnection extends AbstractConnection
try
{
// TODO: avoid object creation
getExecutor().execute(new Runnable()
{
@Override
public void run()
{
onFillable();
}
});
getExecutor().execute(this);
}
catch (RejectedExecutionException e)
{
@ -308,4 +300,10 @@ public class HttpConnection extends AbstractConnection
super.onOpen();
fillInterested();
}
@Override
public void run()
{
onFillable();
}
}

View File

@ -21,7 +21,6 @@ package org.eclipse.jetty.server;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import javax.servlet.ServletInputStream;
import org.eclipse.jetty.util.ArrayQueue;
@ -34,22 +33,22 @@ import org.eclipse.jetty.util.log.Logger;
/**
* This class provides an HttpInput stream for the {@link HttpChannel}.
* The input stream holds a queue of {@link ByteBuffer}s passed to it by
* calls to {@link #content(ByteBuffer)}. This class does not copy the buffers,
* but simply holds references to them, thus the caller must organise for those
* calls to {@link #content(ByteBuffer)}. This class does not copy the buffers,
* but simply holds references to them, thus the caller must organise for those
* buffers to valid while held by this class. To assist the caller, there are
* extensible methods {@link #onContentQueued(ByteBuffer)}, {@link #onContentConsumed(ByteBuffer)}
* and {@link #onAllContentConsumed()} that can be implemented so that the
* and {@link #onAllContentConsumed()} that can be implemented so that the
* creator of HttpInput will know when buffers are queued and dequeued.
*/
public class HttpInput extends ServletInputStream
{
private static final Logger LOG = Log.getLogger(HttpInput.class);
protected final byte[] _oneByte=new byte[1];
protected final ArrayQueue<ByteBuffer> _inputQ=new ArrayQueue<>();
private ByteBuffer _content;
private boolean _inputEOF;
/* ------------------------------------------------------------ */
public HttpInput()
{
@ -60,14 +59,14 @@ public class HttpInput extends ServletInputStream
{
return _inputQ.lock();
}
/* ------------------------------------------------------------ */
public void recycle()
{
synchronized (lock())
{
_inputEOF=false;
if (_content!=null)
onContentConsumed(_content);
while ((_content=_inputQ.poll())!=null)
@ -77,7 +76,7 @@ public class HttpInput extends ServletInputStream
_content=null;
}
}
/* ------------------------------------------------------------ */
/*
* @see java.io.InputStream#read()
@ -104,7 +103,7 @@ public class HttpInput extends ServletInputStream
}
/* ------------------------------------------------------------ */
/*
/*
* @see java.io.InputStream#read(byte[], int, int)
*/
@Override
@ -112,7 +111,7 @@ public class HttpInput extends ServletInputStream
{
synchronized (lock())
{
ByteBuffer content=null;
ByteBuffer content=null;
while(content==null)
{
content=_inputQ.peekUnsafe();
@ -130,17 +129,17 @@ public class HttpInput extends ServletInputStream
onEof();
return -1;
}
blockForContent();
}
}
int l=Math.min(len,content.remaining());
content.get(b,off,l);
return l;
}
}
protected void blockForContent() throws IOException
{
synchronized (lock())
@ -158,44 +157,53 @@ public class HttpInput extends ServletInputStream
}
}
}
protected void onContentQueued(ByteBuffer ref)
{
lock().notify();
}
protected void onContentConsumed(ByteBuffer ref)
{
{
}
protected void onAllContentConsumed()
{
{
}
protected void onEof()
{
{
}
public boolean content(ByteBuffer ref)
{
synchronized (lock())
{
_inputQ.add(ref);
onContentQueued(ref);
}
}
return true;
}
public void shutdownInput()
public void shutdown()
{
synchronized (lock())
{
{
_inputEOF=true;
}
}
public boolean isShutdown()
{
synchronized (lock())
{
return _inputEOF;
}
}
public void consumeAll()
{
/*
while (true)
{
synchronized (lock())
@ -213,5 +221,6 @@ public class HttpInput extends ServletInputStream
LOG.warn(e);
}
}
*/
}
}

View File

@ -51,11 +51,6 @@ public class HttpWriterTest
return null;
}
@Override
protected void completed()
{
}
@Override
protected void execute(Runnable task)
{

View File

@ -137,11 +137,6 @@ public class ResponseTest
{
}
@Override
protected void completed()
{
}
@Override
public Connector getConnector()
{