Merge branch 'master' of github.com:metamx/druid into new-guava

This commit is contained in:
fjy 2014-06-13 11:30:07 -07:00
commit 4cc0353be2
3 changed files with 37 additions and 21 deletions

View File

@ -132,17 +132,17 @@ public class DruidDefaultSerializersModule extends SimpleModule
public void serialize(Yielder yielder, final JsonGenerator jgen, SerializerProvider provider) public void serialize(Yielder yielder, final JsonGenerator jgen, SerializerProvider provider)
throws IOException, JsonProcessingException throws IOException, JsonProcessingException
{ {
jgen.writeStartArray();
try { try {
jgen.writeStartArray();
while (!yielder.isDone()) { while (!yielder.isDone()) {
final Object o = yielder.get(); final Object o = yielder.get();
jgen.writeObject(o); jgen.writeObject(o);
yielder = yielder.next(null); yielder = yielder.next(null);
} }
jgen.writeEndArray();
} finally { } finally {
yielder.close(); yielder.close();
} }
jgen.writeEndArray();
} }
} }
); );

View File

@ -243,7 +243,9 @@ public class ChainedExecutionQueryRunnerTest
Assert.assertTrue(future.isCancelled()); Assert.assertTrue(future.isCancelled());
Assert.assertTrue(runner1.hasStarted); Assert.assertTrue(runner1.hasStarted);
Assert.assertTrue(runner2.hasStarted); Assert.assertTrue(runner2.hasStarted);
Assert.assertFalse(runner3.hasStarted); Assert.assertTrue(runner1.interrupted);
Assert.assertTrue(runner2.interrupted);
Assert.assertTrue(!runner3.hasStarted || runner3.interrupted);
Assert.assertFalse(runner1.hasCompleted); Assert.assertFalse(runner1.hasCompleted);
Assert.assertFalse(runner2.hasCompleted); Assert.assertFalse(runner2.hasCompleted);
Assert.assertFalse(runner3.hasCompleted); Assert.assertFalse(runner3.hasCompleted);
@ -256,6 +258,7 @@ public class ChainedExecutionQueryRunnerTest
private final CountDownLatch latch; private final CountDownLatch latch;
private boolean hasStarted = false; private boolean hasStarted = false;
private boolean hasCompleted = false; private boolean hasCompleted = false;
private boolean interrupted = false;
public DyingQueryRunner(CountDownLatch latch) public DyingQueryRunner(CountDownLatch latch)
{ {
@ -268,6 +271,7 @@ public class ChainedExecutionQueryRunnerTest
hasStarted = true; hasStarted = true;
latch.countDown(); latch.countDown();
if (Thread.interrupted()) { if (Thread.interrupted()) {
interrupted = true;
throw new QueryInterruptedException("I got killed"); throw new QueryInterruptedException("I got killed");
} }
@ -276,6 +280,7 @@ public class ChainedExecutionQueryRunnerTest
Thread.sleep(500); Thread.sleep(500);
} }
catch (InterruptedException e) { catch (InterruptedException e) {
interrupted = true;
throw new QueryInterruptedException("I got killed"); throw new QueryInterruptedException("I got killed");
} }

View File

@ -139,28 +139,29 @@ public class QueryResource
log.debug("Got query [%s]", query); log.debug("Got query [%s]", query);
} }
Sequence results = query.run(texasRanger); Sequence res = query.run(texasRanger);
final Sequence results;
if (results == null) { if (res == null) {
results = Sequences.empty(); results = Sequences.empty();
} else {
results = res;
} }
try ( final Yielder yielder = results.toYielder(
final Yielder yielder = results.toYielder( null,
null, new YieldingAccumulator()
new YieldingAccumulator() {
{ @Override
@Override public Object accumulate(Object accumulated, Object in)
public Object accumulate(Object accumulated, Object in) {
{ yield();
yield(); return in;
return in; }
} }
} );
)
) {
long requestTime = System.currentTimeMillis() - start;
try {
long requestTime = System.currentTimeMillis() - start;
emitter.emit( emitter.emit(
new ServiceMetricEvent.Builder() new ServiceMetricEvent.Builder()
.setUser2(DataSourceUtil.getMetricName(query.getDataSource())) .setUser2(DataSourceUtil.getMetricName(query.getDataSource()))
@ -194,6 +195,7 @@ public class QueryResource
@Override @Override
public void write(OutputStream outputStream) throws IOException, WebApplicationException public void write(OutputStream outputStream) throws IOException, WebApplicationException
{ {
// json serializer will always close the yielder
jsonWriter.writeValue(outputStream, yielder); jsonWriter.writeValue(outputStream, yielder);
outputStream.close(); outputStream.close();
} }
@ -203,6 +205,15 @@ public class QueryResource
.header("X-Druid-Query-Id", queryId) .header("X-Druid-Query-Id", queryId)
.build(); .build();
} }
catch (Exception e) {
// make sure to close yieder if anything happened before starting to serialize the response.
yielder.close();
throw Throwables.propagate(e);
}
finally {
// do not close yielder here, since we do not want to close the yielder prior to
// StreamingOutput having iterated over all the results
}
} }
catch (QueryInterruptedException e) { catch (QueryInterruptedException e) {
try { try {