jetty-9: work in progress on read/write interest

This commit is contained in:
Greg Wilkins 2012-03-29 22:49:08 +11:00
parent b267511f8f
commit c60ec9cf5f
23 changed files with 314 additions and 756 deletions

View File

@ -2,25 +2,25 @@ package org.eclipse.jetty.io;
import java.io.IOException;
import org.eclipse.jetty.io.nio.AsyncConnection;
import org.eclipse.jetty.io.nio.Connection;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public abstract class AbstractAsyncConnection implements AsyncConnection
public abstract class AbstractConnection implements Connection
{
private static final Logger LOG = Log.getLogger(AbstractAsyncConnection.class);
private static final Logger LOG = Log.getLogger(AbstractConnection.class);
private final long _timeStamp;
protected final AsyncEndPoint _endp;
protected final EndPoint _endp;
public AbstractAsyncConnection(AsyncEndPoint endp)
public AbstractConnection(EndPoint endp)
{
_endp=endp;
_timeStamp = System.currentTimeMillis();
}
public AbstractAsyncConnection(AsyncEndPoint endp,long timestamp)
public AbstractConnection(EndPoint endp,long timestamp)
{
_endp=endp;
_timeStamp = timestamp;
@ -28,7 +28,7 @@ public abstract class AbstractAsyncConnection implements AsyncConnection
@Override
public AsyncEndPoint getAsyncEndPoint()
public EndPoint getEndPoint()
{
return _endp;
}

View File

@ -1,73 +0,0 @@
// ========================================================================
// Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.io;
import java.io.IOException;
import org.eclipse.jetty.io.nio.AsyncConnection;
import org.eclipse.jetty.util.thread.Timeout;
public interface AsyncEndPoint extends EndPoint
{
/* ------------------------------------------------------------ */
AsyncConnection getAsyncConnection();
/* ------------------------------------------------------------ */
void setAsyncConnection(AsyncConnection connection);
/* ------------------------------------------------------------ */
/**
* Dispatch the endpoint to a thread to attend to it.
*
*/
public void asyncDispatch();
/* ------------------------------------------------------------ */
/** Schedule a write dispatch.
* Set the endpoint to not be writable and schedule a dispatch when
* it becomes writable.
*/
public void scheduleWrite();
/* ------------------------------------------------------------ */
/** Callback when idle.
* <p>An endpoint is idle if there has been no IO activity for
* {@link #getMaxIdleTime()} and {@link #isCheckForIdle()} is true.
* @param idleForMs TODO
*/
public void onIdleExpired(long idleForMs);
/* ------------------------------------------------------------ */
/** Set if the endpoint should be checked for idleness
*/
public void setCheckForIdle(boolean check);
/* ------------------------------------------------------------ */
/** Get if the endpoint should be checked for idleness
*/
public boolean isCheckForIdle();
/* ------------------------------------------------------------ */
public boolean isWritable();
/* ------------------------------------------------------------ */
/**
*/
public void scheduleTimeout(Timeout.Task task, long timeoutMs);
/* ------------------------------------------------------------ */
/**
*/
public void cancelTimeout(Timeout.Task task);
}

View File

@ -17,6 +17,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import org.eclipse.jetty.io.nio.Connection;
import org.eclipse.jetty.util.BufferUtil;
@ -34,6 +35,8 @@ public class ByteArrayEndPoint implements EndPoint
protected boolean _closed;
protected boolean _growOutput;
protected int _maxIdleTime;
protected Connection _connection;
private boolean _idleCheck;
/* ------------------------------------------------------------ */
/**
@ -117,18 +120,6 @@ public class ByteArrayEndPoint implements EndPoint
return _closed;
}
/* ------------------------------------------------------------ */
public boolean blockReadable(long millisecs)
{
return true;
}
/* ------------------------------------------------------------ */
public boolean blockWritable(long millisecs)
{
return true;
}
/* ------------------------------------------------------------ */
/*
* @see org.eclipse.io.EndPoint#shutdownOutput()
@ -170,37 +161,11 @@ public class ByteArrayEndPoint implements EndPoint
return 0;
}
/* ------------------------------------------------------------ */
/*
* @see org.eclipse.io.EndPoint#flush(org.eclipse.io.Buffer)
*/
public int flush(ByteBuffer buffer) throws IOException
{
if (_closed)
throw new IOException("CLOSED");
if (_growOutput && buffer.remaining()>_out.remaining())
{
_out.compact();
if (buffer.remaining()>_out.remaining())
{
ByteBuffer n = ByteBuffer.allocate(_out.capacity()+buffer.remaining()*2);
n.put(_out);
_out=n;
}
}
int put=buffer.remaining();
_out.put(buffer);
return put;
}
/* ------------------------------------------------------------ */
/*
* @see org.eclipse.io.EndPoint#flush(org.eclipse.io.Buffer, org.eclipse.io.Buffer, org.eclipse.io.Buffer)
*/
public int gather(ByteBuffer... buffers) throws IOException
public int flush(ByteBuffer... buffers) throws IOException
{
if (_closed)
throw new IOException("CLOSED");
@ -210,9 +175,25 @@ public class ByteArrayEndPoint implements EndPoint
{
if (b.hasRemaining())
{
int l=flush(b);
if (l>0)
len+=l;
if (_growOutput && b.remaining()>_out.remaining())
{
_out.compact();
if (b.remaining()>_out.remaining())
{
ByteBuffer n = ByteBuffer.allocate(_out.capacity()+b.remaining()*2);
n.put(_out);
_out=n;
}
}
int put=b.remaining();
if (put>0)
{
_out.put(b);
len+=put;
}
else
break;
}
@ -254,11 +235,6 @@ public class ByteArrayEndPoint implements EndPoint
return _inBytes;
}
/* ------------------------------------------------------------ */
public void flush() throws IOException
{
}
/* ------------------------------------------------------------ */
/**
* @return the growOutput
@ -295,5 +271,34 @@ public class ByteArrayEndPoint implements EndPoint
_maxIdleTime=timeMs;
}
@Override
public Connection getConnection()
{
return _connection;
}
@Override
public void setConnection(Connection connection)
{
_connection=connection;
}
@Override
public void onIdleExpired(long idleForMs)
{
}
@Override
public void setCheckForIdle(boolean check)
{
_idleCheck=check;
}
@Override
public boolean isCheckForIdle()
{
return _idleCheck;
}
}

View File

@ -17,6 +17,8 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import org.eclipse.jetty.io.nio.Connection;
/**
*
@ -57,17 +59,6 @@ public interface EndPoint
int fill(ByteBuffer buffer) throws IOException;
/**
* Flush data from the passed buffer to this endpoint. As many bytes as can be consumed
* are taken from the buffer position up until the buffer limit. The
* buffers position is updated to indicate how many bytes have been consumed.
*
* @param buffer The buffer to flush. This buffers position is updated if it is not read only.
* @return the number of bytes written
* @throws EofException If the endpoint is closed or output is shutdown.
*/
int flush(ByteBuffer buffer) throws IOException;
/**
* Flush data from the passed header/buffer to this endpoint. As many bytes as can be consumed
* are taken from the header/buffer position up until the buffer limit. The header/buffers position
@ -76,7 +67,7 @@ public interface EndPoint
* @return the number of bytes written
* @throws EofException If the endpoint is closed or output is shutdown.
*/
int gather(ByteBuffer... buffer) throws IOException;
int flush(ByteBuffer... buffer) throws IOException;
/* ------------------------------------------------------------ */
@ -120,4 +111,30 @@ public interface EndPoint
* @throws IOException if the timeout cannot be set.
*/
void setMaxIdleTime(int timeMs) throws IOException;
/* ------------------------------------------------------------ */
Connection getConnection();
/* ------------------------------------------------------------ */
void setConnection(Connection connection);
/* ------------------------------------------------------------ */
/** Callback when idle.
* <p>An endpoint is idle if there has been no IO activity for
* {@link #getMaxIdleTime()} and {@link #isCheckForIdle()} is true.
* @param idleForMs TODO
*/
public void onIdleExpired(long idleForMs);
/* ------------------------------------------------------------ */
/** Set if the endpoint should be checked for idleness
*/
public void setCheckForIdle(boolean check);
/* ------------------------------------------------------------ */
/** Get if the endpoint should be checked for idleness
*/
public boolean isCheckForIdle();
}

View File

@ -36,14 +36,15 @@ public class ChannelEndPoint implements EndPoint
{
private static final Logger LOG = Log.getLogger(ChannelEndPoint.class);
protected final ByteChannel _channel;
protected final ByteBuffer[] _gather2=new ByteBuffer[2];
protected final Socket _socket;
protected final InetSocketAddress _local;
protected final InetSocketAddress _remote;
protected volatile int _maxIdleTime;
private final ByteChannel _channel;
private final Socket _socket;
private final InetSocketAddress _local;
private final InetSocketAddress _remote;
private volatile int _maxIdleTime;
private volatile boolean _ishut;
private volatile boolean _oshut;
private Connection _connection;
private boolean _idleCheck;
public ChannelEndPoint(ByteChannel channel) throws IOException
@ -223,19 +224,10 @@ public class ChannelEndPoint implements EndPoint
}
}
/* (non-Javadoc)
* @see org.eclipse.io.EndPoint#flush(org.eclipse.io.Buffer)
*/
public int flush(ByteBuffer buffer) throws IOException
{
int len=_channel.write(buffer);
return len;
}
/* (non-Javadoc)
* @see org.eclipse.io.EndPoint#flush(org.eclipse.io.Buffer, org.eclipse.io.Buffer, org.eclipse.io.Buffer)
*/
public int gather(ByteBuffer... buffers) throws IOException
public int flush(ByteBuffer... buffers) throws IOException
{
int len=0;
if (_channel instanceof GatheringByteChannel)
@ -283,13 +275,16 @@ public class ChannelEndPoint implements EndPoint
}
/* ------------------------------------------------------------ */
/*
* @see org.eclipse.io.EndPoint#getConnection()
*/
public Object getTransport()
{
return _channel;
}
/* ------------------------------------------------------------ */
public Socket getSocket()
{
return _socket;
}
/* ------------------------------------------------------------ */
public int getMaxIdleTime()
@ -303,8 +298,39 @@ public class ChannelEndPoint implements EndPoint
*/
public void setMaxIdleTime(int timeMs) throws IOException
{
if (_socket!=null && timeMs!=_maxIdleTime)
_socket.setSoTimeout(timeMs>0?timeMs:0);
//if (_socket!=null && timeMs!=_maxIdleTime)
// _socket.setSoTimeout(timeMs>0?timeMs:0);
_maxIdleTime=timeMs;
}
@Override
public Connection getConnection()
{
return _connection;
}
@Override
public void setConnection(Connection connection)
{
_connection=connection;
}
@Override
public void onIdleExpired(long idleForMs)
{
}
@Override
public void setCheckForIdle(boolean check)
{
_idleCheck=check;
}
@Override
public boolean isCheckForIdle()
{
return _idleCheck;
}
}

