jetty-9 Simplified interface

This commit is contained in:
Greg Wilkins 2012-04-04 14:00:16 +10:00
parent dfab993fcb
commit e7dcd16757
6 changed files with 234 additions and 270 deletions

View File

@ -1,226 +0,0 @@
package org.eclipse.jetty.io;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public abstract class AbstractSelectableConnection implements SelectableConnection
{
private static final Logger LOG = Log.getLogger(AbstractSelectableConnection.class);
protected final SelectableEndPoint _endp;
private final long _createdTimeStamp;
private final Lock _lock=new ReentrantLock();
private final Condition _readable=_lock.newCondition();
private final Condition _writeable=_lock.newCondition();
private boolean _readBlocked;
private boolean _writeBlocked;
private final Runnable _reader=new Runnable()
{
@Override
public void run()
{
try
{
doRead();
}
catch(Throwable th)
{
LOG.warn(th);
}
}
};
private final Runnable _writer=new Runnable()
{
@Override
public void run()
{
try
{
doWrite();
}
catch(Throwable th)
{
LOG.warn(th);
}
}
};
private volatile int _maxIdleTime=-1;
public AbstractSelectableConnection(SelectableEndPoint endp)
{
_endp=endp;
_createdTimeStamp = System.currentTimeMillis();
}
@Override
public EndPoint getEndPoint()
{
return _endp;
}
@Override
public SelectableEndPoint getSelectableEndPoint()
{
return _endp;
}
@Override
public long getCreatedTimeStamp()
{
return _createdTimeStamp;
}
@Override
public Runnable onReadable()
{
_lock.lock();
try
{
if (_readBlocked)
_readable.signalAll();
else
return _reader;
}
finally
{
_lock.unlock();
}
return null;
}
@Override
public Runnable onWriteable()
{
_lock.lock();
try
{
if (_writeBlocked)
_writeable.signalAll();
else
return _writer;
}
finally
{
_lock.unlock();
}
return null;
}
@Override
public boolean blockReadable()
{
_lock.lock();
boolean readable=false;
try
{
if (_readBlocked)
throw new IllegalStateException();
_readBlocked=true;
_endp.setReadInterested(true);
readable=_readable.await(getMaxIdleTime(),TimeUnit.SECONDS);
}
catch(InterruptedException e)
{
LOG.ignore(e);
}
finally
{
if (!readable)
_endp.setReadInterested(false);
_readBlocked=false;
_lock.unlock();
}
return readable;
}
@Override
public boolean blockWriteable()
{
_lock.lock();
boolean writeable=false;
try
{
if (_writeBlocked)
throw new IllegalStateException();
_writeBlocked=true;
_endp.setWriteInterested(true);
writeable=_writeable.await(getMaxIdleTime(),TimeUnit.SECONDS);
}
catch(InterruptedException e)
{
LOG.ignore(e);
}
finally
{
if (!writeable)
_endp.setWriteInterested(false);
_writeBlocked=false;
_lock.unlock();
}
return writeable;
}
@Override
public void onIdleExpired(long idleForMs)
{
try
{
LOG.debug("onIdleExpired {}ms {} {}",idleForMs,this,_endp);
if (_endp.isInputShutdown() || _endp.isOutputShutdown())
_endp.close();
else
_endp.shutdownOutput();
}
catch(IOException e)
{
LOG.ignore(e);
try
{
_endp.close();
}
catch(IOException e2)
{
LOG.ignore(e2);
}
}
}
protected void doRead()
{
throw new IllegalStateException();
}
protected void doWrite()
{
throw new IllegalStateException();
}
@Override
public int getMaxIdleTime()
{
int max=_maxIdleTime;
return (max==-1)?_endp.getMaxIdleTime():max;
}
public void setMaxIdleTime(int max)
{
_maxIdleTime=max;
}
@Override
public String toString()
{
return String.format("%s@%x", getClass().getSimpleName(), hashCode());
}
}

View File

