jetty-9 switched back to normal synchronization

This commit is contained in:
Greg Wilkins 2012-05-08 20:09:02 +02:00
parent bc7e42e31f
commit fa59ddcc4b
7 changed files with 154 additions and 251 deletions

View File

@ -20,8 +20,8 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn
private volatile AbstractAsyncConnection _connection;
private DispatchedIOFuture _readFuture;
private DispatchedIOFuture _writeFuture;
private DispatchingIOFuture _readFuture;
private DispatchingIOFuture _writeFuture;
private ByteBuffer[] _writeBuffers;
@ -178,7 +178,7 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn
}
private final class WriteFuture extends DispatchedIOFuture
private final class WriteFuture extends DispatchingIOFuture
{
private WriteFuture()
{
@ -210,7 +210,7 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn
private final class ReadFuture extends DispatchedIOFuture
private final class ReadFuture extends DispatchingIOFuture
{
private ReadFuture()
{

View File

@ -17,38 +17,35 @@ import org.eclipse.jetty.util.Callback;
* By default, the callbacks are called by the thread that called {@link #complete()} or
* {@link #fail(Throwable)}
*/
public class DispatchedIOFuture implements IOFuture
public class DispatchingIOFuture implements IOFuture
{
private final Lock _lock;
private final Condition _block;
private final Object _lock;
private boolean _done;
private boolean _complete;
private Throwable _cause;
private Callback<?> _callback;
private Object _context;
public DispatchedIOFuture()
public DispatchingIOFuture()
{
this(new ReentrantLock());
this(null);
}
public DispatchedIOFuture(Lock lock)
public DispatchingIOFuture(Object lock)
{
this(false, lock);
}
public DispatchedIOFuture(boolean ready,Lock lock)
public DispatchingIOFuture(boolean ready,Object lock)
{
_complete=ready;
_done=ready;
_lock = lock;
_block = _lock.newCondition();
_lock = lock==null?this:lock;
}
public void fail(final Throwable cause)
{
_lock.lock();
try
synchronized(_lock)
{
if (_done)
throw new IllegalStateException("complete",cause);
@ -58,18 +55,13 @@ public class DispatchedIOFuture implements IOFuture
if (_callback!=null)
dispatchFailed();
_block.signal();
}
finally
{
_lock.unlock();
_lock.notifyAll();
}
}
public void complete()
{
_lock.lock();
try
synchronized(_lock)
{
if (_done)
throw new IllegalStateException();
@ -78,50 +70,35 @@ public class DispatchedIOFuture implements IOFuture
if (_callback!=null)
dispatchCompleted();
_block.signal();
}
finally
{
_lock.unlock();
_lock.notifyAll();
}
}
protected void cancelled()
{
_lock.lock();
try
synchronized(_lock)
{
if (_done)
throw new IllegalStateException();
_complete=false;
_done=true;
_block.signal();
}
finally
{
_lock.unlock();
_lock.notifyAll();
}
}
@Override
public boolean isDone()
{
_lock.lock();
try
synchronized(_lock)
{
return _done;
}
finally
{
_lock.unlock();
}
}
@Override
public boolean isComplete() throws ExecutionException
{
_lock.lock();
try
synchronized(_lock)
{
if (_done)
{
@ -132,10 +109,6 @@ public class DispatchedIOFuture implements IOFuture
return false;
}
finally
{
_lock.unlock();
}
}
@Override
@ -147,41 +120,30 @@ public class DispatchedIOFuture implements IOFuture
@Override
public void block() throws InterruptedException, ExecutionException
{
_lock.lock();
try
synchronized(_lock)
{
while (!_done)
_block.await();
_lock.wait();
isComplete();
}
finally
{
_lock.unlock();
}
}
@Override
public boolean block(long timeout, TimeUnit units) throws InterruptedException, ExecutionException
{
_lock.lock();
try
synchronized(_lock)
{
if (!_done)
_block.await(timeout,units);
_lock.wait(units.toMillis(timeout));
return isComplete();
}
finally
{
_lock.unlock();
}
}
@Override
public <C> void setCallback(Callback<C> callback, C context)
{
_lock.lock();
try
synchronized(_lock)
{
if (_callback!=null)
throw new IllegalStateException();
@ -196,10 +158,6 @@ public class DispatchedIOFuture implements IOFuture
dispatchFailed();
}
}
finally
{
_lock.unlock();
}
}
protected void dispatch(Runnable callback)

View File

@ -33,17 +33,17 @@ import org.eclipse.jetty.util.thread.Timeout.Task;
*/
public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint
{
public static final Logger LOG=Log.getLogger(SelectChannelEndPoint.class);
public static final Logger LOG = Log.getLogger(SelectChannelEndPoint.class);
private final Lock _lock = new ReentrantLock();
private final Object _lock = this;
private final SelectorManager.SelectSet _selectSet;
private final SelectorManager _manager;
private DispatchedIOFuture _readFuture=new DispatchedIOFuture(true,_lock);
private DispatchedIOFuture _writeFuture=new DispatchedIOFuture(true,_lock);
private DispatchingIOFuture _readFuture = new DispatchingIOFuture(true,_lock);
private DispatchingIOFuture _writeFuture = new DispatchingIOFuture(true,_lock);
private SelectionKey _key;
private SelectionKey _key;
private boolean _selected;
private boolean _changing;
@ -60,13 +60,12 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
private ByteBuffer[] _writeBuffers;
/* ------------------------------------------------------------ */
public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime)
throws IOException
public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime) throws IOException
{
super(channel);
_manager = selectSet.getManager();
_selectSet = selectSet;
_open=true;
_open = true;
_key = key;
setMaxIdleTime(maxIdleTime);
@ -94,55 +93,54 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
return _manager;
}
/* ------------------------------------------------------------ */
public void setAsyncConnection(AbstractAsyncConnection connection)
{
AbstractAsyncConnection old=getAsyncConnection();
_connection=connection;
if (old!=null && old!=connection)
AbstractAsyncConnection old = getAsyncConnection();
_connection = connection;
if (old != null && old != connection)
_manager.endPointUpgraded(this,old);
}
/* ------------------------------------------------------------ */
/** Called by selectSet to schedule handling
*
/**
* Called by selectSet to schedule handling
*
*/
public void onSelected()
{
_lock.lock();
_selected=true;
try
synchronized (_lock)
{
// If there is no key, then do nothing
if (_key == null || !_key.isValid())
_selected = true;
try
{
// TODO wake ups?
return;
// If there is no key, then do nothing
if (_key == null || !_key.isValid())
{
// TODO wake ups?
return;
}
// TODO do we need to test interest here ???
boolean can_read = (_key.isReadable() && (_key.interestOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ);
boolean can_write = (_key.isWritable() && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE);
_interestOps = 0;
if (can_read && !_readFuture.isDone())
_readFuture.complete();
if (can_write && _writeBuffers != null)
completeWrite();
}
finally
{
doUpdateKey();
_selected = false;
}
//TODO do we need to test interest here ???
boolean can_read=(_key.isReadable() && (_key.interestOps()&SelectionKey.OP_READ)==SelectionKey.OP_READ);
boolean can_write=(_key.isWritable() && (_key.interestOps()&SelectionKey.OP_WRITE)==SelectionKey.OP_WRITE);
_interestOps=0;
if (can_read && !_readFuture.isDone())
_readFuture.complete();
if (can_write && _writeBuffers!=null)
completeWrite();
}
finally
{
doUpdateKey();
_selected=false;
_lock.unlock();
}
}
/* ------------------------------------------------------------ */
public void cancelTimeout(Task task)
{
@ -159,7 +157,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
@Override
public void setCheckForIdle(boolean check)
{
_idlecheck=check;
_idlecheck = check;
}
/* ------------------------------------------------------------ */
@ -169,23 +167,21 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
return _idlecheck;
}
/* ------------------------------------------------------------ */
public void checkForIdleOrReadWriteTimeout(long now)
{
if (_idlecheck || !_readFuture.isDone() || !_writeFuture.isDone())
{
long idleTimestamp=getIdleTimestamp();
long max_idle_time=getMaxIdleTime();
long idleTimestamp = getIdleTimestamp();
long max_idle_time = getMaxIdleTime();
if (idleTimestamp!=0 && max_idle_time>0)
if (idleTimestamp != 0 && max_idle_time > 0)
{
long idleForMs=now-idleTimestamp;
long idleForMs = now - idleTimestamp;
if (idleForMs>max_idle_time)
if (idleForMs > max_idle_time)
{
_lock.lock();
try
synchronized (_lock)
{
if (_idlecheck)
_connection.onIdleExpired(idleForMs);
@ -195,10 +191,6 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
_writeFuture.fail(new TimeoutException());
notIdle();
}
finally
{
_lock.unlock();
}
}
}
}
@ -208,8 +200,8 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
@Override
public int fill(ByteBuffer buffer) throws IOException
{
int fill=super.fill(buffer);
if (fill>0)
int fill = super.fill(buffer);
if (fill > 0)
notIdle();
return fill;
}
@ -218,58 +210,50 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
@Override
public IOFuture readable() throws IllegalStateException
{
_lock.lock();
try
synchronized (_lock)
{
if (_readFuture!=null && !_readFuture.isDone())
if (_readFuture != null && !_readFuture.isDone())
throw new IllegalStateException("previous read not complete");
_readFuture=new InterestedFuture(SelectionKey.OP_READ);
_interestOps=_interestOps|SelectionKey.OP_READ;
_readFuture = new InterestedFuture(SelectionKey.OP_READ);
_interestOps = _interestOps | SelectionKey.OP_READ;
updateKey();
return _readFuture;
}
finally
{
_lock.unlock();
}
}
/* ------------------------------------------------------------ */
@Override
public IOFuture write(ByteBuffer... buffers)
{
_lock.lock();
try
synchronized (_lock)
{
if (_writeFuture!=null && !_writeFuture.isDone())
throw new IllegalStateException("previous write not complete");
flush(buffers);
// Are we complete?
for (ByteBuffer b : buffers)
try
{
if (b.hasRemaining())
if (_writeFuture != null && !_writeFuture.isDone())
throw new IllegalStateException("previous write not complete");
flush(buffers);
// Are we complete?
for (ByteBuffer b : buffers)
{
_writeBuffers=buffers;
_writeFuture=new InterestedFuture(SelectionKey.OP_WRITE);
_interestOps=_interestOps|SelectionKey.OP_WRITE;
updateKey();
return _writeFuture;
if (b.hasRemaining())
{
_writeBuffers = buffers;
_writeFuture = new InterestedFuture(SelectionKey.OP_WRITE);
_interestOps = _interestOps | SelectionKey.OP_WRITE;
updateKey();
return _writeFuture;
}
}
return DoneIOFuture.COMPLETE;
}
catch (IOException e)
{
return new DoneIOFuture(e);
}
return DoneIOFuture.COMPLETE;
}
catch(IOException e)
{
return new DoneIOFuture(e);
}
finally
{
_lock.unlock();
}
}
@ -285,31 +269,28 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
{
if (b.hasRemaining())
{
_interestOps=_interestOps|SelectionKey.OP_WRITE;
_interestOps = _interestOps | SelectionKey.OP_WRITE;
return;
}
}
// we are complete and ready
_writeFuture.complete();
}
catch(final IOException e)
catch (final IOException e)
{
_writeBuffers=null;
_writeBuffers = null;
if (!_writeFuture.isDone())
_writeFuture.fail(e);
}
}
/* ------------------------------------------------------------ */
@Override
public int flush(ByteBuffer... buffers) throws IOException
{
int l = super.flush(buffers);
if (l>0)
if (l > 0)
notIdle();
return l;
}
@ -320,55 +301,47 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
*/
private void updateKey()
{
if (!_lock.tryLock())
throw new IllegalStateException();
try
synchronized (this)
{
if (!_selected)
{
int current_ops=-1;
int current_ops = -1;
if (getChannel().isOpen())
{
try
{
current_ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
current_ops = ((_key != null && _key.isValid())?_key.interestOps():-1);
}
catch(Exception e)
catch (Exception e)
{
_key=null;
_key = null;
LOG.ignore(e);
}
}
if (_interestOps!=current_ops && !_changing)
if (_interestOps != current_ops && !_changing)
{
_changing=true;
_changing = true;
_selectSet.addChange(this);
_selectSet.wakeup();
}
}
}
finally
{
_lock.unlock();
}
}
/* ------------------------------------------------------------ */
/**
* Synchronize the interestOps with the actual key. Call is scheduled by a call to updateKey
*/
void doUpdateKey()
{
_lock.lock();
try
synchronized (_lock)
{
_changing=false;
_changing = false;
if (getChannel().isOpen())
{
if (_interestOps>0)
if (_interestOps > 0)
{
if (_key==null || !_key.isValid())
if (_key == null || !_key.isValid())
{
SelectableChannel sc = (SelectableChannel)getChannel();
if (sc.isRegistered())
@ -379,12 +352,12 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
{
try
{
_key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
_key = ((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
}
catch (Exception e)
{
LOG.ignore(e);
if (_key!=null && _key.isValid())
if (_key != null && _key.isValid())
{
_key.cancel();
}
@ -393,7 +366,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
{
_selectSet.destroyEndPoint(this);
}
_open=false;
_open = false;
_key = null;
}
}
@ -405,32 +378,28 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
}
else
{
if (_key!=null && _key.isValid())
if (_key != null && _key.isValid())
_key.interestOps(0);
else
_key=null;
_key = null;
}
}
else
{
if (_key!=null && _key.isValid())
if (_key != null && _key.isValid())
_key.cancel();
if (_open)
{
_open=false;
_open = false;
_selectSet.destroyEndPoint(this);
}
_key = null;
}
}
finally
{
_lock.unlock();
}
}
/* ------------------------------------------------------------ */
/*
* @see org.eclipse.io.nio.ChannelEndPoint#close()
@ -438,22 +407,17 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
@Override
public void close()
{
_lock.lock();
try
synchronized (_lock)
{
try
{
super.close();
}
finally
{
updateKey();
try
{
super.close();
}
finally
{
updateKey();
}
}
finally
{
_lock.unlock();
}
}
/* ------------------------------------------------------------ */
@ -484,19 +448,8 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
keyString += "-";
}
return String.format("SCEP@%x{l(%s)<->r(%s),open=%b,ishut=%b,oshut=%b,i=%d%s,r=%s,w=%s}-{%s}",
hashCode(),
getRemoteAddress(),
getLocalAddress(),
isOpen(),
isInputShutdown(),
isOutputShutdown(),
_interestOps,
keyString,
_readFuture,
_writeFuture,
getAsyncConnection());
return String.format("SCEP@%x{l(%s)<->r(%s),open=%b,ishut=%b,oshut=%b,i=%d%s,r=%s,w=%s}-{%s}",hashCode(),getRemoteAddress(),getLocalAddress(),isOpen(),
isInputShutdown(),isOutputShutdown(),_interestOps,keyString,_readFuture,_writeFuture,getAsyncConnection());
}
/* ------------------------------------------------------------ */
@ -508,13 +461,14 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
private class InterestedFuture extends DispatchedIOFuture
private class InterestedFuture extends DispatchingIOFuture
{
private final int _interest;
private InterestedFuture(int interest)
{
super(_lock);
_interest=interest;
_interest = interest;
}
@Override
@ -522,7 +476,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
{
if (!_manager.dispatch(task))
{
LOG.warn("Dispatch failed: i="+_interest);
LOG.warn("Dispatch failed: i=" + _interest);
throw new IllegalStateException();
}
}
@ -530,17 +484,12 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
@Override
public void cancel()
{
_lock.lock();
try
synchronized (_lock)
{
_interestOps=_interestOps&~_interest;
_interestOps = _interestOps & ~_interest;
updateKey();
cancelled();
}
finally
{
_lock.unlock();
}
}
}
}

View File

@ -47,8 +47,8 @@ public class SslConnection extends AbstractAsyncConnection
private final Lock _lock = new ReentrantLock();
private final NetWriteCallback _netWriteCallback = new NetWriteCallback();
private DispatchedIOFuture _appReadFuture = new DispatchedIOFuture(true,_lock);
private DispatchedIOFuture _appWriteFuture = new DispatchedIOFuture(true,_lock);
private DispatchingIOFuture _appReadFuture = new DispatchingIOFuture(true,_lock);
private DispatchingIOFuture _appWriteFuture = new DispatchingIOFuture(true,_lock);
private final SSLEngine _engine;
private final SSLSession _session;
@ -748,7 +748,7 @@ public class SslConnection extends AbstractAsyncConnection
return DoneIOFuture.COMPLETE;
// No, we need to schedule a network read
_appReadFuture=new DispatchedIOFuture(_lock);
_appReadFuture=new DispatchingIOFuture(_lock);
if (_netReadFuture==null)
_netReadFuture=scheduleOnReadable();
return _appReadFuture;
@ -776,7 +776,7 @@ public class SslConnection extends AbstractAsyncConnection
if (b.hasRemaining())
{
_writeBuffers=buffers;
_appWriteFuture=new DispatchedIOFuture(_lock);
_appWriteFuture=new DispatchingIOFuture(_lock);
return _appWriteFuture;
}
}

View File

@ -130,7 +130,7 @@ public class IOFutureTest
@Test
public void testInCompleted() throws Exception
{
IOFuture future = new DispatchedIOFuture();
IOFuture future = new DispatchingIOFuture();
assertFalse(future.isDone());
assertFalse(future.isComplete());
@ -165,7 +165,7 @@ public class IOFutureTest
@Test
public void testReady() throws Exception
{
DispatchedIOFuture future = new DispatchedIOFuture();
DispatchingIOFuture future = new DispatchingIOFuture();
assertFalse(future.isDone());
assertFalse(future.isComplete());
@ -197,7 +197,7 @@ public class IOFutureTest
start=System.currentTimeMillis();
final DispatchedIOFuture f0=future;
final DispatchingIOFuture f0=future;
new Thread()
{
@Override
@ -220,11 +220,11 @@ public class IOFutureTest
ready.set(false);
future = new DispatchedIOFuture();
future = new DispatchingIOFuture();
assertFalse(future.isDone());
assertFalse(future.isComplete());
start=System.currentTimeMillis();
final DispatchedIOFuture f1=future;
final DispatchingIOFuture f1=future;
new Thread()
{
@Override
@ -248,7 +248,7 @@ public class IOFutureTest
@Test
public void testFail() throws Exception
{
DispatchedIOFuture future = new DispatchedIOFuture();
DispatchingIOFuture future = new DispatchingIOFuture();
final Exception ex=new Exception("failed");
assertFalse(future.isDone());
@ -280,7 +280,7 @@ public class IOFutureTest
assertNull(fail.get());
start=System.currentTimeMillis();
final DispatchedIOFuture f0=future;
final DispatchingIOFuture f0=future;
new Thread()
{
@Override
@ -316,14 +316,14 @@ public class IOFutureTest
assertFalse(ready.get());
assertEquals(ex,fail.get());
future=new DispatchedIOFuture();
future=new DispatchingIOFuture();
ready.set(false);
fail.set(null);
assertFalse(future.isDone());
assertFalse(future.isComplete());
start=System.currentTimeMillis();
final DispatchedIOFuture f1=future;
final DispatchingIOFuture f1=future;
new Thread()
{
@Override

View File

@ -345,10 +345,6 @@ public class IOTest
reading.get();
writing.get();
read.flip();
System.err.println(BufferUtil.toString(read));
}
}

View File

@ -29,7 +29,7 @@ import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.AbstractAsyncConnection;
import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.DispatchedIOFuture;
import org.eclipse.jetty.io.DispatchingIOFuture;
import org.eclipse.jetty.io.DoneIOFuture;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.IOFuture;
@ -326,7 +326,7 @@ public class HttpConnection extends AbstractAsyncConnection
catch(ExecutionException e)
{
LOG.debug(e);
DispatchedIOFuture.rethrow(e);
DispatchingIOFuture.rethrow(e);
}
finally
{
@ -649,7 +649,7 @@ public class HttpConnection extends AbstractAsyncConnection
catch (ExecutionException e)
{
LOG.debug(e);
DispatchedIOFuture.rethrow(e);
DispatchingIOFuture.rethrow(e);
}
finally
{