clean ups from review from sbordet.
Removed isBufferred fixed instanceof ThreadDeath avoid race with onInputShutdown
This commit is contained in:
parent
7e37d1a428
commit
d61258ec4e
|
@ -65,7 +65,7 @@ public class AsyncHttpConnection extends AbstractHttpConnection implements Async
|
|||
// While we are making progress and have not changed connection
|
||||
while (progress && connection==this)
|
||||
{
|
||||
LOG.debug("while open={} more={} buffering={} progress={}",_endp.isOpen(),_parser.isMoreInBuffer(),_endp.isBufferingInput(),progress);
|
||||
LOG.debug("while open={} more={} progress={}",_endp.isOpen(),_parser.isMoreInBuffer(),progress);
|
||||
|
||||
progress=false;
|
||||
HttpExchange exchange=_exchange;
|
||||
|
@ -142,13 +142,14 @@ public class AsyncHttpConnection extends AbstractHttpConnection implements Async
|
|||
progress=true;
|
||||
}
|
||||
}
|
||||
catch (ThreadDeath e)
|
||||
{
|
||||
throw e;
|
||||
}
|
||||
catch (Throwable e)
|
||||
{
|
||||
LOG.debug("Failure on " + _exchange, e);
|
||||
|
||||
if (e instanceof ThreadDeath)
|
||||
throw (ThreadDeath)e;
|
||||
|
||||
failed = true;
|
||||
|
||||
synchronized (this)
|
||||
|
|
|
@ -59,11 +59,10 @@ public class BlockingHttpConnection extends AbstractHttpConnection
|
|||
{
|
||||
boolean failed = false;
|
||||
|
||||
|
||||
// While we are making progress and have not changed connection
|
||||
while (_endp.isOpen() && connection==this)
|
||||
{
|
||||
LOG.debug("open={} more={} buffering={}",_endp.isOpen(),_parser.isMoreInBuffer(),_endp.isBufferingInput());
|
||||
LOG.debug("open={} more={}",_endp.isOpen(),_parser.isMoreInBuffer());
|
||||
|
||||
HttpExchange exchange;
|
||||
synchronized (this)
|
||||
|
@ -141,15 +140,15 @@ public class BlockingHttpConnection extends AbstractHttpConnection
|
|||
{
|
||||
LOG.debug("parsed");
|
||||
}
|
||||
|
||||
}
|
||||
catch (ThreadDeath e)
|
||||
{
|
||||
throw e;
|
||||
}
|
||||
catch (Throwable e)
|
||||
{
|
||||
LOG.debug("Failure on " + _exchange, e);
|
||||
|
||||
if (e instanceof ThreadDeath)
|
||||
throw (ThreadDeath)e;
|
||||
|
||||
failed = true;
|
||||
|
||||
synchronized (this)
|
||||
|
|
|
@ -273,7 +273,7 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector
|
|||
((SelectChannelEndPoint)_endp).setConnection(sslConnection);
|
||||
|
||||
_endp=sslConnection.getSslEndPoint();
|
||||
sslConnection.setConnection(connection);
|
||||
sslConnection.getSslEndPoint().setConnection(connection);
|
||||
|
||||
LOG.debug("upgrade {} to {} for {}",this,sslConnection,connection);
|
||||
}
|
||||
|
@ -404,11 +404,6 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector
|
|||
return _endp.isBlocking();
|
||||
}
|
||||
|
||||
public boolean isBufferred()
|
||||
{
|
||||
return _endp.isBufferred();
|
||||
}
|
||||
|
||||
public boolean blockReadable(long millisecs) throws IOException
|
||||
{
|
||||
return _endp.blockReadable(millisecs);
|
||||
|
@ -429,11 +424,6 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector
|
|||
return _endp.getTransport();
|
||||
}
|
||||
|
||||
public boolean isBufferingInput()
|
||||
{
|
||||
return _endp.isBufferingInput();
|
||||
}
|
||||
|
||||
public boolean isBufferingOutput()
|
||||
{
|
||||
return _endp.isBufferingOutput();
|
||||
|
|
|
@ -299,7 +299,9 @@ public class HttpParser implements Parser
|
|||
ex=e;
|
||||
}
|
||||
|
||||
if (filled < 0 )
|
||||
if (filled>0)
|
||||
progress++;
|
||||
else if (filled < 0 )
|
||||
{
|
||||
_persistent=false;
|
||||
|
||||
|
@ -1193,7 +1195,7 @@ public class HttpParser implements Parser
|
|||
{
|
||||
if (!_endp.isBlocking())
|
||||
{
|
||||
if (_endp.isBufferingInput() && parseNext()>0)
|
||||
if (parseNext()>0)
|
||||
continue;
|
||||
|
||||
if (!_endp.blockReadable(maxIdleTime))
|
||||
|
|
|
@ -15,7 +15,7 @@ package org.eclipse.jetty.io;
|
|||
|
||||
import org.eclipse.jetty.util.thread.Timeout;
|
||||
|
||||
public interface AsyncEndPoint extends EndPoint
|
||||
public interface AsyncEndPoint extends ConnectedEndPoint
|
||||
{
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
|
@ -50,14 +50,6 @@ public interface AsyncEndPoint extends EndPoint
|
|||
*/
|
||||
public boolean hasProgressed();
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
public Connection getConnection();
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
public void setConnection(Connection connection);
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
*/
|
||||
|
|
|
@ -363,24 +363,12 @@ public class ByteArrayEndPoint implements ConnectedEndPoint
|
|||
{
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
public boolean isBufferingInput()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
public boolean isBufferingOutput()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
public boolean isBufferred()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* @return the growOutput
|
||||
|
|
|
@ -122,13 +122,9 @@ public interface EndPoint
|
|||
*/
|
||||
public int getRemotePort();
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
public boolean isBlocking();
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
public boolean isBufferred();
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
public boolean blockReadable(long millisecs) throws IOException;
|
||||
|
||||
|
@ -144,12 +140,6 @@ public interface EndPoint
|
|||
*/
|
||||
public Object getTransport();
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* @return True if the endpoint has some buffered input data
|
||||
*/
|
||||
public boolean isBufferingInput();
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* @return True if the endpoint has some buffered output data
|
||||
|
@ -163,7 +153,6 @@ public interface EndPoint
|
|||
*/
|
||||
public void flush() throws IOException;
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/** Get the max idle time in ms.
|
||||
* <p>The max idle time is the time the endpoint can be idle before
|
||||
|
|
|
@ -296,7 +296,6 @@ public class StreamEndPoint implements EndPoint
|
|||
_out=out;
|
||||
}
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
public void flush()
|
||||
throws IOException
|
||||
|
@ -305,24 +304,12 @@ public class StreamEndPoint implements EndPoint
|
|||
_out.flush();
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
public boolean isBufferingInput()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
public boolean isBufferingOutput()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
public boolean isBufferred()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
public int getMaxIdleTime()
|
||||
{
|
||||
|
|
|
@ -510,25 +510,13 @@ public class ChannelEndPoint implements EndPoint
|
|||
throws IOException
|
||||
{
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
public boolean isBufferingInput()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
public boolean isBufferingOutput()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
public boolean isBufferred()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
public int getMaxIdleTime()
|
||||
{
|
||||
|
|
|
@ -41,9 +41,9 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
|
|||
private final SelectorManager _manager;
|
||||
private SelectionKey _key;
|
||||
private final Runnable _handler = new Runnable()
|
||||
{
|
||||
public void run() { handle(); }
|
||||
};
|
||||
{
|
||||
public void run() { handle(); }
|
||||
};
|
||||
|
||||
/** The desired value for {@link SelectionKey#interestOps()} */
|
||||
private int _interestOps;
|
||||
|
@ -65,7 +65,6 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
|
|||
/** true if the last write operation succeed and wrote all offered bytes */
|
||||
private volatile boolean _writable = true;
|
||||
|
||||
|
||||
/** True if a thread has is blocked in {@link #blockReadable(long)} */
|
||||
private boolean _readBlocked;
|
||||
|
||||
|
@ -630,8 +629,6 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
|
|||
}
|
||||
finally
|
||||
{
|
||||
dispatched=!undispatch();
|
||||
|
||||
if (!_ishut && isInputShutdown() && isOpen())
|
||||
{
|
||||
_ishut=true;
|
||||
|
@ -639,10 +636,12 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
|
|||
{
|
||||
_connection.onInputShutdown();
|
||||
}
|
||||
catch(ThreadDeath e)
|
||||
{
|
||||
throw e;
|
||||
}
|
||||
catch(Throwable x)
|
||||
{
|
||||
if (x instanceof ThreadDeath)
|
||||
throw (ThreadDeath)x;
|
||||
LOG.warn("onInputShutdown failed", x);
|
||||
try{close();}
|
||||
catch(IOException e2){LOG.ignore(e2);}
|
||||
|
@ -652,6 +651,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
|
|||
updateKey();
|
||||
}
|
||||
}
|
||||
dispatched=!undispatch();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -507,11 +507,12 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
{
|
||||
LOG.ignore(e);
|
||||
}
|
||||
catch(ThreadDeath e)
|
||||
{
|
||||
throw e;
|
||||
}
|
||||
catch (Throwable e)
|
||||
{
|
||||
if (e instanceof ThreadDeath)
|
||||
throw (ThreadDeath)e;
|
||||
|
||||
if (isRunning())
|
||||
LOG.warn(e);
|
||||
else
|
||||
|
|
|
@ -64,6 +64,7 @@ public class SslConnection extends AbstractConnection implements AsyncConnection
|
|||
private AsyncEndPoint _aEndp;
|
||||
private boolean _allowRenegotiate=true;
|
||||
private boolean _handshook;
|
||||
private boolean _ishut;
|
||||
private boolean _oshut;
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
|
@ -98,12 +99,6 @@ public class SslConnection extends AbstractConnection implements AsyncConnection
|
|||
_aEndp=(AsyncEndPoint)endp;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
public synchronized void setConnection(AsyncConnection connection)
|
||||
{
|
||||
_connection=connection;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
public synchronized AsyncConnection getConnection()
|
||||
{
|
||||
|
@ -198,7 +193,7 @@ public class SslConnection extends AbstractConnection implements AsyncConnection
|
|||
{
|
||||
// handle the delegate connection
|
||||
AsyncConnection next = (AsyncConnection)_connection.handle();
|
||||
if (next!=_connection && next==null)
|
||||
if (next!=_connection && next!=null)
|
||||
{
|
||||
_connection=next;
|
||||
progress=true;
|
||||
|
@ -211,6 +206,25 @@ public class SslConnection extends AbstractConnection implements AsyncConnection
|
|||
finally
|
||||
{
|
||||
releaseBuffers();
|
||||
|
||||
if (!_ishut && _sslEndPoint.isInputShutdown() && _sslEndPoint.isOpen())
|
||||
{
|
||||
_ishut=true;
|
||||
try
|
||||
{
|
||||
_connection.onInputShutdown();
|
||||
}
|
||||
catch(ThreadDeath e)
|
||||
{
|
||||
throw e;
|
||||
}
|
||||
catch(Throwable x)
|
||||
{
|
||||
LOG.warn("onInputShutdown failed", x);
|
||||
try{_endp.close();}
|
||||
catch(IOException e2){LOG.ignore(e2);}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return this;
|
||||
|
@ -681,18 +695,6 @@ public class SslConnection extends AbstractConnection implements AsyncConnection
|
|||
return _endp;
|
||||
}
|
||||
|
||||
public boolean isBufferingInput()
|
||||
{
|
||||
synchronized (this)
|
||||
{
|
||||
if (_unwrapBuf!=null && _unwrapBuf.hasContent())
|
||||
return true;
|
||||
if (_inbound!=null && _inbound.hasContent())
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public boolean isBufferingOutput()
|
||||
{
|
||||
synchronized (this)
|
||||
|
@ -781,11 +783,6 @@ public class SslConnection extends AbstractConnection implements AsyncConnection
|
|||
return false;
|
||||
}
|
||||
|
||||
public boolean isBufferred()
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
public int getMaxIdleTime()
|
||||
{
|
||||
return _aEndp.getMaxIdleTime();
|
||||
|
|
|
@ -49,7 +49,7 @@ public class SelectChannelEndPointSslTest extends SelectChannelEndPointTest
|
|||
SslConnection connection = new SslConnection(engine,endpoint);
|
||||
|
||||
AsyncConnection delegate = super.newConnection(channel,connection.getSslEndPoint());
|
||||
connection.setConnection(delegate);
|
||||
connection.getSslEndPoint().setConnection(delegate);
|
||||
return connection;
|
||||
}
|
||||
|
||||
|
|
|
@ -472,11 +472,12 @@ public abstract class AbstractHttpConnection extends AbstractConnection
|
|||
_request.setHandled(true);
|
||||
_response.sendError(e.getStatus(), e.getReason());
|
||||
}
|
||||
catch(ThreadDeath e)
|
||||
{
|
||||
throw e;
|
||||
}
|
||||
catch (Throwable e)
|
||||
{
|
||||
if (e instanceof ThreadDeath)
|
||||
throw (ThreadDeath)e;
|
||||
|
||||
LOG.warn(String.valueOf(_uri),e);
|
||||
error=true;
|
||||
_request.setHandled(true);
|
||||
|
|
|
@ -557,7 +557,7 @@ public class AsyncContinuation implements AsyncContext, Continuation
|
|||
protected void scheduleDispatch()
|
||||
{
|
||||
EndPoint endp=_connection.getEndPoint();
|
||||
if (!endp.isBlocking())
|
||||
if (endp instanceof AsyncEndPoint)
|
||||
{
|
||||
((AsyncEndPoint)endp).asyncDispatch();
|
||||
}
|
||||
|
|
|
@ -309,12 +309,13 @@ public class HashSessionManager extends AbstractSessionManager
|
|||
}
|
||||
}
|
||||
}
|
||||
catch(ThreadDeath e)
|
||||
{
|
||||
throw e;
|
||||
}
|
||||
catch (Throwable t)
|
||||
{
|
||||
if (t instanceof ThreadDeath)
|
||||
throw ((ThreadDeath)t);
|
||||
else
|
||||
__log.warn("Problem scavenging sessions", t);
|
||||
__log.warn("Problem scavenging sessions", t);
|
||||
}
|
||||
finally
|
||||
{
|
||||
|
|
|
@ -299,21 +299,21 @@ public class JDBCSessionManager extends AbstractSessionManager
|
|||
}
|
||||
|
||||
@Override
|
||||
public void setAttribute (String name, Object value)
|
||||
public void setAttribute (String name, Object value)
|
||||
{
|
||||
super.setAttribute(name, value);
|
||||
_dirty=true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeAttribute (String name)
|
||||
public void removeAttribute (String name)
|
||||
{
|
||||
super.removeAttribute(name);
|
||||
_dirty=true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void cookieSet()
|
||||
protected void cookieSet()
|
||||
{
|
||||
_data.setCookieSet(_data.getAccessed());
|
||||
}
|
||||
|
@ -805,12 +805,13 @@ public class JDBCSessionManager extends AbstractSessionManager
|
|||
}
|
||||
}
|
||||
}
|
||||
catch(ThreadDeath e)
|
||||
{
|
||||
throw e;
|
||||
}
|
||||
catch (Throwable t)
|
||||
{
|
||||
if (t instanceof ThreadDeath)
|
||||
throw ((ThreadDeath)t);
|
||||
else
|
||||
LOG.warn("Problem expiring sessions", t);
|
||||
LOG.warn("Problem expiring sessions", t);
|
||||
}
|
||||
finally
|
||||
{
|
||||
|
|
|
@ -559,7 +559,7 @@ public class SslSelectChannelConnector extends SelectChannelConnector implements
|
|||
SslConnection connection = new SslConnection(engine,endpoint);
|
||||
|
||||
AsyncConnection delegate = super.newConnection(channel,connection.getSslEndPoint());
|
||||
connection.setConnection(delegate);
|
||||
connection.getSslEndPoint().setConnection(delegate);
|
||||
connection.setAllowRenegotiate(_sslContextFactory.isAllowRenegotiate());
|
||||
return connection;
|
||||
}
|
||||
|
|
|
@ -113,7 +113,7 @@ public class SslContextFactoryTest
|
|||
assertTrue(cf.getSslContext()!=null);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test(expected=java.security.UnrecoverableKeyException.class)
|
||||
public void testResourceTsResourceKsWrongPW() throws Exception
|
||||
{
|
||||
Resource keystoreResource = Resource.newSystemResource("keystore");
|
||||
|
@ -126,18 +126,12 @@ public class SslContextFactoryTest
|
|||
cf.setKeyManagerPassword("wrong_keypwd");
|
||||
cf.setTrustStorePassword("storepwd");
|
||||
|
||||
try
|
||||
{
|
||||
((StdErrLog)Log.getLogger(AbstractLifeCycle.class)).setHideStacks(true);
|
||||
cf.start();
|
||||
Assert.fail();
|
||||
}
|
||||
catch(java.security.UnrecoverableKeyException e)
|
||||
{
|
||||
}
|
||||
((StdErrLog)Log.getLogger(AbstractLifeCycle.class)).setHideStacks(true);
|
||||
cf.start();
|
||||
Assert.fail();
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test(expected=java.io.IOException.class)
|
||||
public void testResourceTsWrongPWResourceKs() throws Exception
|
||||
{
|
||||
Resource keystoreResource = Resource.newSystemResource("keystore");
|
||||
|
@ -150,14 +144,8 @@ public class SslContextFactoryTest
|
|||
cf.setKeyManagerPassword("keypwd");
|
||||
cf.setTrustStorePassword("wrong_storepwd");
|
||||
|
||||
try
|
||||
{
|
||||
((StdErrLog)Log.getLogger(AbstractLifeCycle.class)).setHideStacks(true);
|
||||
cf.start();
|
||||
Assert.fail();
|
||||
}
|
||||
catch(IOException e)
|
||||
{
|
||||
}
|
||||
((StdErrLog)Log.getLogger(AbstractLifeCycle.class)).setHideStacks(true);
|
||||
cf.start();
|
||||
Assert.fail();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -131,7 +131,7 @@ public class WebSocketParserD13 implements WebSocketParser
|
|||
int filled=-1;
|
||||
|
||||
// Loop until a datagram call back or can't fill anymore
|
||||
while(!progress && (!_endp.isInputShutdown()||_endp.isBufferingInput()||_buffer.length()>0))
|
||||
while(!progress && (!_endp.isInputShutdown()||_buffer.length()>0))
|
||||
{
|
||||
int available=_buffer.length();
|
||||
|
||||
|
|
|
@ -1120,7 +1120,6 @@ public class XmlConfiguration
|
|||
@SuppressWarnings("unchecked")
|
||||
public static void main(final String[] args) throws Exception
|
||||
{
|
||||
|
||||
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
|
||||
|
||||
AccessController.doPrivileged(new PrivilegedAction()
|
||||
|
|
Loading…
Reference in New Issue