@ -104,7 +104,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Selectable
/** Called by selectSet to schedule handling
*
*/
public void selected()
public void selected() throws IOException
{
_lock.lock();
_selected=true;

View File

@ -1,28 +1,227 @@
package org.eclipse.jetty.io;
public interface SelectableConnection extends Connection
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public abstract class SelectableConnection implements Connection
{
SelectableEndPoint getSelectableEndPoint();
private static final Logger LOG = Log.getLogger(SelectableConnection.class);
Runnable onReadable();
Runnable onWriteable();
public boolean blockReadable();
public boolean blockWriteable();
protected final SelectableEndPoint _endp;
private final long _createdTimeStamp;
private final Lock _lock=new ReentrantLock();
private final Condition _readable=_lock.newCondition();
private final Condition _writeable=_lock.newCondition();
private boolean _readBlocked;
private boolean _writeBlocked;
/**
* Called when the connection idle timeout expires
* @param idleForMs TODO
*/
void onIdleExpired(long idleForMs);
private final Runnable _reader=new Runnable()
{
@Override
public void run()
{
try
{
doRead();
}
catch(Throwable th)
{
LOG.warn(th);
}
}
};
private final Runnable _writer=new Runnable()
{
@Override
public void run()
{
try
{
doWrite();
}
catch(Throwable th)
{
LOG.warn(th);
}
}
};
void onInputShutdown();
private volatile int _maxIdleTime=-1;
public SelectableConnection(SelectableEndPoint endp)
{
_endp=endp;
_createdTimeStamp = System.currentTimeMillis();
}
/**
* Called when the connection is closed
*/
void onClose();
@Override
public EndPoint getEndPoint()
{
return _endp;
}
public SelectableEndPoint getSelectableEndPoint()
{
return _endp;
}
@Override
public long getCreatedTimeStamp()
{
return _createdTimeStamp;
}
public Runnable onReadable()
{
_lock.lock();
try
{
if (_readBlocked)
_readable.signalAll();
else
return _reader;
}
finally
{
_lock.unlock();
}
return null;
}
public Runnable onWriteable()
{
_lock.lock();
try
{
if (_writeBlocked)
_writeable.signalAll();
else
return _writer;
}
finally
{
_lock.unlock();
}
return null;
}
public boolean blockReadable()
{
_lock.lock();
boolean readable=false;
try
{
if (_readBlocked)
throw new IllegalStateException();
_readBlocked=true;
_endp.setReadInterested(true);
readable=_readable.await(getMaxIdleTime(),TimeUnit.SECONDS);
}
catch(InterruptedException e)
{
LOG.ignore(e);
}
finally
{
if (!readable)
_endp.setReadInterested(false);
_readBlocked=false;
_lock.unlock();
}
return readable;
}
public boolean blockWriteable()
{
_lock.lock();
boolean writeable=false;
try
{
if (_writeBlocked)
throw new IllegalStateException();
_writeBlocked=true;
_endp.setWriteInterested(true);
writeable=_writeable.await(getMaxIdleTime(),TimeUnit.SECONDS);
}
catch(InterruptedException e)
{
LOG.ignore(e);
}
finally
{
if (!writeable)
_endp.setWriteInterested(false);
_writeBlocked=false;
_lock.unlock();
}
return writeable;
}
public void onIdleExpired(long idleForMs)
{
try
{
LOG.debug("onIdleExpired {}ms {} {}",idleForMs,this,_endp);
if (_endp.isInputShutdown() || _endp.isOutputShutdown())
_endp.close();
else
_endp.shutdownOutput();
}
catch(IOException e)
{
LOG.ignore(e);
try
{
_endp.close();
}
catch(IOException e2)
{
LOG.ignore(e2);
}
}
}
protected void doRead()
{
throw new IllegalStateException();
}
protected void doWrite()
{
throw new IllegalStateException();
}
@Override
public int getMaxIdleTime()
{
int max=_maxIdleTime;
return (max==-1)?_endp.getMaxIdleTime():max;
}
public void setMaxIdleTime(int max)
{
_maxIdleTime=max;
}
@Override
public String toString()
{
return String.format("%s@%x", getClass().getSimpleName(), hashCode());
}
public void onInputShutdown() throws IOException
{
}
public void onClose()
{
}
}

View File

@ -38,7 +38,7 @@ import org.eclipse.jetty.util.log.Logger;
* it's source/sink of encrypted data. It then provides {@link #getAppEndPoint()} to
* expose a source/sink of unencrypted data to another connection (eg HttpConnection).
*/
public class SslConnection extends AbstractSelectableConnection
public class SslConnection extends SelectableConnection
{
private static final Logger LOG = Log.getLogger("org.eclipse.jetty.io.nio.ssl");
@ -679,7 +679,7 @@ public class SslConnection extends AbstractSelectableConnection
public void setSelectableConnection(SelectableConnection connection)
{
_appConnection=(AbstractSelectableConnection)connection;
_appConnection=(SelectableConnection)connection;
}
@Override

View File

@ -21,7 +21,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.AbstractSelectableConnection;
import org.eclipse.jetty.io.SelectableConnection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.SelectChannelEndPoint;
import org.eclipse.jetty.io.SelectorManager;
@ -108,7 +108,7 @@ public class SelectChannelEndPointTest
return new TestConnection(endpoint);
}
public class TestConnection extends AbstractSelectableConnection
public class TestConnection extends SelectableConnection
{
ByteBuffer _in = BufferUtil.allocate(32*1024);
ByteBuffer _out = BufferUtil.allocate(32*1024);

View File

@ -27,17 +27,18 @@ import org.eclipse.jetty.http.HttpGenerator;
import org.eclipse.jetty.http.HttpGenerator.Action;
import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.AbstractSelectableConnection;
import org.eclipse.jetty.io.SelectableConnection;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.SelectableEndPoint;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
*/
public abstract class HttpConnection extends AbstractSelectableConnection
public abstract class HttpConnection extends SelectableConnection
{
private static final Logger LOG = Log.getLogger(HttpConnection.class);
@ -74,7 +75,7 @@ public abstract class HttpConnection extends AbstractSelectableConnection
/** Constructor
*
*/
public HttpConnection(Connector connector, EndPoint endpoint, Server server)
public HttpConnection(Connector connector, SelectableEndPoint endpoint, Server server)
{
super(endpoint);
_connector = connector;
@ -149,6 +150,7 @@ public abstract class HttpConnection extends AbstractSelectableConnection
}
/* ------------------------------------------------------------ */
@Override
public boolean isIdle()
{
return _parser.isIdle() && _generator.isIdle();
@ -161,6 +163,7 @@ public abstract class HttpConnection extends AbstractSelectableConnection
}
/* ------------------------------------------------------------ */
@Override
public int getMaxIdleTime()
{
if (_connector.isLowResources() && _endp.getMaxIdleTime()==_connector.getMaxIdleTime())
@ -171,6 +174,7 @@ public abstract class HttpConnection extends AbstractSelectableConnection
}
/* ------------------------------------------------------------ */
@Override
public String toString()
{
return String.format("%s,g=%s,p=%s",
@ -179,21 +183,6 @@ public abstract class HttpConnection extends AbstractSelectableConnection
_parser);
}
/* ------------------------------------------------------------ */
@Override
public void canRead()
{
}
/* ------------------------------------------------------------ */
@Override
public void canWrite()
{
}
/* ------------------------------------------------------------ */
public void processInput()
@ -206,7 +195,7 @@ public abstract class HttpConnection extends AbstractSelectableConnection
setCurrentConnection(this);
// don't check for idle while dispatched (unless blocking IO is done).
getEndPoint().setCheckForIdle(false);
getSelectableEndPoint().setCheckForIdle(false);
// While progress and the connection has not changed
@ -277,7 +266,7 @@ public abstract class HttpConnection extends AbstractSelectableConnection
if (!_processor.getRequest().getAsyncContinuation().isAsyncStarted())
{
// reenable idle checking unless request is suspended
getEndPoint().setCheckForIdle(true);
getSelectableEndPoint().setCheckForIdle(true);
}
}
}
@ -416,18 +405,20 @@ public abstract class HttpConnection extends AbstractSelectableConnection
break;
if (_toFlush>0)
_endp.blockWritable(getMaxIdleTime());
blockWriteable();
}
}
/* ------------------------------------------------------------ */
@Override
public void onClose()
{
_processor.onClose();
}
/* ------------------------------------------------------------ */
@Override
public void onInputShutdown() throws IOException
{
// If we don't have a committed response and we are not suspended