jetty-9 removed excess synchronization

This commit is contained in:
Greg Wilkins 2012-07-26 19:28:06 +10:00
parent b3db5ffc8d
commit 7882a17581
2 changed files with 61 additions and 60 deletions

View File

@ -45,7 +45,6 @@ public class HttpConnection extends AbstractAsyncConnection
public static final String UPGRADE_CONNECTION_ATTR = "org.eclispe.jetty.server.HttpConnection.UPGRADE";
private final Object _lock = this;
private final Server _server;
private final HttpConnector _connector;
private final HttpParser _parser;
@ -54,7 +53,6 @@ public class HttpConnection extends AbstractAsyncConnection
private final ByteBufferPool _bufferPool;
private final HttpHttpInput _httpInput;
private volatile Thread _thread;
private ResponseInfo _info;
ByteBuffer _requestBuffer=null;
ByteBuffer _responseHeader=null;
@ -111,12 +109,6 @@ public class HttpConnection extends AbstractAsyncConnection
return _server;
}
/* ------------------------------------------------------------ */
public Thread getThread()
{
return _thread;
}
/* ------------------------------------------------------------ */
/**
* @return Returns the connector.
@ -191,6 +183,13 @@ public class HttpConnection extends AbstractAsyncConnection
}
}
@Override
public void fillInterested()
{
// new Throwable().printStackTrace();
super.fillInterested();
}
/* ------------------------------------------------------------ */
/** Parse and handle HTTP messages.
* <p>
@ -203,18 +202,20 @@ public class HttpConnection extends AbstractAsyncConnection
* the HttpChannel becomes !idle; or the connection has been changed
*/
@Override
public synchronized void onFillable()
public void onFillable()
{
LOG.debug("{} onReadable {}",this,_channel.isIdle());
try
{
_thread=Thread.currentThread();
setCurrentConnection(this);
// TODO try to generalize this loop into AbstractAsyncConnection
while (true)
{
// synchronized (_lock)
{
// Fill the request buffer with data only if it is totally empty.
if (BufferUtil.isEmpty(_requestBuffer))
{
@ -229,8 +230,8 @@ public class HttpConnection extends AbstractAsyncConnection
if (filled==0)
{
// Somebody wanted to read, we didn't so schedule another attempt
fillInterested();
releaseRequestBuffer();
fillInterested();
return;
}
else if (filled<0)
@ -286,6 +287,7 @@ public class HttpConnection extends AbstractAsyncConnection
_channel.getEventHandler().badMessage(HttpStatus.REQUEST_ENTITY_TOO_LARGE_413,null);
}
}
}
}
catch(IOException e)
{
@ -302,9 +304,9 @@ public class HttpConnection extends AbstractAsyncConnection
}
finally
{
_thread=null;
setCurrentConnection(null);
}
}
@ -386,7 +388,7 @@ public class HttpConnection extends AbstractAsyncConnection
@Override
protected synchronized boolean commitError(int status, String reason, String content)
protected boolean commitError(int status, String reason, String content)
{
if (!super.commitError(status,reason,content))
{
@ -407,11 +409,11 @@ public class HttpConnection extends AbstractAsyncConnection
}
@Override
protected synchronized void completed()
protected void completed()
{
// This is called by HttpChannel#handle when it knows that it's handling of the request/response cycle
// is complete. This may be in the original thread dispatched to the connection that has called process from
// the connection#onReadable method, or it may be from a thread dispatched to call process as the result
// the connection#onFillable method, or it may be from a thread dispatched to call process as the result
// of a resumed suspended request.
// At this point the HttpChannel will have completed the generation of any response (although it might remain to
// be asynchronously flushed TBD), but it may not have consumed the entire
@ -437,7 +439,7 @@ public class HttpConnection extends AbstractAsyncConnection
HttpConnection.this.reset();
// are called from non connection thread (ie dispatched from a resume)
if (getThread()!=Thread.currentThread())
if (getCurrentConnection()!=HttpConnection.this)
{
if (_parser.isStart())
{
@ -480,24 +482,17 @@ public class HttpConnection extends AbstractAsyncConnection
@Override
public void run()
{
if (getThread()!=null)
{
// dispatched thread is still executing, try again later
// TODO - this probably should not be able to occur as the resume dispatch is not done until an unhandle
LOG.warn("Dispatch while dispatched???");
execute(this);
}
else
onFillable();
onFillable();
}
/* ------------------------------------------------------------ */
private int generate(ByteBuffer content, Action action) throws IOException
{
long prepared_before=0;
long prepared_after;
synchronized(_lock)
// Only one response writer at a time.
synchronized(this)
{
long prepared_before=0;
long prepared_after;
try
{
if (_generator.isComplete())
@ -513,12 +508,12 @@ public class HttpConnection extends AbstractAsyncConnection
HttpGenerator.Result result=_generator.generate(_info,_responseHeader,_chunk,_responseBuffer,content,action);
if (LOG.isDebugEnabled())
LOG.debug("{} generate: {} ({},{},{})@{}",
this,
result,
BufferUtil.toSummaryString(_responseHeader),
BufferUtil.toSummaryString(_responseBuffer),
BufferUtil.toSummaryString(content),
_generator.getState());
this,
result,
BufferUtil.toSummaryString(_responseHeader),
BufferUtil.toSummaryString(_responseBuffer),
BufferUtil.toSummaryString(content),
_generator.getState());
switch(result)
{
@ -590,19 +585,20 @@ public class HttpConnection extends AbstractAsyncConnection
{
prepared_after=_generator.getContentPrepared();
}
return (int)(prepared_after-prepared_before);
}
return (int)(prepared_after-prepared_before);
}
@Override
protected void commit(ResponseInfo info, ByteBuffer content) throws IOException
protected void commitResponse(ResponseInfo info, ByteBuffer content) throws IOException
{
_info=info;
LOG.debug("{} commit {}",this,_info);
synchronized (_lock)
// Only one response writer at a time.
synchronized (this)
{
_info=info;
LOG.debug("{} commit {}",this,_info);
try
{
if (_generator.isCommitted())
@ -617,12 +613,12 @@ public class HttpConnection extends AbstractAsyncConnection
HttpGenerator.Result result=_generator.generate(_info,_responseHeader,null,_responseBuffer,content,Action.COMPLETE);
if (LOG.isDebugEnabled())
LOG.debug("{} commit: {} ({},{},{})@{}",
this,
result,
BufferUtil.toDetailString(_responseHeader),
BufferUtil.toSummaryString(_responseBuffer),
BufferUtil.toSummaryString(content),
_generator.getState());
this,
result,
BufferUtil.toDetailString(_responseHeader),
BufferUtil.toSummaryString(_responseBuffer),
BufferUtil.toSummaryString(content),
_generator.getState());
switch(result)
{
@ -685,9 +681,8 @@ public class HttpConnection extends AbstractAsyncConnection
{
LOG.debug(e);
FutureCallback.rethrow(e);
}
}
}
}
@Override
@ -814,7 +809,7 @@ public class HttpConnection extends AbstractAsyncConnection
if (!_generator.isPersistent())
{
_parser.setState(HttpParser.State.CLOSED);
synchronized (_inputQ.lock())
synchronized (lock())
{
_inputQ.clear();
}
@ -823,7 +818,7 @@ public class HttpConnection extends AbstractAsyncConnection
{
while (true)
{
synchronized (_inputQ.lock())
synchronized (lock())
{
_inputQ.clear();
}

View File

@ -50,10 +50,16 @@ public class HttpInput extends ServletInputStream
{
}
/* ------------------------------------------------------------ */
public Object lock()
{
return _inputQ.lock();
}
/* ------------------------------------------------------------ */
public void recycle()
{
synchronized (_inputQ.lock())
synchronized (lock())
{
_inputEOF=false;
@ -82,7 +88,7 @@ public class HttpInput extends ServletInputStream
@Override
public int available()
{
synchronized (_inputQ.lock())
synchronized (lock())
{
ByteBuffer content=_inputQ.peekUnsafe();
if (content==null)
@ -99,7 +105,7 @@ public class HttpInput extends ServletInputStream
@Override
public int read(byte[] b, int off, int len) throws IOException
{
synchronized (_inputQ.lock())
synchronized (lock())
{
ByteBuffer content=null;
while(content==null)
@ -132,13 +138,13 @@ public class HttpInput extends ServletInputStream
protected void blockForContent() throws IOException
{
synchronized (_inputQ.lock())
synchronized (lock())
{
while(_inputQ.isEmpty())
{
try
{
_inputQ.lock().wait();
lock().wait();
}
catch (InterruptedException e)
{
@ -150,7 +156,7 @@ public class HttpInput extends ServletInputStream
protected void onContentQueued(ByteBuffer ref)
{
_inputQ.lock().notify();
lock().notify();
}
protected void onContentConsumed(ByteBuffer ref)
@ -167,7 +173,7 @@ public class HttpInput extends ServletInputStream
public boolean content(ByteBuffer ref)
{
synchronized (_inputQ.lock())
synchronized (lock())
{
_inputQ.add(ref);
onContentQueued(ref);
@ -177,7 +183,7 @@ public class HttpInput extends ServletInputStream
public void shutdownInput()
{
synchronized (_inputQ.lock())
synchronized (lock())
{
_inputEOF=true;
}
@ -187,7 +193,7 @@ public class HttpInput extends ServletInputStream
{
while (true)
{
synchronized (_inputQ.lock())
synchronized (lock())
{
_inputQ.clear();
}