avoid closing yielder prematurely

This commit is contained in:
Xavier Léauté 2014-06-12 15:44:04 -07:00
parent ada2d92c16
commit 8f7c82d351
2 changed files with 30 additions and 20 deletions

View File

@ -132,17 +132,17 @@ public class DruidDefaultSerializersModule extends SimpleModule
public void serialize(Yielder yielder, final JsonGenerator jgen, SerializerProvider provider) public void serialize(Yielder yielder, final JsonGenerator jgen, SerializerProvider provider)
throws IOException, JsonProcessingException throws IOException, JsonProcessingException
{ {
jgen.writeStartArray();
try { try {
jgen.writeStartArray();
while (!yielder.isDone()) { while (!yielder.isDone()) {
final Object o = yielder.get(); final Object o = yielder.get();
jgen.writeObject(o); jgen.writeObject(o);
yielder = yielder.next(null); yielder = yielder.next(null);
} }
jgen.writeEndArray();
} finally { } finally {
yielder.close(); yielder.close();
} }
jgen.writeEndArray();
} }
} }
); );

View File

@ -147,28 +147,29 @@ public class QueryResource
log.debug("Got query [%s]", query); log.debug("Got query [%s]", query);
} }
Sequence results = query.run(texasRanger); Sequence res = query.run(texasRanger);
final Sequence results;
if (results == null) { if (res == null) {
results = Sequences.empty(); results = Sequences.empty();
} else {
results = res;
} }
try ( final Yielder yielder = results.toYielder(
final Yielder yielder = results.toYielder( null,
null, new YieldingAccumulator()
new YieldingAccumulator() {
{ @Override
@Override public Object accumulate(Object accumulated, Object in)
public Object accumulate(Object accumulated, Object in) {
{ yield();
yield(); return in;
return in; }
} }
} );
)
) {
long requestTime = System.currentTimeMillis() - start;
try {
long requestTime = System.currentTimeMillis() - start;
emitter.emit( emitter.emit(
new ServiceMetricEvent.Builder() new ServiceMetricEvent.Builder()
.setUser2(DataSourceUtil.getMetricName(query.getDataSource())) .setUser2(DataSourceUtil.getMetricName(query.getDataSource()))
@ -202,6 +203,7 @@ public class QueryResource
@Override @Override
public void write(OutputStream outputStream) throws IOException, WebApplicationException public void write(OutputStream outputStream) throws IOException, WebApplicationException
{ {
// json serializer will always close the yielder
jsonWriter.writeValue(outputStream, yielder); jsonWriter.writeValue(outputStream, yielder);
outputStream.close(); outputStream.close();
} }
@ -211,6 +213,14 @@ public class QueryResource
.header("X-Druid-Query-Id", queryId) .header("X-Druid-Query-Id", queryId)
.build(); .build();
} }
catch (Exception e) {
// make sure to close yieder if anything happened before starting to serialize the response.
yielder.close();
}
finally {
// do not close yielder here, since we do not want to close the yielder prior to
// StreamingOutput having iterated over all the results
}
} }
catch (QueryInterruptedException e) { catch (QueryInterruptedException e) {
try { try {