357240 work in progress

This commit is contained in:
Greg Wilkins 2011-09-28 20:33:03 +10:00
parent d0ffa5cb1a
commit dbf636df4d
6 changed files with 74 additions and 46 deletions

View File

@ -34,7 +34,7 @@ import org.eclipse.jetty.util.log.Logger;
*/
public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint, ConnectedEndPoint
{
public static final Logger __log=Log.getLogger("org.eclipse.jetty.io.nio");
public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio");
private final SelectorManager.SelectSet _selectSet;
private final SelectorManager _manager;
@ -54,7 +54,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
private boolean _writeBlocked;
private boolean _open;
private volatile long _idleTimestamp;
/* ------------------------------------------------------------ */
public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime)
throws IOException
@ -206,7 +206,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
if(!dispatched)
{
_dispatched = false;
__log.warn("Dispatched Failed! "+this+" to "+_manager);
LOG.warn("Dispatched Failed! "+this+" to "+_manager);
updateKey();
}
}
@ -350,7 +350,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
}
catch (InterruptedException e)
{
__log.warn(e);
LOG.warn(e);
}
finally
{
@ -395,7 +395,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
}
catch (InterruptedException e)
{
__log.warn(e);
LOG.warn(e);
}
finally
{
@ -408,7 +408,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
catch(Throwable e)
{
// TODO remove this if it finds nothing
__log.warn(e);
LOG.warn(e);
if (e instanceof RuntimeException)
throw (RuntimeException)e;
if (e instanceof Error)
@ -424,10 +424,22 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
}
return true;
}
/* ------------------------------------------------------------ */
/* short cut for busyselectChannelServerTest */
public void clearWritable()
{
_writable=false;
}
/* ------------------------------------------------------------ */
public void scheduleWrite()
{
if (_writable==true)
{
LOG.warn("Required scheduleWrite");
}
_writable=false;
updateKey();
}
@ -455,7 +467,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
catch(Exception e)
{
_key=null;
__log.ignore(e);
LOG.ignore(e);
}
}
@ -493,7 +505,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
}
catch (Exception e)
{
__log.ignore(e);
LOG.ignore(e);
if (_key!=null && _key.isValid())
{
_key.cancel();
@ -555,7 +567,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
final Connection next = _connection.handle();
if (next!=_connection)
{
__log.debug("{} replaced {}",next,_connection);
LOG.debug("{} replaced {}",next,_connection);
_connection=next;
continue;
}
@ -564,26 +576,26 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
}
catch (ClosedChannelException e)
{
__log.ignore(e);
LOG.ignore(e);
}
catch (EofException e)
{
__log.debug("EOF", e);
LOG.debug("EOF", e);
try{getChannel().close();}
catch(IOException e2){__log.ignore(e2);}
catch(IOException e2){LOG.ignore(e2);}
}
catch (IOException e)
{
__log.warn(e.toString());
__log.debug(e);
LOG.warn(e.toString());
LOG.debug(e);
try{getChannel().close();}
catch(IOException e2){__log.ignore(e2);}
catch(IOException e2){LOG.ignore(e2);}
}
catch (Throwable e)
{
__log.warn("handle failed", e);
LOG.warn("handle failed", e);
try{getChannel().close();}
catch(IOException e2){__log.ignore(e2);}
catch(IOException e2){LOG.ignore(e2);}
}
dispatched=!undispatch();
}
@ -595,7 +607,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
dispatched=!undispatch();
while (dispatched)
{
__log.warn("SCEP.run() finally DISPATCHED");
LOG.warn("SCEP.run() finally DISPATCHED");
dispatched=!undispatch();
}
}
@ -615,7 +627,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
}
catch (IOException e)
{
__log.ignore(e);
LOG.ignore(e);
}
finally
{
@ -630,7 +642,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
synchronized(this)
{
return "SCEP@" + hashCode() + _channel+
"[d=" + _dispatched + ",io=" + _interestOps+
"[o="+isOpen()+" d=" + _dispatched + ",io=" + _interestOps+
",w=" + _writable + ",rb=" + _readBlocked + ",wb=" + _writeBlocked + "]";
}
}

View File

@ -938,28 +938,31 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
}
Selector selector=_selector;
final ArrayList<Object> dump = new ArrayList<Object>(selector.keys().size()*2);
dump.add(where);
if (selector!=null)
{
final ArrayList<Object> dump = new ArrayList<Object>(selector.keys().size()*2);
dump.add(where);
final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch latch = new CountDownLatch(1);
addChange(new ChangeTask(){
public void run()
addChange(new ChangeTask(){
public void run()
{
dumpKeyState(dump);
latch.countDown();
}
});
try
{
dumpKeyState(dump);
latch.countDown();
latch.await(5,TimeUnit.SECONDS);
}
});
try
{
latch.await(5,TimeUnit.SECONDS);
catch(InterruptedException e)
{
LOG.ignore(e);
}
AggregateLifeCycle.dump(out,indent,dump);
}
catch(InterruptedException e)
{
LOG.ignore(e);
}
AggregateLifeCycle.dump(out,indent,dump);
}
/* ------------------------------------------------------------ */

View File

