From 8f7c82d35152f9f869506b9495e768feb05fcb31 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Thu, 12 Jun 2014 15:44:04 -0700 Subject: [PATCH 1/3] avoid closing yielder prematurely --- .../DruidDefaultSerializersModule.java | 4 +- .../java/io/druid/server/QueryResource.java | 46 +++++++++++-------- 2 files changed, 30 insertions(+), 20 deletions(-) diff --git a/processing/src/main/java/io/druid/jackson/DruidDefaultSerializersModule.java b/processing/src/main/java/io/druid/jackson/DruidDefaultSerializersModule.java index 6184221a1db..a0cc2b87f73 100644 --- a/processing/src/main/java/io/druid/jackson/DruidDefaultSerializersModule.java +++ b/processing/src/main/java/io/druid/jackson/DruidDefaultSerializersModule.java @@ -132,17 +132,17 @@ public class DruidDefaultSerializersModule extends SimpleModule public void serialize(Yielder yielder, final JsonGenerator jgen, SerializerProvider provider) throws IOException, JsonProcessingException { - jgen.writeStartArray(); try { + jgen.writeStartArray(); while (!yielder.isDone()) { final Object o = yielder.get(); jgen.writeObject(o); yielder = yielder.next(null); } + jgen.writeEndArray(); } finally { yielder.close(); } - jgen.writeEndArray(); } } ); diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index 33bdd519c83..8300b028ff7 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -147,28 +147,29 @@ public class QueryResource log.debug("Got query [%s]", query); } - Sequence results = query.run(texasRanger); - - if (results == null) { + Sequence res = query.run(texasRanger); + final Sequence results; + if (res == null) { results = Sequences.empty(); + } else { + results = res; } - try ( - final Yielder yielder = results.toYielder( - null, - new YieldingAccumulator() - { - @Override - public Object accumulate(Object accumulated, Object in) - { - yield(); - return in; - } - } - ) - ) { - long requestTime = System.currentTimeMillis() - start; + final Yielder yielder = results.toYielder( + null, + new YieldingAccumulator() + { + @Override + public Object accumulate(Object accumulated, Object in) + { + yield(); + return in; + } + } + ); + try { + long requestTime = System.currentTimeMillis() - start; emitter.emit( new ServiceMetricEvent.Builder() .setUser2(DataSourceUtil.getMetricName(query.getDataSource())) @@ -202,6 +203,7 @@ public class QueryResource @Override public void write(OutputStream outputStream) throws IOException, WebApplicationException { + // json serializer will always close the yielder jsonWriter.writeValue(outputStream, yielder); outputStream.close(); } @@ -211,6 +213,14 @@ public class QueryResource .header("X-Druid-Query-Id", queryId) .build(); } + catch (Exception e) { + // make sure to close yieder if anything happened before starting to serialize the response. + yielder.close(); + } + finally { + // do not close yielder here, since we do not want to close the yielder prior to + // StreamingOutput having iterated over all the results + } } catch (QueryInterruptedException e) { try { From b2419b1530764ce72ea47f12f89208f9b2293488 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Thu, 12 Jun 2014 17:09:21 -0700 Subject: [PATCH 2/3] fix missing return statement --- server/src/main/java/io/druid/server/QueryResource.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index 8300b028ff7..7d7d39b402d 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -216,6 +216,7 @@ public class QueryResource catch (Exception e) { // make sure to close yieder if anything happened before starting to serialize the response. yielder.close(); + throw Throwables.propagate(e); } finally { // do not close yielder here, since we do not want to close the yielder prior to From 4c4047165f626e9dbcae42bdcd00c916a8ec8069 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Fri, 13 Jun 2014 11:28:48 -0700 Subject: [PATCH 3/3] fix query cancellation test race condition --- .../io/druid/query/ChainedExecutionQueryRunnerTest.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java b/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java index f2555dd7214..a8c464a8703 100644 --- a/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java @@ -243,7 +243,9 @@ public class ChainedExecutionQueryRunnerTest Assert.assertTrue(future.isCancelled()); Assert.assertTrue(runner1.hasStarted); Assert.assertTrue(runner2.hasStarted); - Assert.assertFalse(runner3.hasStarted); + Assert.assertTrue(runner1.interrupted); + Assert.assertTrue(runner2.interrupted); + Assert.assertTrue(!runner3.hasStarted || runner3.interrupted); Assert.assertFalse(runner1.hasCompleted); Assert.assertFalse(runner2.hasCompleted); Assert.assertFalse(runner3.hasCompleted); @@ -256,6 +258,7 @@ public class ChainedExecutionQueryRunnerTest private final CountDownLatch latch; private boolean hasStarted = false; private boolean hasCompleted = false; + private boolean interrupted = false; public DyingQueryRunner(CountDownLatch latch) { @@ -268,6 +271,7 @@ public class ChainedExecutionQueryRunnerTest hasStarted = true; latch.countDown(); if (Thread.interrupted()) { + interrupted = true; throw new QueryInterruptedException("I got killed"); } @@ -276,6 +280,7 @@ public class ChainedExecutionQueryRunnerTest Thread.sleep(500); } catch (InterruptedException e) { + interrupted = true; throw new QueryInterruptedException("I got killed"); }