478923 - threads stuck at SharedBlockingCallback$Blocker.block

Fixed the failure path so that it also checks for non blocking callbacks
and directly calls them
This commit is contained in:
Greg Wilkins 2015-10-07 15:10:51 +11:00
parent 80675ce1d2
commit b2f8192c7b
6 changed files with 479 additions and 235 deletions

View File

@ -84,29 +84,42 @@ public abstract class AbstractConnection implements Connection
protected void failedCallback(final Callback callback, final Throwable x)
{
// TODO always dispatch failure ?
try
if (callback.isNonBlocking())
{
getExecutor().execute(new Runnable()
try
{
@Override
public void run()
{
try
{
callback.failed(x);
}
catch (Exception e)
{
LOG.warn(e);
}
}
});
callback.failed(x);
}
catch (Exception e)
{
LOG.warn(e);
}
}
catch(RejectedExecutionException e)
else
{
LOG.debug(e);
callback.failed(x);
try
{
getExecutor().execute(new Runnable()
{
@Override
public void run()
{
try
{
callback.failed(x);
}
catch (Exception e)
{
LOG.warn(e);
}
}
});
}
catch(RejectedExecutionException e)
{
LOG.debug(e);
callback.failed(x);
}
}
}

View File

@ -216,8 +216,8 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
isOpen()?"Open":"CLOSED",
isInputShutdown()?"ISHUT":"in",
isOutputShutdown()?"OSHUT":"out",
_fillInterest.isInterested()?"R":"-",
_writeFlusher.isInProgress()?"W":"-",
_fillInterest.toStateString(),
_writeFlusher.toStateString(),
getIdleFor(),
getIdleTimeout(),
getConnection()==null?null:getConnection().getClass().getSimpleName());

View File