@ -56,8 +56,8 @@ public class SslSelectChannelEndPoint extends SelectChannelEndPoint
private boolean _closing=false;
private SSLEngineResult _result;
private boolean _handshook=false;
private boolean _allowRenegotiate=false;
private volatile boolean _handshook=false;
private boolean _allowRenegotiate=true;
private final boolean _debug = LOG.isDebugEnabled(); // snapshot debug status for optimizer
@ -239,8 +239,6 @@ public class SslSelectChannelEndPoint extends SelectChannelEndPoint
case NOT_HANDSHAKING:
_handshook=true;
// If closing, don't process application data
if (_closing)
{
@ -388,9 +386,7 @@ public class SslSelectChannelEndPoint extends SelectChannelEndPoint
}
// return the number of unencrypted bytes filled.
int filled=buffer.length()-size;
if (filled>0)
_handshook=true;
else if (filled==0 && isInputShutdown())
if (filled==0 && isInputShutdown())
return -1;
return filled;
@ -533,6 +529,8 @@ public class SslSelectChannelEndPoint extends SelectChannelEndPoint
// Do the unwrap
_result=_engine.unwrap(in_buffer,buffer);
if (!_handshook && _result.getHandshakeStatus()==SSLEngineResult.HandshakeStatus.FINISHED)
_handshook=true;
if (_debug) LOG.debug(_session+" unwrap "+_result);
// skip the bytes consumed
@ -620,6 +618,8 @@ public class SslSelectChannelEndPoint extends SelectChannelEndPoint
_result=null;
_result=_engine.wrap(bbuf,out_buffer);
if (_debug) LOG.debug(_session+" wrap "+_result);
if (!_handshook && _result.getHandshakeStatus()==SSLEngineResult.HandshakeStatus.FINISHED)
_handshook=true;
_outNIOBuffer.setPutIndex(out_buffer.position());
consumed=_result.bytesConsumed();
}
@ -710,7 +710,7 @@ public class SslSelectChannelEndPoint extends SelectChannelEndPoint
{
final NIOBuffer i=_inNIOBuffer;
final NIOBuffer o=_outNIOBuffer;
return "SSL"+super.toString()+","+_engine.getHandshakeStatus()+", in/out="+
return "SSL"+super.toString()+","+(_engine==null?"-":_engine.getHandshakeStatus())+", in/out="+
(i==null?0:i.length())+"/"+(o==null?0:o.length())+
" bi/o="+isBufferingInput()+"/"+isBufferingOutput()+
" "+_result;

View File

@ -124,7 +124,7 @@ public class AsyncHttpConnection extends HttpConnection
_parser.returnBuffers();
// Are we write blocked
if (_generator.isCommitted() && !_generator.isComplete())
if (_generator.isCommitted() && !_generator.isComplete() && _endp.isOpen())
((AsyncEndPoint)_endp).scheduleWrite();
else
_generator.returnBuffers();

View File

@ -52,7 +52,10 @@ public class BusySelectChannelServerTest extends HttpServerTestBase
{
int x=write++&0xff;
if (x<8)
{
clearWritable();
return 0;
}
if (x<32)
return flush(header);
return super.flush(header,buffer,trailer);
@ -67,7 +70,10 @@ public class BusySelectChannelServerTest extends HttpServerTestBase
{
int x=write++&0xff;
if (x<8)
{
clearWritable();
return 0;
}
if (x<32)
{
View v = new View(buffer);
@ -75,6 +81,7 @@ public class BusySelectChannelServerTest extends HttpServerTestBase
int l=super.flush(v);
if (l>0)
buffer.skip(l);
clearWritable();
return l;
}
return super.flush(buffer);

View File

@ -45,6 +45,7 @@ public abstract class ConnectorTimeoutTest extends HttpServerTestFixture
@Test
public void testMaxIdleWithRequest10() throws Exception
{
System.err.println("testMaxIdleWithRequest10");
configureServer(new HelloWorldHandler());
Socket client=newSocket(HOST,_connector.getLocalPort());
client.setSoTimeout(10000);
@ -76,6 +77,7 @@ public abstract class ConnectorTimeoutTest extends HttpServerTestFixture
@Test
public void testMaxIdleWithRequest11() throws Exception
{
System.err.println("testMaxIdleWithRequest11");
configureServer(new EchoHandler());
Socket client=newSocket(HOST,_connector.getLocalPort());
client.setSoTimeout(10000);
@ -110,6 +112,7 @@ public abstract class ConnectorTimeoutTest extends HttpServerTestFixture
@Test
public void testMaxIdleNoRequest() throws Exception
{
System.err.println("testMaxIdleNoRequest");
configureServer(new EchoHandler());
Socket client=newSocket(HOST,_connector.getLocalPort());
client.setSoTimeout(10000);
@ -138,6 +141,7 @@ public abstract class ConnectorTimeoutTest extends HttpServerTestFixture
@Test
public void testMaxIdleWithSlowRequest() throws Exception
{
System.err.println("testMaxIdleWithSlowRequest");
configureServer(new EchoHandler());
Socket client=newSocket(HOST,_connector.getLocalPort());
client.setSoTimeout(10000);
@ -178,6 +182,7 @@ public abstract class ConnectorTimeoutTest extends HttpServerTestFixture
@Test
public void testMaxIdleWithSlowResponse() throws Exception
{
System.err.println("testMaxIdleWithSlowResponse");
configureServer(new SlowResponseHandler());
Socket client=newSocket(HOST,_connector.getLocalPort());
client.setSoTimeout(10000);
@ -207,6 +212,7 @@ public abstract class ConnectorTimeoutTest extends HttpServerTestFixture
@Test
public void testMaxIdleWithWait() throws Exception
{
System.err.println("testMaxIdleWithWait");
configureServer(new WaitHandler());
Socket client=newSocket(HOST,_connector.getLocalPort());
client.setSoTimeout(10000);