View File

@ -15,25 +15,22 @@ package org.eclipse.jetty.io.nio;
import java.io.IOException;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.EndPoint;
public interface AsyncConnection
public interface Connection
{
void onInputShutdown() throws IOException;
AsyncEndPoint getAsyncEndPoint();
EndPoint getEndPoint();
/* ------------------------------------------------------------ */
/**
* Handle the connection.
* @return The Connection to use for the next handling of the connection.
* This allows protocol upgrades and support for CONNECT.
* @throws IOException
*/
AsyncConnection handle() throws IOException;
void canRead();
void canWrite();
boolean isReadInterested();
boolean isWriteInterested();
void onInputShutdown() throws IOException;
/**
* Called when the connection is closed
*/

View File

@ -44,23 +44,16 @@ public class NetworkTrafficSelectChannelEndPoint extends SelectChannelEndPoint
}
@Override
public int flush(ByteBuffer buffer) throws IOException
{
int position = buffer.position();
int written = super.flush(buffer);
notifyOutgoing(buffer, position, written);
return written;
}
@Override
public int gather(ByteBuffer... buffers) throws IOException
public int flush(ByteBuffer... buffers) throws IOException
{
int written=0;
for (ByteBuffer b : buffers)
{
if (b.hasRemaining())
{
int l = flush(b);
int position = b.position();
int l = super.flush(b);
notifyOutgoing(b, position, l);
if (l==0)
break;
else
@ -79,7 +72,7 @@ public class NetworkTrafficSelectChannelEndPoint extends SelectChannelEndPoint
{
try
{
listener.opened(_socket);
listener.opened(getSocket());
}
catch (Exception x)
{
@ -98,7 +91,7 @@ public class NetworkTrafficSelectChannelEndPoint extends SelectChannelEndPoint
try
{
ByteBuffer view = buffer.asReadOnlyBuffer();
listener.incoming(_socket, view);
listener.incoming(getSocket(), view);
}
catch (Exception x)
{
@ -119,7 +112,7 @@ public class NetworkTrafficSelectChannelEndPoint extends SelectChannelEndPoint
ByteBuffer view = buffer.slice();
view.position(position);
view.limit(position + written);
listener.outgoing(_socket, view);
listener.outgoing(getSocket(), view);
}
catch (Exception x)
{
@ -137,7 +130,7 @@ public class NetworkTrafficSelectChannelEndPoint extends SelectChannelEndPoint
{
try
{
listener.closed(_socket);
listener.closed(getSocket());
}
catch (Exception x)
{

View File

@ -14,14 +14,13 @@
package org.eclipse.jetty.io.nio;
import java.io.IOException;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -31,45 +30,22 @@ import org.eclipse.jetty.util.thread.Timeout.Task;
/**
* An Endpoint that can be scheduled by {@link SelectorManager}.
*/
public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint
public class SelectChannelEndPoint extends ChannelEndPoint implements EndPoint
{
public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio");
private final SelectorManager.SelectSet _selectSet;
private final SelectorManager _manager;
private SelectionKey _key;
private final Runnable _handler = new Runnable()
{
public void run() { handle(); }
};
/** The desired value for {@link SelectionKey#interestOps()} */
private int _interestOps;
/** true if a thread has been dispatched to handle this endpoint */
private boolean _dispatched = false;
/** true if a non IO dispatch (eg async resume) is outstanding */
private boolean _asyncDispatch = false;
/** true if the last write operation succeed and wrote all offered bytes */
private volatile boolean _writable = true;
/** True if a thread has is blocked in {@link #blockReadable(long)} */
private boolean _readBlocked;
/** True if a thread has is blocked in {@link #blockWritable(long)} */
private boolean _writeBlocked;
/** true if {@link SelectSet#destroyEndPoint(SelectChannelEndPoint)} has not been called */
private boolean _open;
private volatile long _idleTimestamp;
private boolean _ishut;
private volatile AsyncConnection _connection;
private volatile Connection _connection;
/* ------------------------------------------------------------ */
public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime)
@ -79,8 +55,6 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
_manager = selectSet.getManager();
_selectSet = selectSet;
_dispatched = false;
_asyncDispatch = false;
_open=true;
_key = key;
@ -104,12 +78,12 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
/* ------------------------------------------------------------ */
public void setAsyncConnection(AsyncConnection connection)
public void setConnection(Connection connection)
{
AsyncConnection old=getAsyncConnection();
Connection old=getConnection();
_connection=connection;
if (old!=null && old!=connection)
_manager.endPointUpgraded(this,(AsyncConnection)old);
_manager.endPointUpgraded(this,(Connection)old);
}
/* ------------------------------------------------------------ */
@ -122,117 +96,29 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
/** Called by selectSet to schedule handling
*
*/
public void schedule()
public void selected()
{
final boolean can_read;
final boolean can_write;
synchronized (this)
{
// If there is no key, then do nothing
if (_key == null || !_key.isValid())
{
_readBlocked=false;
_writeBlocked=false;
this.notifyAll();
return;
}
// If there are threads dispatched reading and writing
if (_readBlocked || _writeBlocked)
{
// assert _dispatched;
if (_readBlocked && _key.isReadable())
_readBlocked=false;
if (_writeBlocked && _key.isWritable())
_writeBlocked=false;
// wake them up is as good as a dispatched.
this.notifyAll();
// we are not interested in further selecting
_key.interestOps(0);
if (!_dispatched)
updateKey();
return;
}
// Remove writeable op
if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
{
// Remove writeable op
_interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
_key.interestOps(_interestOps);
_writable = true; // Once writable is in ops, only removed with dispatch.
}
// If dispatched, then deregister interest
if (_dispatched)
_key.interestOps(0);
else
{
// other wise do the dispatch
dispatch();
if (_dispatched && !_selectSet.getManager().isDeferringInterestedOps0())
{
_key.interestOps(0);
}
}
can_read=(_key.isReadable() && (_key.interestOps()|SelectionKey.OP_READ)!=0);
can_write=(_key.isWritable() && (_key.interestOps()|SelectionKey.OP_WRITE)!=0);
_interestOps=0;
_key.interestOps(0);
}
}
/* ------------------------------------------------------------ */
public void asyncDispatch()
{
synchronized(this)
{
if (_dispatched)
_asyncDispatch=true;
else
dispatch();
}
}
/* ------------------------------------------------------------ */
public void dispatch()
{
synchronized(this)
{
if (_dispatched)
{
throw new IllegalStateException("dispatched");
}
else
{
_dispatched = true;
boolean dispatched = _manager.dispatch(_handler);
if(!dispatched)
{
_dispatched = false;
LOG.warn("Dispatched Failed! "+this+" to "+_manager);
updateKey();
}
}
}
}
/* ------------------------------------------------------------ */
/**
* Called when a dispatched thread is no longer handling the endpoint.
* The selection key operations are updated.
* @return If false is returned, the endpoint has been redispatched and
* thread must keep handling the endpoint.
*/
protected boolean undispatch()
{
synchronized (this)
{
if (_asyncDispatch)
{
_asyncDispatch=false;
return false;
}
_dispatched = false;
updateKey();
}
return true;
if (can_read)
getConnection().canRead();
if (can_write)
getConnection().canWrite();
}
/* ------------------------------------------------------------ */
@ -270,12 +156,13 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
public void checkIdleTimestamp(long now)
{
long idleTimestamp=_idleTimestamp;
long max_idle_time=getMaxIdleTime();
if (idleTimestamp!=0 && _maxIdleTime>0)
if (idleTimestamp!=0 && max_idle_time>0)
{
long idleForMs=now-idleTimestamp;
if (idleForMs>_maxIdleTime)
if (idleForMs>max_idle_time)
{
onIdleExpired(idleForMs);
_idleTimestamp=now;
@ -286,7 +173,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
/* ------------------------------------------------------------ */
public void onIdleExpired(long idleForMs)
{
getAsyncConnection().onIdleExpired(idleForMs);
getConnection().onIdleExpired(idleForMs);
}
/* ------------------------------------------------------------ */
@ -301,179 +188,22 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
/* ------------------------------------------------------------ */
@Override
public int gather(ByteBuffer... buffers) throws IOException
public int flush(ByteBuffer... buffers) throws IOException
{
int l = super.gather(buffers);
if (l==0)
{
for (ByteBuffer b: buffers)
{
if (b.hasRemaining())
{
synchronized (this)
{
if (_dispatched)
_writable=false;
}
}
}
}
else if (l>0)
{
_writable=true;
int l = super.flush(buffers);
if (l>0)
notIdle();
}
return l;
}
/* ------------------------------------------------------------ */
/*
*/
@Override
public int flush(ByteBuffer buffer) throws IOException
{
int l = super.flush(buffer);
// If there was something to write and it wasn't written, then we are not writable.
if (l==0 && buffer!=null && buffer.remaining()>0)
{
synchronized (this)
{
if (_dispatched)
_writable=false;
}
}
else if (l>0)
{
_writable=true;
notIdle();
}
return l;
}
/* ------------------------------------------------------------ */
/*
* Allows thread to block waiting for further events.
*/
@Deprecated
public boolean blockReadable(long timeoutMs) throws IOException
{
synchronized (this)
{
if (isInputShutdown())
throw new EofException();
long now=_selectSet.getNow();
long end=now+timeoutMs;
boolean check=isCheckForIdle();
setCheckForIdle(true);
try
{
_readBlocked=true;
while (!isInputShutdown() && _readBlocked)
{
try
{
updateKey();
this.wait(timeoutMs>=0?(end-now):10000);
}
catch (InterruptedException e)
{
LOG.warn(e);
}
finally
{
now=_selectSet.getNow();
}
if (_readBlocked && timeoutMs>0 && now>=end)
return false;
}
}
finally
{
_readBlocked=false;
setCheckForIdle(check);
}
}
return true;
}
/* ------------------------------------------------------------ */
/*
* Allows thread to block waiting for further events.
*/
@Deprecated
public boolean blockWritable(long timeoutMs) throws IOException
{
synchronized (this)
{
if (isOutputShutdown())
throw new EofException();
long now=_selectSet.getNow();
long end=now+timeoutMs;
boolean check=isCheckForIdle();
setCheckForIdle(true);
try
{
_writeBlocked=true;
while (_writeBlocked && !isOutputShutdown())
{
try
{
updateKey();
this.wait(timeoutMs>=0?(end-now):10000);
}
catch (InterruptedException e)
{
LOG.warn(e);
}
finally
{
now=_selectSet.getNow();
}
if (_writeBlocked && timeoutMs>0 && now>=end)
return false;
}
}
finally
{
_writeBlocked=false;
setCheckForIdle(check);
}
}
return true;
}
/* ------------------------------------------------------------ */
/**
* @see org.eclipse.jetty.io.AsyncEndPoint#scheduleWrite()
*/
public void scheduleWrite()
{
if (_writable==true)
LOG.debug("Required scheduleWrite {}",this);
_writable=false;
updateKey();
}
/* ------------------------------------------------------------ */
public boolean isWritable()
{
return _writable;
}
/* ------------------------------------------------------------ */
/**
* Updates selection key. Adds operations types to the selection key as needed. No operations
* are removed as this is only done during dispatch. This method records the new key and
* schedules a call to doUpdateKey to do the keyChange
*/
private void updateKey()
public void updateKey()
{
final boolean changed;
synchronized (this)
@ -481,12 +211,11 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
int current_ops=-1;
if (getChannel().isOpen())
{
boolean read_interest = _readBlocked || (!_dispatched && getAsyncConnection().isReadInterested());
boolean write_interest= _writeBlocked || (!_dispatched && !_writable);
Socket socket = getSocket();
boolean read_interest = getConnection().isReadInterested() && !socket.isInputShutdown();
boolean write_interest= getConnection().isWriteInterested() && !socket.isOutputShutdown();
_interestOps =
((!_socket.isInputShutdown() && read_interest ) ? SelectionKey.OP_READ : 0)
| ((!_socket.isOutputShutdown()&& write_interest) ? SelectionKey.OP_WRITE : 0);
_interestOps = (read_interest?SelectionKey.OP_READ:0)|(write_interest?SelectionKey.OP_WRITE:0);
try
{
current_ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
@ -578,91 +307,6 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
}
}
/* ------------------------------------------------------------ */
/*
*/
protected void handle()
{
boolean dispatched=true;
try
{
while(dispatched)
{
try
{
while(true)
{
final AsyncConnection next = (AsyncConnection)getAsyncConnection().handle();
if (next!=getAsyncConnection())
{
LOG.debug("{} replaced {}",next,getAsyncConnection());
AsyncConnection old=getAsyncConnection();
setAsyncConnection(next);
_manager.endPointUpgraded(this,old);
continue;
}
break;
}
}
catch (ClosedChannelException e)
{
LOG.ignore(e);
}
catch (EofException e)
{
LOG.debug("EOF", e);
try{close();}
catch(IOException e2){LOG.ignore(e2);}
}
catch (IOException e)
{
LOG.warn(e.toString());
try{close();}
catch(IOException e2){LOG.ignore(e2);}
}
catch (Throwable e)
{
LOG.warn("handle failed", e);
try{close();}
catch(IOException e2){LOG.ignore(e2);}
}
finally
{
if (!_ishut && isInputShutdown() && isOpen())
{
_ishut=true;
try
{
getAsyncConnection().onInputShutdown();
}
catch(Throwable x)
{
LOG.warn("onInputShutdown failed", x);
try{close();}
catch(IOException e2){LOG.ignore(e2);}
}
finally
{
updateKey();
}
}
dispatched=!undispatch();
}
}
}
finally
{
if (dispatched)
{
dispatched=!undispatch();
while (dispatched)
{
LOG.warn("SCEP.run() finally DISPATCHED");
dispatched=!undispatch();
}
}
}
}
/* ------------------------------------------------------------ */
/*
@ -712,20 +356,18 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
{
keyString += "-";
}
return String.format("SCEP@%x{l(%s)<->r(%s),d=%b,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%d%s}-{%s}",
return String.format("SCEP@%x{l(%s)<->r(%s),open=%b,ishut=%b,oshut=%b,i=%d%s}-{%s}",
hashCode(),
_socket.getRemoteSocketAddress(),
_socket.getLocalSocketAddress(),
_dispatched,
getRemoteAddress(),
getLocalAddress(),
isOpen(),
isInputShutdown(),
isOutputShutdown(),
_readBlocked,
_writeBlocked,
_writable,
_interestOps,
keyString,
getAsyncConnection());
getConnection());
}
/* ------------------------------------------------------------ */
@ -735,19 +377,8 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
}
/* ------------------------------------------------------------ */
/**
* Don't set the SoTimeout
* @see org.eclipse.jetty.io.nio.ChannelEndPoint#setMaxIdleTime(int)
*/
@Override
public void setMaxIdleTime(int timeMs) throws IOException
{
_maxIdleTime=timeMs;
}
/* ------------------------------------------------------------ */
@Override
public AsyncConnection getAsyncConnection()
public Connection getConnection()
{
return _connection;
}

View File

@ -31,7 +31,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
@ -339,10 +338,10 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
/* ------------------------------------------------------------ */
protected abstract void endPointUpgraded(EndPoint endpoint,AsyncConnection oldConnection);
protected abstract void endPointUpgraded(EndPoint endpoint,Connection oldConnection);
/* ------------------------------------------------------------------------------- */
public abstract AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment);
public abstract Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment);
/* ------------------------------------------------------------ */
/**
@ -473,7 +472,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
key = channel.register(selector,SelectionKey.OP_READ,att);
SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key);
key.attach(endpoint);
endpoint.schedule();
endpoint.selected();
}
else if (channel.isOpen())
{
@ -488,7 +487,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
key = channel.register(selector,SelectionKey.OP_READ,null);
SelectChannelEndPoint endpoint = createEndPoint(channel,key);
key.attach(endpoint);
endpoint.schedule();
endpoint.selected();
}
else if (change instanceof ChangeTask)
{
@ -609,7 +608,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
if (att instanceof SelectChannelEndPoint)
{
if (key.isReadable()||key.isWritable())
((SelectChannelEndPoint)att).schedule();
((SelectChannelEndPoint)att).selected();
}
else if (key.isConnectable())
{
@ -631,7 +630,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
key.interestOps(SelectionKey.OP_READ);
SelectChannelEndPoint endpoint = createEndPoint(channel,key);
key.attach(endpoint);
endpoint.schedule();
endpoint.selected();
}
else
{
@ -646,7 +645,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
SelectChannelEndPoint endpoint = createEndPoint(channel,key);
key.attach(endpoint);
if (key.isReadable())
endpoint.schedule();
endpoint.selected();
}
key = null;
}

View File

@ -24,13 +24,11 @@ import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLSession;
import org.eclipse.jetty.io.AbstractAsyncConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Timeout.Task;
/* ------------------------------------------------------------ */
/** SSL Connection.
@ -41,7 +39,7 @@ import org.eclipse.jetty.util.thread.Timeout.Task;
* it's source/sink of encrypted data. It then provides {@link #getSslEndPoint()} to
* expose a source/sink of unencrypted data to another connection (eg HttpConnection).
*/
public class SslConnection extends AbstractAsyncConnection
public class SslConnection extends AbstractConnection
{
private final Logger _logger = Log.getLogger("org.eclipse.jetty.io.nio.ssl");
@ -50,7 +48,7 @@ public class SslConnection extends AbstractAsyncConnection
private static final ThreadLocal<SslBuffers> __buffers = new ThreadLocal<SslBuffers>();
private final SSLEngine _engine;
private final SSLSession _session;
private AsyncConnection _connection;
private Connection _connection;
private final SslEndPoint _sslEndPoint;
private int _allocations;
private SslBuffers _buffers;
@ -104,7 +102,7 @@ public class SslConnection extends AbstractAsyncConnection
}
/* ------------------------------------------------------------ */
public AsyncEndPoint getAsyncEndPoint()
public EndPoint getEndPoint()
{
return _aEndp;
}
@ -182,7 +180,7 @@ public class SslConnection extends AbstractAsyncConnection
}
/* ------------------------------------------------------------ */
public AsyncConnection handle() throws IOException
public void canRead() throws IOException
{
try
{
@ -199,12 +197,7 @@ public class SslConnection extends AbstractAsyncConnection
progress=process(null,null);
// handle the delegate connection
AsyncConnection next = (AsyncConnection)_connection.handle();
if (next!=_connection && next!=null)
{
_connection=next;
progress=true;
}
_connection.canRead();
_logger.debug("{} handle {} progress={}", _session, this, progress);
}
@ -229,10 +222,15 @@ public class SslConnection extends AbstractAsyncConnection
}
}
}
return this;
}
/* ------------------------------------------------------------ */
public void canWrite() throws IOException
{
// TODO
}
/* ------------------------------------------------------------ */
public boolean isIdle()
{
@ -623,21 +621,17 @@ public class SslConnection extends AbstractAsyncConnection
return filled;
}
public int flush(ByteBuffer buffer) throws IOException
{
int size = buffer.remaining();
process(null, buffer);
return size-buffer.remaining();
}
public int gather(ByteBuffer... buffers) throws IOException
public int flush(ByteBuffer... buffers) throws IOException
{
int len=0;
for (ByteBuffer b : buffers)
{
if (b.hasRemaining())
{
int l=flush(b);
int l = b.remaining();
process(null, b);
l=l-b.remaining();
if (l>0)
len+=l;
else
@ -647,29 +641,6 @@ public class SslConnection extends AbstractAsyncConnection
return len;
}
/*
public boolean blockReadable(long millisecs) throws IOException
{
long now = System.currentTimeMillis();
long end=millisecs>0?(now+millisecs):Long.MAX_VALUE;
while (now<end)
{
if (process(null,null))
break;
_aEndp.blockReadable(end-now);
now = System.currentTimeMillis();
}
return now<end;
}
public boolean blockWritable(long millisecs) throws IOException
{
return _aEndp.blockWritable(millisecs);
}
*/
public boolean isOpen()
{
return _endp.isOpen();
@ -685,16 +656,6 @@ public class SslConnection extends AbstractAsyncConnection
process(null, null);
}
public void asyncDispatch()
{
_aEndp.asyncDispatch();
}
public void scheduleWrite()
{
_aEndp.scheduleWrite();
}
public void onIdleExpired(long idleForMs)
{
_aEndp.onIdleExpired(idleForMs);
@ -710,21 +671,6 @@ public class SslConnection extends AbstractAsyncConnection
return _aEndp.isCheckForIdle();
}
public void scheduleTimeout(Task task, long timeoutMs)
{
_aEndp.scheduleTimeout(task,timeoutMs);
}
public void cancelTimeout(Task task)
{
_aEndp.cancelTimeout(task);
}
public boolean isWritable()
{
return _aEndp.isWritable();
}
public InetSocketAddress getLocalAddress()
{
return _aEndp.getLocalAddress();
@ -750,14 +696,14 @@ public class SslConnection extends AbstractAsyncConnection
_aEndp.setMaxIdleTime(timeMs);
}
public AsyncConnection getAsyncConnection()
public Connection getConnection()
{
return _connection;
}
public void setAsyncConnection(AsyncConnection connection)
public void setConnection(Connection connection)
{
_connection=(AsyncConnection)connection;
_connection=(Connection)connection;
}
public String toString()
@ -777,6 +723,5 @@ public class SslConnection extends AbstractAsyncConnection
_ishut, _oshut,
_connection);
}
}
}

View File

@ -11,8 +11,6 @@ import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import javax.net.ssl.SSLSocket;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.junit.Assert;
@ -43,14 +41,14 @@ public class SelectChannelEndPointSslTest extends SelectChannelEndPointTest
}
@Override
protected AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint)
protected Connection newConnection(SocketChannel channel, AsyncEndPoint endpoint)
{
SSLEngine engine = __sslCtxFactory.newSslEngine();
engine.setUseClientMode(false);
SslConnection connection = new SslConnection(engine,endpoint);
AsyncConnection delegate = super.newConnection(channel,connection.getSslEndPoint());
connection.getSslEndPoint().setAsyncConnection(delegate);
Connection delegate = super.newConnection(channel,connection.getSslEndPoint());
connection.getSslEndPoint().setConnection(delegate);
return connection;
}

View File

@ -20,8 +20,7 @@ import java.nio.channels.SocketChannel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.AbstractAsyncConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
@ -55,12 +54,12 @@ public class SelectChannelEndPointTest
}
@Override
protected void endPointUpgraded(EndPoint endpoint, AsyncConnection oldConnection)
protected void endPointUpgraded(EndPoint endpoint, Connection oldConnection)
{
}
@Override
public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment)
public Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment)
{
return SelectChannelEndPointTest.this.newConnection(channel,endpoint);
}
@ -69,7 +68,7 @@ public class SelectChannelEndPointTest
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
{
SelectChannelEndPoint endp = new SelectChannelEndPoint(channel,selectSet,key,2000);
endp.setAsyncConnection(selectSet.getManager().newConnection(channel,endp, key.attachment()));
endp.setConnection(selectSet.getManager().newConnection(channel,endp, key.attachment()));
_lastEndp=endp;
return endp;
}
@ -100,22 +99,22 @@ public class SelectChannelEndPointTest
return new Socket(_connector.socket().getInetAddress(),_connector.socket().getLocalPort());
}
protected AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint)
protected Connection newConnection(SocketChannel channel, EndPoint endpoint)
{
return new TestConnection(endpoint);
}
public class TestConnection extends AbstractAsyncConnection implements AsyncConnection
public class TestConnection extends AbstractConnection implements Connection
{
ByteBuffer _in = BufferUtil.allocate(32*1024);
ByteBuffer _out = BufferUtil.allocate(32*1024);
public TestConnection(AsyncEndPoint endp)
public TestConnection(EndPoint endp)
{
super(endp);
}
public AsyncConnection handle() throws IOException
public void canRead()
{
boolean progress=true;
while(progress)
@ -146,9 +145,13 @@ public class SelectChannelEndPointTest
if (BufferUtil.isEmpty(_out) && _endp.isInputShutdown())
_endp.shutdownOutput();
}
return this;
}
public void canWrite()
{
}
public boolean isIdle()
{
return false;
@ -161,7 +164,7 @@ public class SelectChannelEndPointTest
}
@Override
public AsyncEndPoint getAsyncEndPoint()
public EndPoint getEndPoint()
{
return _endp;
}

