Merge branch 'master' into offheap-incremental-index

This commit is contained in:
nishantmonu51 2014-06-13 15:39:34 +05:30
commit 49d3bd94da
2 changed files with 31 additions and 20 deletions

View File

@ -132,17 +132,17 @@ public class DruidDefaultSerializersModule extends SimpleModule
public void serialize(Yielder yielder, final JsonGenerator jgen, SerializerProvider provider)
throws IOException, JsonProcessingException
{
jgen.writeStartArray();
try {
jgen.writeStartArray();
while (!yielder.isDone()) {
final Object o = yielder.get();
jgen.writeObject(o);
yielder = yielder.next(null);
}
jgen.writeEndArray();
} finally {
yielder.close();
}
jgen.writeEndArray();
}
}
);

View File

@ -147,28 +147,29 @@ public class QueryResource
log.debug("Got query [%s]", query);
}
Sequence results = query.run(texasRanger);
if (results == null) {
Sequence res = query.run(texasRanger);
final Sequence results;
if (res == null) {
results = Sequences.empty();
} else {
results = res;
}
try (
final Yielder yielder = results.toYielder(
null,
new YieldingAccumulator()
{
@Override
public Object accumulate(Object accumulated, Object in)
{
yield();
return in;
}
}
)
) {
long requestTime = System.currentTimeMillis() - start;
final Yielder yielder = results.toYielder(
null,
new YieldingAccumulator()
{
@Override
public Object accumulate(Object accumulated, Object in)
{
yield();
return in;
}
}
);
try {
long requestTime = System.currentTimeMillis() - start;
emitter.emit(
new ServiceMetricEvent.Builder()
.setUser2(DataSourceUtil.getMetricName(query.getDataSource()))
@ -202,6 +203,7 @@ public class QueryResource
@Override
public void write(OutputStream outputStream) throws IOException, WebApplicationException
{
// json serializer will always close the yielder
jsonWriter.writeValue(outputStream, yielder);
outputStream.close();
}
@ -211,6 +213,15 @@ public class QueryResource
.header("X-Druid-Query-Id", queryId)
.build();
}
catch (Exception e) {
// make sure to close yieder if anything happened before starting to serialize the response.
yielder.close();
throw Throwables.propagate(e);
}
finally {
// do not close yielder here, since we do not want to close the yielder prior to
// StreamingOutput having iterated over all the results
}
}
catch (QueryInterruptedException e) {
try {