mirror of https://github.com/apache/druid.git
Merge pull request #597 from metamx/fix-yielder-close
avoid closing yielder prematurely
This commit is contained in:
commit
f06a85ec26
|
@ -132,17 +132,17 @@ public class DruidDefaultSerializersModule extends SimpleModule
|
|||
public void serialize(Yielder yielder, final JsonGenerator jgen, SerializerProvider provider)
|
||||
throws IOException, JsonProcessingException
|
||||
{
|
||||
jgen.writeStartArray();
|
||||
try {
|
||||
jgen.writeStartArray();
|
||||
while (!yielder.isDone()) {
|
||||
final Object o = yielder.get();
|
||||
jgen.writeObject(o);
|
||||
yielder = yielder.next(null);
|
||||
}
|
||||
jgen.writeEndArray();
|
||||
} finally {
|
||||
yielder.close();
|
||||
}
|
||||
jgen.writeEndArray();
|
||||
}
|
||||
}
|
||||
);
|
||||
|
|
|
@ -147,28 +147,29 @@ public class QueryResource
|
|||
log.debug("Got query [%s]", query);
|
||||
}
|
||||
|
||||
Sequence results = query.run(texasRanger);
|
||||
|
||||
if (results == null) {
|
||||
Sequence res = query.run(texasRanger);
|
||||
final Sequence results;
|
||||
if (res == null) {
|
||||
results = Sequences.empty();
|
||||
} else {
|
||||
results = res;
|
||||
}
|
||||
|
||||
try (
|
||||
final Yielder yielder = results.toYielder(
|
||||
null,
|
||||
new YieldingAccumulator()
|
||||
{
|
||||
@Override
|
||||
public Object accumulate(Object accumulated, Object in)
|
||||
{
|
||||
yield();
|
||||
return in;
|
||||
}
|
||||
}
|
||||
)
|
||||
) {
|
||||
long requestTime = System.currentTimeMillis() - start;
|
||||
final Yielder yielder = results.toYielder(
|
||||
null,
|
||||
new YieldingAccumulator()
|
||||
{
|
||||
@Override
|
||||
public Object accumulate(Object accumulated, Object in)
|
||||
{
|
||||
yield();
|
||||
return in;
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
try {
|
||||
long requestTime = System.currentTimeMillis() - start;
|
||||
emitter.emit(
|
||||
new ServiceMetricEvent.Builder()
|
||||
.setUser2(DataSourceUtil.getMetricName(query.getDataSource()))
|
||||
|
@ -202,6 +203,7 @@ public class QueryResource
|
|||
@Override
|
||||
public void write(OutputStream outputStream) throws IOException, WebApplicationException
|
||||
{
|
||||
// json serializer will always close the yielder
|
||||
jsonWriter.writeValue(outputStream, yielder);
|
||||
outputStream.close();
|
||||
}
|
||||
|
@ -211,6 +213,14 @@ public class QueryResource
|
|||
.header("X-Druid-Query-Id", queryId)
|
||||
.build();
|
||||
}
|
||||
catch (Exception e) {
|
||||
// make sure to close yieder if anything happened before starting to serialize the response.
|
||||
yielder.close();
|
||||
}
|
||||
finally {
|
||||
// do not close yielder here, since we do not want to close the yielder prior to
|
||||
// StreamingOutput having iterated over all the results
|
||||
}
|
||||
}
|
||||
catch (QueryInterruptedException e) {
|
||||
try {
|
||||
|
|
Loading…
Reference in New Issue