View File

@ -30,7 +30,7 @@ import org.eclipse.jetty.io.Buffers;
import org.eclipse.jetty.io.Buffers.Type;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.nio.AsyncConnection;
import org.eclipse.jetty.io.nio.Connection;
import org.eclipse.jetty.util.component.AggregateLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
@ -1125,7 +1125,7 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Ht
}
/* ------------------------------------------------------------ */
protected void connectionOpened(AsyncConnection connection)
protected void connectionOpened(Connection connection)
{
if (_statsStartedAt.get() == -1)
return;
@ -1134,13 +1134,13 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Ht
}
/* ------------------------------------------------------------ */
protected void connectionUpgraded(AsyncConnection oldConnection, AsyncConnection newConnection)
protected void connectionUpgraded(Connection oldConnection, Connection newConnection)
{
_requestStats.set((oldConnection instanceof HttpConnection)?((HttpConnection)oldConnection).getHttpChannel().getRequests():0);
}
/* ------------------------------------------------------------ */
protected void connectionClosed(AsyncConnection connection)
protected void connectionClosed(Connection connection)
{
connection.onClose();

View File

@ -819,7 +819,7 @@ public class AsyncContinuation implements AsyncContext, Continuation
{
synchronized (this)
{
return (_event!=null && _event.getSuppliedRequest()==_connection._request && _event.getSuppliedResponse()==_connection._response);
return (_event!=null && _event.getSuppliedRequest()==_connection.getRequest() && _event.getSuppliedResponse()==_connection.getResponse());
}
}

View File

@ -478,7 +478,7 @@ public class HttpChannel
/* ------------------------------------------------------------ */
/**
* @see org.eclipse.jetty.io.AsyncConnection#isSuspended()
* @see org.eclipse.jetty.io.Connection#isSuspended()
*/
public boolean isSuspended()
{

View File

@ -27,18 +27,17 @@ import org.eclipse.jetty.http.HttpGenerator;
import org.eclipse.jetty.http.HttpGenerator.Action;
import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.AbstractAsyncConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.nio.AsyncConnection;
import org.eclipse.jetty.io.nio.Connection;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
*/
public abstract class HttpConnection extends AbstractAsyncConnection
public abstract class HttpConnection extends AbstractConnection
{
private static final Logger LOG = Log.getLogger(HttpConnection.class);
@ -74,7 +73,7 @@ public abstract class HttpConnection extends AbstractAsyncConnection
/** Constructor
*
*/
public HttpConnection(Connector connector, AsyncEndPoint endpoint, Server server)
public HttpConnection(Connector connector, EndPoint endpoint, Server server)
{
super(endpoint);
_connector = connector;
@ -179,12 +178,26 @@ public abstract class HttpConnection extends AbstractAsyncConnection
_parser);
}
/* ------------------------------------------------------------ */
@Override
public AsyncConnection handle() throws IOException
public void canRead()
{
AsyncConnection connection = this;
}
/* ------------------------------------------------------------ */
@Override
public void canWrite()
{
}
/* ------------------------------------------------------------ */
public void processInput()
{
Connection connection = this;
boolean progress=true;
try
@ -192,7 +205,7 @@ public abstract class HttpConnection extends AbstractAsyncConnection
setCurrentConnection(this);
// don't check for idle while dispatched (unless blocking IO is done).
getAsyncEndPoint().setCheckForIdle(false);
getEndPoint().setCheckForIdle(false);
// While progress and the connection has not changed
@ -201,15 +214,11 @@ public abstract class HttpConnection extends AbstractAsyncConnection
progress=false;
try
{
// Shall we try some reading
if (isReadInterested())
{
// We will need a buffer to read into
if (_requestBuffer==null)
_requestBuffer=_parser.isInContent()
?_connector.getRequestBuffers().getBuffer()
:_connector.getRequestBuffers().getHeader();
}
// We will need a buffer to read into
if (_requestBuffer==null)
_requestBuffer=_parser.isInContent()
?_connector.getRequestBuffers().getBuffer()
:_connector.getRequestBuffers().getHeader();
// If we parse to an event, call the connection
if (BufferUtil.hasContent(_requestBuffer) && _parser.parseNext(_requestBuffer))
@ -236,7 +245,7 @@ public abstract class HttpConnection extends AbstractAsyncConnection
// look for a switched connection instance?
if (_channel.getResponse().getStatus()==HttpStatus.SWITCHING_PROTOCOLS_101)
{
AsyncConnection switched=(AsyncConnection)_channel.getRequest().getAttribute("org.eclipse.jetty.io.Connection");
Connection switched=(Connection)_channel.getRequest().getAttribute("org.eclipse.jetty.io.Connection");
if (switched!=null)
connection=switched;
}
@ -255,6 +264,10 @@ public abstract class HttpConnection extends AbstractAsyncConnection
}
}
}
catch(IOException e)
{
// TODO
}
finally
{
setCurrentConnection(null);
@ -263,10 +276,9 @@ public abstract class HttpConnection extends AbstractAsyncConnection
if (!_channel.getRequest().getAsyncContinuation().isAsyncStarted())
{
// reenable idle checking unless request is suspended
getAsyncEndPoint().setCheckForIdle(true);
getEndPoint().setCheckForIdle(true);
}
}
return connection;
}
@ -336,7 +348,7 @@ public abstract class HttpConnection extends AbstractAsyncConnection
break;
case SHUTDOWN_OUT:
getAsyncEndPoint().shutdownOutput();
getEndPoint().shutdownOutput();
break;
case OK:
@ -360,39 +372,39 @@ public abstract class HttpConnection extends AbstractAsyncConnection
switch(_toFlush)
{
case 10:
_endp.gather(_responseHeader,_responseBuffer);
_endp.flush(_responseHeader,_responseBuffer);
_toFlush=(BufferUtil.hasContent(_responseHeader)?8:0)+(BufferUtil.hasContent(_responseBuffer)?2:0);
break;
case 9:
_endp.gather(_responseHeader,_content);
_endp.flush(_responseHeader,_content);
_toFlush=(BufferUtil.hasContent(_responseHeader)?8:0)+(BufferUtil.hasContent(_content)?1:0);
if (_toFlush==0)
_content=null;
break;
case 8:
_endp.gather(_responseHeader);
_endp.flush(_responseHeader);
_toFlush=(BufferUtil.hasContent(_responseHeader)?8:0);
break;
case 6:
_endp.gather(_chunk,_responseBuffer);
_endp.flush(_chunk,_responseBuffer);
_toFlush=(BufferUtil.hasContent(_chunk)?4:0)+(BufferUtil.hasContent(_responseBuffer)?2:0);
break;
case 5:
_endp.gather(_chunk,_content);
_endp.flush(_chunk,_content);
_toFlush=(BufferUtil.hasContent(_chunk)?4:0)+(BufferUtil.hasContent(_content)?1:0);
if (_toFlush==0)
_content=null;
break;
case 4:
_endp.gather(_chunk);
_endp.flush(_chunk);
_toFlush=(BufferUtil.hasContent(_chunk)?4:0);
break;
case 2:
_endp.gather(_responseBuffer);
_endp.flush(_responseBuffer);
_toFlush=(BufferUtil.hasContent(_responseBuffer)?2:0);
break;
case 1:
_endp.gather(_content);
_endp.flush(_content);
_toFlush=(BufferUtil.hasContent(_content)?1:0);
if (_toFlush==0)
_content=null;
@ -404,6 +416,7 @@ public abstract class HttpConnection extends AbstractAsyncConnection
if (!block)
break;
if (_toFlush>0)
blockUntilWritable(getMaxIdleTime());
@ -430,7 +443,11 @@ public abstract class HttpConnection extends AbstractAsyncConnection
if (_parser.isIdle())
_parser.setPersistent(false);
}
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
private final HttpTransport _transport = new HttpTransport()
{

View File

@ -20,6 +20,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.ByteArrayEndPoint;
import org.eclipse.jetty.io.nio.Connection;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -106,7 +107,7 @@ public class LocalConnector extends AbstractConnector
ByteArrayEndPoint endPoint = new ByteArrayEndPoint(_requestsBuffer.asArray(), 1024)
{
@Override
public void setConnection(AsyncConnection connection)
public void setConnection(Connection connection)
{
if (getConnection()!=null && connection!=getConnection())
connectionUpgraded(getConnection(),connection);
@ -126,8 +127,8 @@ public class LocalConnector extends AbstractConnector
{
while (true)
{
final AsyncConnection con = endPoint.getConnection();
final AsyncConnection next = con.handle();
final Connection con = endPoint.getConnection();
final Connection next = con.handle();
if (next!=con)
{
endPoint.setConnection(next);

View File

@ -349,7 +349,7 @@ public class ConnectHandler extends HandlerWrapper
{
}
private void upgradeConnection(HttpServletRequest request, HttpServletResponse response, AsyncConnection connection) throws IOException
private void upgradeConnection(HttpServletRequest request, HttpServletResponse response, Connection connection) throws IOException
{
// Set the new connection as request attribute and change the status to 101
// so that Jetty understands that it has to upgrade the connection
@ -427,7 +427,7 @@ public class ConnectHandler extends HandlerWrapper
}
@Override
public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment)
public Connection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment)
{
ProxyToServerConnection proxyToServer = (ProxyToServerConnection)attachment;
proxyToServer.setTimeStamp(System.currentTimeMillis());
@ -454,14 +454,14 @@ public class ConnectHandler extends HandlerWrapper
}
@Override
protected void endPointUpgraded(ConnectedEndPoint endpoint, AsyncConnection oldConnection)
protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
{
}
}
public class ProxyToServerConnection implements AsyncConnection
public class ProxyToServerConnection implements Connection
{
private final CountDownLatch _ready = new CountDownLatch(1);
private final ByteBuffer _buffer = new IndirectNIOBuffer(1024);
@ -486,7 +486,7 @@ public class ConnectHandler extends HandlerWrapper
return builder.append(")").toString();
}
public AsyncConnection handle() throws IOException
public Connection handle() throws IOException
{
_logger.debug("{}: begin reading from server", this);
try
@ -676,7 +676,7 @@ public class ConnectHandler extends HandlerWrapper
}
}
public class ClientToProxyConnection implements AsyncConnection
public class ClientToProxyConnection implements Connection
{
private final ByteBuffer _buffer = new IndirectNIOBuffer(1024);
private final ConcurrentMap<String, Object> _context;
@ -703,7 +703,7 @@ public class ConnectHandler extends HandlerWrapper
return builder.append(")").toString();
}
public AsyncConnection handle() throws IOException
public Connection handle() throws IOException
{
_logger.debug("{}: begin reading from client", this);
try

View File

@ -21,8 +21,8 @@ import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import org.eclipse.jetty.continuation.Continuation;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.nio.Connection;
import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
import org.eclipse.jetty.io.nio.SelectorManager;
import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
@ -125,8 +125,7 @@ public class SelectChannelConnector extends AbstractNIOConnector
@Override
public void persist(EndPoint endpoint) throws IOException
{
AsyncEndPoint aEndp = ((AsyncEndPoint)endpoint);
aEndp.setCheckForIdle(true);
endpoint.setCheckForIdle(true);
super.persist(endpoint);
}
@ -262,7 +261,7 @@ public class SelectChannelConnector extends AbstractNIOConnector
}
/* ------------------------------------------------------------------------------- */
protected AsyncConnection newConnection(SocketChannel channel,final AsyncEndPoint endpoint)
protected Connection newConnection(SocketChannel channel,final EndPoint endpoint)
{
return new AsyncHttpConnection(SelectChannelConnector.this,endpoint,getServer());
}
@ -296,13 +295,13 @@ public class SelectChannelConnector extends AbstractNIOConnector
}
@Override
protected void endPointUpgraded(ConnectedEndPoint endpoint, AsyncConnection oldConnection)
protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
{
connectionUpgraded(oldConnection,endpoint.getConnection());
}
@Override
public AsyncConnection newConnection(SocketChannel channel,AsyncEndPoint endpoint, Object attachment)
public Connection newConnection(SocketChannel channel,AsyncEndPoint endpoint, Object attachment)
{
return SelectChannelConnector.this.newConnection(channel,endpoint);
}

View File

@ -542,13 +542,13 @@ public class SslSelectChannelConnector extends SelectChannelConnector implements
/* ------------------------------------------------------------------------------- */
@Override
protected AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint)
protected Connection newConnection(SocketChannel channel, AsyncEndPoint endpoint)
{
try
{
SSLEngine engine = createSSLEngine(channel);
SslConnection connection = newSslConnection(endpoint, engine);
AsyncConnection delegate = newPlainConnection(channel, connection.getSslEndPoint());
Connection delegate = newPlainConnection(channel, connection.getSslEndPoint());
connection.getSslEndPoint().setConnection(delegate);
connection.setAllowRenegotiate(_sslContextFactory.isAllowRenegotiate());
return connection;
@ -559,7 +559,7 @@ public class SslSelectChannelConnector extends SelectChannelConnector implements
}
}
protected AsyncConnection newPlainConnection(SocketChannel channel, AsyncEndPoint endPoint)
protected Connection newPlainConnection(SocketChannel channel, AsyncEndPoint endPoint)
{
return super.newConnection(channel, endPoint);
}

View File

@ -59,7 +59,7 @@ public class AbstractConnectorTest
_server = new Server();
_connector = new SelectChannelConnector()
{
public void connectionClosed(AsyncConnection connection)
public void connectionClosed(Connection connection)
{
super.connectionClosed(connection);
_closed.countDown();

View File

@ -76,7 +76,7 @@ public class HttpWriterTest
AbstractHttpConnection connection = new AbstractHttpConnection(null,endp,new Server(),null,generator,null)
{
@Override
public AsyncConnection handle() throws IOException
public Connection handle() throws IOException
{
return null;
}
@ -169,7 +169,7 @@ public class HttpWriterTest
AbstractHttpConnection connection = new AbstractHttpConnection(null,endp,new Server(),null,hb,null)
{
@Override
public AsyncConnection handle() throws IOException
public Connection handle() throws IOException
{
return null;
}

View File

@ -622,7 +622,7 @@ public class ResponseTest
}
@Override
public AsyncConnection handle() throws IOException
public Connection handle() throws IOException
{
return this;
}