Merge pull request #538 from metamx/fix-jetty-exception-handling

fix jetty threads getting stuck
This commit is contained in:
fjy 2014-05-15 10:55:46 -06:00
commit 793bc3539b
3 changed files with 121 additions and 45 deletions

View File

@ -79,7 +79,7 @@
<dependency> <dependency>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>http-client</artifactId> <artifactId>http-client</artifactId>
<version>0.9.4</version> <version>0.9.5</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>

View File

@ -42,6 +42,7 @@ import org.joda.time.DateTime;
import javax.servlet.AsyncContext; import javax.servlet.AsyncContext;
import javax.servlet.ServletException; import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.annotation.WebServlet; import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
@ -159,6 +160,15 @@ public class AsyncQueryForwardingServlet extends HttpServlet
return ClientResponse.finished(obj); return ClientResponse.finished(obj);
} }
@Override
public void exceptionCaught(
ClientResponse<OutputStream> clientResponse,
Throwable e
)
{
handleException(resp, asyncContext, e);
}
}; };
asyncContext.start( asyncContext.start(
@ -176,23 +186,7 @@ public class AsyncQueryForwardingServlet extends HttpServlet
req.setAttribute(DISPATCHED, true); req.setAttribute(DISPATCHED, true);
} }
catch (Exception e) { catch (Exception e) {
if (!resp.isCommitted()) { handleException(resp, ctx, e);
resp.setStatus(500);
resp.resetBuffer();
if (out == null) {
out = resp.getOutputStream();
}
if (ctx != null) {
ctx.complete();
}
out.write((e.getMessage() == null) ? "Exception null".getBytes(UTF8) : e.getMessage().getBytes(UTF8));
out.write("\n".getBytes(UTF8));
}
resp.flushBuffer();
} }
} }
@ -316,6 +310,15 @@ public class AsyncQueryForwardingServlet extends HttpServlet
return ClientResponse.finished(obj); return ClientResponse.finished(obj);
} }
@Override
public void exceptionCaught(
ClientResponse<OutputStream> clientResponse,
Throwable e
)
{
handleException(resp, asyncContext, e);
}
}; };
asyncContext.start( asyncContext.start(
@ -333,23 +336,7 @@ public class AsyncQueryForwardingServlet extends HttpServlet
req.setAttribute(DISPATCHED, true); req.setAttribute(DISPATCHED, true);
} }
catch (Exception e) { catch (Exception e) {
if (!resp.isCommitted()) { handleException(resp, ctx, e);
resp.setStatus(500);
resp.resetBuffer();
if (out == null) {
out = resp.getOutputStream();
}
out.write((e.getMessage() == null) ? "Exception null".getBytes(UTF8) : e.getMessage().getBytes(UTF8));
out.write("\n".getBytes(UTF8));
}
resp.flushBuffer();
if (ctx != null) {
ctx.complete();
}
try { try {
requestLogger.log( requestLogger.log(
@ -382,4 +369,25 @@ public class AsyncQueryForwardingServlet extends HttpServlet
} }
return String.format("http://%s%s?%s", host, requestURI, queryString); return String.format("http://%s%s?%s", host, requestURI, queryString);
} }
private static void handleException(HttpServletResponse resp, AsyncContext ctx, Throwable e)
{
try {
final ServletOutputStream out = resp.getOutputStream();
if (!resp.isCommitted()) {
resp.setStatus(500);
resp.resetBuffer();
out.write((e.getMessage() == null) ? "Exception null".getBytes(UTF8) : e.getMessage().getBytes(UTF8));
out.write("\n".getBytes(UTF8));
}
if (ctx != null) {
ctx.complete();
}
resp.flushBuffer();
}
catch (IOException e1) {
Throwables.propagate(e1);
}
}
} }

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/ */
package io.druid; package io.druid.server.initialization;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
@ -28,13 +28,14 @@ import com.google.inject.Module;
import com.google.inject.servlet.GuiceFilter; import com.google.inject.servlet.GuiceFilter;
import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.http.client.HttpClient; import com.metamx.http.client.HttpClient;
import com.metamx.http.client.response.InputStreamResponseHandler;
import com.metamx.http.client.response.StatusResponseHandler; import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder; import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.guice.Jerseys; import io.druid.guice.Jerseys;
import io.druid.guice.LazySingleton; import io.druid.guice.LazySingleton;
import io.druid.guice.annotations.Global; import io.druid.guice.annotations.Global;
import io.druid.initialization.Initialization; import io.druid.initialization.Initialization;
import io.druid.server.initialization.JettyServerInitializer; import org.apache.commons.io.IOUtils;
import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.DefaultHandler; import org.eclipse.jetty.server.handler.DefaultHandler;
@ -43,13 +44,22 @@ import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder; import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlets.GzipFilter; import org.eclipse.jetty.servlets.GzipFilter;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.GET; import javax.ws.rs.GET;
import javax.ws.rs.Path; import javax.ws.rs.Path;
import javax.ws.rs.Produces; import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
import java.net.URL; import java.net.URL;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.Random; import java.util.Random;
@ -61,6 +71,9 @@ import java.util.concurrent.atomic.AtomicLong;
public class JettyTest public class JettyTest
{ {
private Lifecycle lifecycle;
private HttpClient client;
public static void setProperties() public static void setProperties()
{ {
System.setProperty("druid.host", "localhost:9999"); System.setProperty("druid.host", "localhost:9999");
@ -71,11 +84,9 @@ public class JettyTest
System.setProperty("druid.global.http.readTimeout", "PT1S"); System.setProperty("druid.global.http.readTimeout", "PT1S");
} }
@Test @Ignore // this test will deadlock if it hits an issue, so ignored by default @Before
public void testTimeouts() throws Exception public void setup() throws Exception
{ {
// test for request timeouts properly not locking up all threads
setProperties(); setProperties();
Injector injector = Initialization.makeInjectorWithModules( Injector injector = Initialization.makeInjectorWithModules(
Initialization.makeStartupInjector(), Lists.<Object>newArrayList( Initialization.makeStartupInjector(), Lists.<Object>newArrayList(
@ -86,17 +97,24 @@ public class JettyTest
{ {
binder.bind(JettyServerInitializer.class).to(JettyServerInit.class).in(LazySingleton.class); binder.bind(JettyServerInitializer.class).to(JettyServerInit.class).in(LazySingleton.class);
Jerseys.addResource(binder, SlowResource.class); Jerseys.addResource(binder, SlowResource.class);
Jerseys.addResource(binder, ExceptionResource.class);
} }
} }
) )
); );
Lifecycle lifecycle = injector.getInstance(Lifecycle.class); lifecycle = injector.getInstance(Lifecycle.class);
// Jetty is Lazy Initialized do a getInstance // Jetty is Lazy Initialized do a getInstance
injector.getInstance(Server.class); injector.getInstance(Server.class);
lifecycle.start(); lifecycle.start();
ClientHolder holder = injector.getInstance(ClientHolder.class); ClientHolder holder = injector.getInstance(ClientHolder.class);
final HttpClient client = holder.getClient(); client = holder.getClient();
}
@Test
@Ignore // this test will deadlock if it hits an issue, so ignored by default
public void testTimeouts() throws Exception
{
// test for request timeouts properly not locking up all threads
final Executor executor = Executors.newFixedThreadPool(100); final Executor executor = Executors.newFixedThreadPool(100);
final AtomicLong count = new AtomicLong(0); final AtomicLong count = new AtomicLong(0);
final CountDownLatch latch = new CountDownLatch(1000); final CountDownLatch latch = new CountDownLatch(1000);
@ -149,8 +167,35 @@ public class JettyTest
} }
latch.await(); latch.await();
lifecycle.stop(); }
// Tests that threads are not stuck when partial chunk is not finalized
// https://bugs.eclipse.org/bugs/show_bug.cgi?id=424107
@Test
public void testChunkNotFinalized() throws Exception
{
ListenableFuture<InputStream> go = client.get(
new URL(
"http://localhost:9999/exception/exception"
)
)
.go(new InputStreamResponseHandler());
try {
StringWriter writer = new StringWriter();
IOUtils.copy(go.get(), writer, "utf-8");
Assert.fail("Should have thrown Exception");
}
catch (IOException e) {
// Expected.
}
}
@After
public void teardown()
{
lifecycle.stop();
} }
public static class ClientHolder public static class ClientHolder
@ -206,4 +251,27 @@ public class JettyTest
return Response.ok("hello").build(); return Response.ok("hello").build();
} }
} }
@Path("/exception")
public static class ExceptionResource
{
@GET
@Path("/exception")
@Produces("application/json")
public Response exception(
@Context HttpServletResponse resp
) throws IOException
{
final ServletOutputStream outputStream = resp.getOutputStream();
outputStream.println("hello");
outputStream.flush();
try {
TimeUnit.MILLISECONDS.sleep(200);
}
catch (InterruptedException e) {
//
}
throw new IOException();
}
}
} }