jetty-9 improved SSL shutdown input

This commit is contained in:
Greg Wilkins 2012-08-24 17:56:22 +10:00
parent fac03a306b
commit c707ba11d1
6 changed files with 38 additions and 9 deletions

View File

@ -218,7 +218,6 @@ public class SslConnection extends AbstractConnection
private boolean _flushRequiresFillToProgress;
private boolean _cannotAcceptMoreAppDataToFlush;
private boolean _underFlown;
private boolean _ishut = false;
// TODO: use ExecutorCallback ?
// private final Callback<Void> _writeCallback = new ExecutorCallback<Void>(getExecutor())
@ -777,7 +776,7 @@ public class SslConnection extends AbstractConnection
@Override
public boolean isInputShutdown()
{
return _ishut;
return _sslEngine.isInboundDone();
}
@Override

View File

@ -503,6 +503,10 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
// Do we have content ready to parse?
if (BufferUtil.isEmpty(_requestBuffer))
{
// If no more input
if (getEndPoint().isInputShutdown())
return;
// Wait until we can read
FutureCallback<Void> block=new FutureCallback<>();
getEndPoint().fillInterested(null,block);
@ -524,9 +528,9 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
}
}
catch (InterruptedException x)
catch (final InterruptedException x)
{
throw new InterruptedIOException();
throw new InterruptedIOException(getEndPoint().toString()){{initCause(x);}};
}
catch (ExecutionException e)
{

View File

@ -226,7 +226,7 @@ public class HttpInput extends ServletInputStream
{
synchronized (lock())
{
while(!_inputEOF)
while(!_inputEOF&&!_earlyEOF)
{
ByteBuffer content=_inputQ.peekUnsafe();
while(content!=null)
@ -242,10 +242,12 @@ public class HttpInput extends ServletInputStream
try
{
System.err.println("consume block");
blockForContent();
}
catch(IOException e)
{
e.printStackTrace();
throw new RuntimeIOException(e);
}
}

View File

@ -108,6 +108,7 @@ public class SslBytesServerTest extends SslBytesTest
server.addConnector(connector);
server.setHandler(new AbstractHandler()
{
@Override
public void handle(String target, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws IOException, ServletException
{
try
@ -166,6 +167,7 @@ public class SslBytesServerTest extends SslBytesTest
Future<Object> handshake = threadPool.submit(new Callable<Object>()
{
@Override
public Object call() throws Exception
{
client.startHandshake();
@ -226,6 +228,7 @@ public class SslBytesServerTest extends SslBytesTest
Future<Object> handshake = threadPool.submit(new Callable<Object>()
{
@Override
public Object call() throws Exception
{
client.startHandshake();
@ -327,6 +330,7 @@ public class SslBytesServerTest extends SslBytesTest
threadPool.submit(new Callable<Object>()
{
@Override
public Object call() throws Exception
{
client.startHandshake();
@ -359,6 +363,7 @@ public class SslBytesServerTest extends SslBytesTest
threadPool.submit(new Callable<Object>()
{
@Override
public Object call() throws Exception
{
client.startHandshake();
@ -413,6 +418,7 @@ public class SslBytesServerTest extends SslBytesTest
threadPool.submit(new Callable<Object>()
{
@Override
public Object call() throws Exception
{
OutputStream clientOutput = client.getOutputStream();
@ -454,6 +460,7 @@ public class SslBytesServerTest extends SslBytesTest
Future<Object> request = threadPool.submit(new Callable<Object>()
{
@Override
public Object call() throws Exception
{
OutputStream clientOutput = client.getOutputStream();
@ -502,6 +509,7 @@ public class SslBytesServerTest extends SslBytesTest
Future<Object> handshake = threadPool.submit(new Callable<Object>()
{
@Override
public Object call() throws Exception
{
client.startHandshake();
@ -545,6 +553,7 @@ public class SslBytesServerTest extends SslBytesTest
Future<Object> request = threadPool.submit(new Callable<Object>()
{
@Override
public Object call() throws Exception
{
OutputStream clientOutput = client.getOutputStream();
@ -620,6 +629,7 @@ public class SslBytesServerTest extends SslBytesTest
Future<Object> request = threadPool.submit(new Callable<Object>()
{
@Override
public Object call() throws Exception
{
OutputStream clientOutput = client.getOutputStream();
@ -694,6 +704,7 @@ public class SslBytesServerTest extends SslBytesTest
Future<Object> request = threadPool.submit(new Callable<Object>()
{
@Override
public Object call() throws Exception
{
OutputStream clientOutput = client.getOutputStream();
@ -763,6 +774,7 @@ public class SslBytesServerTest extends SslBytesTest
Future<Object> request = threadPool.submit(new Callable<Object>()
{
@Override
public Object call() throws Exception
{
OutputStream clientOutput = client.getOutputStream();
@ -817,6 +829,7 @@ public class SslBytesServerTest extends SslBytesTest
final String content = new String(data, "UTF-8");
Future<Object> request = threadPool.submit(new Callable<Object>()
{
@Override
public Object call() throws Exception
{
OutputStream clientOutput = client.getOutputStream();
@ -874,6 +887,7 @@ public class SslBytesServerTest extends SslBytesTest
final String content = new String(data, "UTF-8");
Future<Object> request = threadPool.submit(new Callable<Object>()
{
@Override
public Object call() throws Exception
{
OutputStream clientOutput = client.getOutputStream();
@ -909,9 +923,9 @@ public class SslBytesServerTest extends SslBytesTest
Assert.assertThat(sslFlushes.get(), Matchers.lessThan(20));
Assert.assertThat(httpParses.get(), Matchers.lessThan(50));
// Thread.sleep(100000);
System.err.println("--");
client.close();
System.err.println("==");
}
@Test
@ -939,6 +953,7 @@ public class SslBytesServerTest extends SslBytesTest
Future<Object> request = threadPool.submit(new Callable<Object>()
{
@Override
public Object call() throws Exception
{
OutputStream clientOutput = client.getOutputStream();
@ -1018,6 +1033,7 @@ public class SslBytesServerTest extends SslBytesTest
Future<Object> request = threadPool.submit(new Callable<Object>()
{
@Override
public Object call() throws Exception
{
OutputStream clientOutput = client.getOutputStream();
@ -1083,6 +1099,7 @@ public class SslBytesServerTest extends SslBytesTest
Future<Object> request = threadPool.submit(new Callable<Object>()
{
@Override
public Object call() throws Exception
{
OutputStream clientOutput = client.getOutputStream();
@ -1178,6 +1195,7 @@ public class SslBytesServerTest extends SslBytesTest
// Renegotiate
Future<Object> renegotiation = threadPool.submit(new Callable<Object>()
{
@Override
public Object call() throws Exception
{
client.startHandshake();
@ -1232,6 +1250,7 @@ public class SslBytesServerTest extends SslBytesTest
// Write the rest of the request
Future<Object> request = threadPool.submit(new Callable<Object>()
{
@Override
public Object call() throws Exception
{
clientOutput.write(content2.getBytes("UTF-8"));
@ -1310,6 +1329,7 @@ public class SslBytesServerTest extends SslBytesTest
// Renegotiate
Future<Object> renegotiation = threadPool.submit(new Callable<Object>()
{
@Override
public Object call() throws Exception
{
client.startHandshake();
@ -1382,6 +1402,7 @@ public class SslBytesServerTest extends SslBytesTest
// Write the rest of the request
Future<Object> request = threadPool.submit(new Callable<Object>()
{
@Override
public Object call() throws Exception
{
clientOutput.write(content2.getBytes("UTF-8"));
@ -1496,6 +1517,7 @@ public class SslBytesServerTest extends SslBytesTest
threadPool.submit(new Callable<Object>()
{
@Override
public Object call() throws Exception
{
client.startHandshake();

View File

@ -124,6 +124,8 @@ public abstract class SslBytesTest
public void stop() throws Exception
{
server.close();
client.close();
serverSocket.close();
}

View File

@ -159,7 +159,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
{
LOG.warn("{} threads could not be stopped", size);
if (size<=Runtime.getRuntime().availableProcessors() || LOG.isDebugEnabled())
if ((size<=Runtime.getRuntime().availableProcessors()) || LOG.isDebugEnabled())
{
for (Thread unstopped : _threads)
{
@ -168,7 +168,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
{
dmp.append(StringUtil.__LINE_SEPARATOR).append("\tat ").append(element);
}
LOG.debug("Couldn't stop {}{}", unstopped, dmp.toString());
LOG.warn("Couldn't stop {}{}", unstopped, dmp.toString());
}
}
}