@ -138,7 +138,13 @@ public abstract class FillInterest
@Override
public String toString()
{
return String.format("FillInterest@%x{%b,%s}", hashCode(), _interested.get(), _interested.get());
return String.format("FillInterest@%x{%b,%s}", hashCode(), _interested.get()!=null, _interested.get());
}
public String toStateString()
{
return _interested.get()==null?"-":"FI";
}
/**

View File

@ -522,4 +522,23 @@ abstract public class WriteFlusher
{
return String.format("WriteFlusher@%x{%s}", hashCode(), _state.get());
}
public String toStateString()
{
switch(_state.get().getType())
{
case WRITING:
return "W";
case PENDING:
return "P";
case COMPLETING:
return "C";
case IDLE:
return "-";
case FAILED:
return "F";
default:
return "?";
}
}
}

View File

@ -1,213 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.servlets;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SelectChannelEndPoint;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
public class DefaultServletStarvationTest
{
@Rule
public TestTracker tracker = new TestTracker();
private Server _server;
@After
public void dispose() throws Exception
{
if (_server != null)
_server.stop();
}
@Test
public void testDefaultServletStarvation() throws Exception
{
int maxThreads = 2;
QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads, maxThreads);
threadPool.setDetailedDump(true);
_server = new Server(threadPool);
// Prepare a big file to download.
File directory = MavenTestingUtils.getTargetTestingDir();
Files.createDirectories(directory.toPath());
String resourceName = "resource.bin";
Path resourcePath = Paths.get(directory.getPath(), resourceName);
try (OutputStream output = Files.newOutputStream(resourcePath, StandardOpenOption.CREATE, StandardOpenOption.WRITE))
{
byte[] chunk = new byte[1024];
Arrays.fill(chunk,(byte)'X');
chunk[chunk.length-2]='\r';
chunk[chunk.length-1]='\n';
for (int i = 0; i < 256 * 1024; ++i)
output.write(chunk);
}
final CountDownLatch writePending = new CountDownLatch(1);
ServerConnector connector = new ServerConnector(_server, 0, 1)
{
@Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
{
return new SelectChannelEndPoint(channel, selectSet, key, getScheduler(), getIdleTimeout())
{
@Override
protected void onIncompleteFlush()
{
super.onIncompleteFlush();
writePending.countDown();
}
};
}
};
_server.addConnector(connector);
ServletContextHandler context = new ServletContextHandler(_server, "/");
context.setResourceBase(directory.toURI().toString());
context.addServlet(DefaultServlet.class, "/*").setAsyncSupported(false);
_server.setHandler(context);
_server.start();
List<Socket> sockets = new ArrayList<>();
for (int i = 0; i < maxThreads; ++i)
{
Socket socket = new Socket("localhost", connector.getLocalPort());
sockets.add(socket);
OutputStream output = socket.getOutputStream();
String request = "" +
"GET /" + resourceName + " HTTP/1.1\r\n" +
"Host: localhost\r\n" +
// "Connection: close\r\n" +
"\r\n";
output.write(request.getBytes(StandardCharsets.UTF_8));
output.flush();
Thread.sleep(100);
}
// Wait for a the servlet to block.
Assert.assertTrue(writePending.await(5, TimeUnit.SECONDS));
Thread.sleep(1000);
_server.dumpStdErr();
Thread.sleep(1000);
ScheduledFuture<?> dumper = Executors.newSingleThreadScheduledExecutor().schedule(new Runnable()
{
@Override
public void run()
{
_server.dumpStdErr();
}
}, 10, TimeUnit.SECONDS);
long expected = Files.size(resourcePath);
byte[] buffer = new byte[48 * 1024];
for (Socket socket : sockets)
{
String socketString = socket.toString();
long total = 0;
InputStream input = socket.getInputStream();
// look for CRLFCRLF
StringBuilder header = new StringBuilder();
int state=0;
while (state<4 && header.length()<2048)
{
int ch=input.read();
if (ch<0)
break;
header.append((char)ch);
switch(state)
{
case 0:
if (ch=='\r')
state=1;
break;
case 1:
if (ch=='\n')
state=2;
else
state=0;
break;
case 2:
if (ch=='\r')
state=3;
else
state=0;
break;
case 3:
if (ch=='\n')
state=4;
else
state=0;
break;
}
}
while (total<expected)
{
int read=input.read(buffer);
if (read<0)
break;
total+=read;
}
Assert.assertEquals(expected,total);
}
dumper.cancel(false);
// We could read everything, good.
for (Socket socket : sockets)
socket.close();
}
}

View File

@ -0,0 +1,419 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.servlets;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Exchanger;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SelectChannelEndPoint;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.StdErrLog;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
public class ThreadStarvationTest
{
@Rule
public TestTracker tracker = new TestTracker();
private Server _server;
@After
public void dispose() throws Exception
{
if (_server != null)
_server.stop();
}
@Test
@Slow
public void testDefaultServletSuccess() throws Exception
{
int maxThreads = 10;
QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads, maxThreads);
threadPool.setDetailedDump(true);
_server = new Server(threadPool);
// Prepare a big file to download.
File directory = MavenTestingUtils.getTargetTestingDir();
Files.createDirectories(directory.toPath());
String resourceName = "resource.bin";
Path resourcePath = Paths.get(directory.getPath(), resourceName);
try (OutputStream output = Files.newOutputStream(resourcePath, StandardOpenOption.CREATE, StandardOpenOption.WRITE))
{
byte[] chunk = new byte[1024];
Arrays.fill(chunk,(byte)'X');
chunk[chunk.length-2]='\r';
chunk[chunk.length-1]='\n';
for (int i = 0; i < 256 * 1024; ++i)
output.write(chunk);
}
final CountDownLatch writePending = new CountDownLatch(1);
ServerConnector connector = new ServerConnector(_server, 0, 1)
{
@Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
{
return new SelectChannelEndPoint(channel, selectSet, key, getScheduler(), getIdleTimeout())
{
@Override
protected void onIncompleteFlush()
{
super.onIncompleteFlush();
writePending.countDown();
}
};
}
};
connector.setIdleTimeout(Long.MAX_VALUE);
_server.addConnector(connector);
ServletContextHandler context = new ServletContextHandler(_server, "/");
context.setResourceBase(directory.toURI().toString());
context.addServlet(DefaultServlet.class, "/*").setAsyncSupported(false);
_server.setHandler(context);
_server.start();
List<Socket> sockets = new ArrayList<>();
for (int i = 0; i < maxThreads*2; ++i)
{
Socket socket = new Socket("localhost", connector.getLocalPort());
sockets.add(socket);
OutputStream output = socket.getOutputStream();
String request = "" +
"GET /" + resourceName + " HTTP/1.1\r\n" +
"Host: localhost\r\n" +
"\r\n";
output.write(request.getBytes(StandardCharsets.UTF_8));
output.flush();
Thread.sleep(100);
}
// Wait for a the servlet to block.
Assert.assertTrue(writePending.await(5, TimeUnit.SECONDS));
long expected = Files.size(resourcePath);
byte[] buffer = new byte[48 * 1024];
List<Exchanger<Long>> totals = new ArrayList<>();
for (Socket socket : sockets)
{
final Exchanger<Long> x = new Exchanger<>();
totals.add(x);
final InputStream input = socket.getInputStream();
new Thread()
{
@Override
public void run()
{
long total=0;
try
{
// look for CRLFCRLF
StringBuilder header = new StringBuilder();
int state=0;
while (state<4 && header.length()<2048)
{
int ch=input.read();
if (ch<0)
break;
header.append((char)ch);
switch(state)
{
case 0:
if (ch=='\r')
state=1;
break;
case 1:
if (ch=='\n')
state=2;
else
state=0;
break;
case 2:
if (ch=='\r')
state=3;
else
state=0;
break;
case 3:
if (ch=='\n')
state=4;
else
state=0;
break;
}
}
while (total<expected)
{
int read=input.read(buffer);
if (read<0)
break;
total+=read;
}
}
catch (IOException e)
{
e.printStackTrace();
}
finally
{
try
{
x.exchange(total);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
}.start();
}
for (Exchanger<Long> x : totals)
{
Long total = x.exchange(-1L,10000,TimeUnit.SECONDS);
Assert.assertEquals(expected,total.longValue());
}
// We could read everything, good.
for (Socket socket : sockets)
socket.close();
}
@Test
public void testFailureStarvation() throws Exception
{
try
{
((StdErrLog)Log.getLogger(HttpChannel.class)).setHideStacks(true);
int acceptors = 0;
int selectors = 1;
int maxThreads = 10;
final int barried=maxThreads-acceptors-selectors;
final CyclicBarrier barrier = new CyclicBarrier(barried);
QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads, maxThreads);
threadPool.setDetailedDump(true);
_server = new Server(threadPool);
ServerConnector connector = new ServerConnector(_server, acceptors, selectors)
{
@Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
{
return new SelectChannelEndPoint(channel, selectSet, key, getScheduler(), getIdleTimeout())
{
@Override
public boolean flush(ByteBuffer... buffers) throws IOException
{
super.flush(buffers[0]);
throw new IOException("TEST FAILURE");
}
};
}
};
connector.setIdleTimeout(Long.MAX_VALUE);
_server.addConnector(connector);
final AtomicInteger count = new AtomicInteger(0);
_server.setHandler(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
int c=count.getAndIncrement();
try
{
if (c<barried)
{
barrier.await(10,TimeUnit.SECONDS);
}
}
catch (InterruptedException | BrokenBarrierException | TimeoutException e)
{
throw new ServletException(e);
}
baseRequest.setHandled(true);
response.setStatus(200);
response.setContentLength(13);
response.getWriter().print("Hello World!\n");
response.getWriter().flush();
}
});
_server.start();
List<Socket> sockets = new ArrayList<>();
for (int i = 0; i < maxThreads*2; ++i)
{
Socket socket = new Socket("localhost", connector.getLocalPort());
sockets.add(socket);
OutputStream output = socket.getOutputStream();
String request = "" +
"GET / HTTP/1.1\r\n" +
"Host: localhost\r\n" +
// "Connection: close\r\n" +
"\r\n";
output.write(request.getBytes(StandardCharsets.UTF_8));
output.flush();
}
byte[] buffer = new byte[48 * 1024];
List<Exchanger<Integer>> totals = new ArrayList<>();
for (Socket socket : sockets)
{
final Exchanger<Integer> x = new Exchanger<>();
totals.add(x);
final InputStream input = socket.getInputStream();
new Thread()
{
@Override
public void run()
{
int read=0;
try
{
// look for CRLFCRLF
StringBuilder header = new StringBuilder();
int state=0;
while (state<4 && header.length()<2048)
{
int ch=input.read();
if (ch<0)
break;
header.append((char)ch);
switch(state)
{
case 0:
if (ch=='\r')
state=1;
break;
case 1:
if (ch=='\n')
state=2;
else
state=0;
break;
case 2:
if (ch=='\r')
state=3;
else
state=0;
break;
case 3:
if (ch=='\n')
state=4;
else
state=0;
break;
}
}
read=input.read(buffer);
}
catch (IOException e)
{
// e.printStackTrace();
}
finally
{
try
{
x.exchange(read);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
}.start();
}
for (Exchanger<Integer> x : totals)
{
Integer read = x.exchange(-1,10,TimeUnit.SECONDS);
Assert.assertEquals(-1,read.intValue());
}
// We could read everything, good.
for (Socket socket : sockets)
socket.close();
_server.stop();
}
finally
{
((StdErrLog)Log.getLogger(HttpChannel.class)).setHideStacks(false);
}
}
}