diff --git a/processing/src/main/java/io/druid/jackson/DruidDefaultSerializersModule.java b/processing/src/main/java/io/druid/jackson/DruidDefaultSerializersModule.java index 6184221a1db..a0cc2b87f73 100644 --- a/processing/src/main/java/io/druid/jackson/DruidDefaultSerializersModule.java +++ b/processing/src/main/java/io/druid/jackson/DruidDefaultSerializersModule.java @@ -132,17 +132,17 @@ public class DruidDefaultSerializersModule extends SimpleModule public void serialize(Yielder yielder, final JsonGenerator jgen, SerializerProvider provider) throws IOException, JsonProcessingException { - jgen.writeStartArray(); try { + jgen.writeStartArray(); while (!yielder.isDone()) { final Object o = yielder.get(); jgen.writeObject(o); yielder = yielder.next(null); } + jgen.writeEndArray(); } finally { yielder.close(); } - jgen.writeEndArray(); } } ); diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index 33bdd519c83..8300b028ff7 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -147,28 +147,29 @@ public class QueryResource log.debug("Got query [%s]", query); } - Sequence results = query.run(texasRanger); - - if (results == null) { + Sequence res = query.run(texasRanger); + final Sequence results; + if (res == null) { results = Sequences.empty(); + } else { + results = res; } - try ( - final Yielder yielder = results.toYielder( - null, - new YieldingAccumulator() - { - @Override - public Object accumulate(Object accumulated, Object in) - { - yield(); - return in; - } - } - ) - ) { - long requestTime = System.currentTimeMillis() - start; + final Yielder yielder = results.toYielder( + null, + new YieldingAccumulator() + { + @Override + public Object accumulate(Object accumulated, Object in) + { + yield(); + return in; + } + } + ); + try { + long requestTime = System.currentTimeMillis() - start; emitter.emit( new ServiceMetricEvent.Builder() .setUser2(DataSourceUtil.getMetricName(query.getDataSource())) @@ -202,6 +203,7 @@ public class QueryResource @Override public void write(OutputStream outputStream) throws IOException, WebApplicationException { + // json serializer will always close the yielder jsonWriter.writeValue(outputStream, yielder); outputStream.close(); } @@ -211,6 +213,14 @@ public class QueryResource .header("X-Druid-Query-Id", queryId) .build(); } + catch (Exception e) { + // make sure to close yieder if anything happened before starting to serialize the response. + yielder.close(); + } + finally { + // do not close yielder here, since we do not want to close the yielder prior to + // StreamingOutput having iterated over all the results + } } catch (QueryInterruptedException e) { try {