diff --git a/processing/src/main/java/io/druid/jackson/DefaultObjectMapper.java b/processing/src/main/java/io/druid/jackson/DefaultObjectMapper.java index 952b506bd95..e55e2299d2b 100644 --- a/processing/src/main/java/io/druid/jackson/DefaultObjectMapper.java +++ b/processing/src/main/java/io/druid/jackson/DefaultObjectMapper.java @@ -32,7 +32,12 @@ public class DefaultObjectMapper extends ObjectMapper { public DefaultObjectMapper() { - this(null); + this((JsonFactory)null); + } + + public DefaultObjectMapper(DefaultObjectMapper mapper) + { + super(mapper); } public DefaultObjectMapper(JsonFactory factory) @@ -52,4 +57,10 @@ public class DefaultObjectMapper extends ObjectMapper configure(MapperFeature.AUTO_DETECT_SETTERS, false); configure(SerializationFeature.INDENT_OUTPUT, false); } + + @Override + public ObjectMapper copy() + { + return new DefaultObjectMapper(this); + } } diff --git a/processing/src/main/java/io/druid/jackson/DruidDefaultSerializersModule.java b/processing/src/main/java/io/druid/jackson/DruidDefaultSerializersModule.java index 068bfecd963..6184221a1db 100644 --- a/processing/src/main/java/io/druid/jackson/DruidDefaultSerializersModule.java +++ b/processing/src/main/java/io/druid/jackson/DruidDefaultSerializersModule.java @@ -105,7 +105,7 @@ public class DruidDefaultSerializersModule extends SimpleModule jgen.writeStartArray(); value.accumulate( null, - new Accumulator() + new Accumulator() { @Override public Object accumulate(Object o, Object o1) @@ -116,7 +116,7 @@ public class DruidDefaultSerializersModule extends SimpleModule catch (IOException e) { throw Throwables.propagate(e); } - return o; + return null; } } ); @@ -124,6 +124,28 @@ public class DruidDefaultSerializersModule extends SimpleModule } } ); + addSerializer( + Yielder.class, + new JsonSerializer() + { + @Override + public void serialize(Yielder yielder, final JsonGenerator jgen, SerializerProvider provider) + throws IOException, JsonProcessingException + { + jgen.writeStartArray(); + try { + while (!yielder.isDone()) { + final Object o = yielder.get(); + jgen.writeObject(o); + yielder = yielder.next(null); + } + } finally { + yielder.close(); + } + jgen.writeEndArray(); + } + } + ); addSerializer(ByteOrder.class, ToStringSerializer.instance); addDeserializer( ByteOrder.class, diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index e710a6c97c9..1e6ea06607f 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -19,16 +19,23 @@ package io.druid.server; +import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; +import com.google.api.client.repackaged.com.google.common.base.Throwables; import com.google.common.base.Charsets; import com.google.common.base.Joiner; import com.google.common.collect.ImmutableMap; import com.google.common.io.ByteStreams; import com.google.common.io.Closeables; import com.google.inject.Inject; +import com.metamx.common.guava.Accumulator; +import com.metamx.common.guava.Accumulators; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; +import com.metamx.common.guava.Yielder; +import com.metamx.common.guava.YieldingAccumulator; +import com.metamx.common.guava.YieldingAccumulators; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; @@ -36,21 +43,25 @@ import io.druid.guice.annotations.Json; import io.druid.guice.annotations.Smile; import io.druid.query.DataSourceUtil; import io.druid.query.Query; +import io.druid.query.QueryInterruptedException; import io.druid.query.QuerySegmentWalker; -import io.druid.query.QueryWatcher; import io.druid.server.log.RequestLogger; import org.joda.time.DateTime; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.Consumes; import javax.ws.rs.DELETE; import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; +import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import javax.ws.rs.core.StreamingOutput; import java.io.IOException; import java.io.OutputStream; import java.nio.charset.Charset; @@ -64,6 +75,8 @@ public class QueryResource private static final EmittingLogger log = new EmittingLogger(QueryResource.class); private static final Charset UTF8 = Charset.forName("UTF-8"); private static final Joiner COMMA_JOIN = Joiner.on(","); + public static final String APPLICATION_SMILE = "application/smile"; + public static final String APPLICATION_JSON = "application/json"; private final ObjectMapper jsonMapper; private final ObjectMapper smileMapper; @@ -82,8 +95,12 @@ public class QueryResource QueryManager queryManager ) { - this.jsonMapper = jsonMapper; - this.smileMapper = smileMapper; + this.jsonMapper = jsonMapper.copy(); + this.jsonMapper.getFactory().configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false); + + this.smileMapper = smileMapper.copy(); + this.smileMapper.getFactory().configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false); + this.texasRanger = texasRanger; this.emitter = emitter; this.requestLogger = requestLogger; @@ -97,13 +114,13 @@ public class QueryResource { queryManager.cancelQuery(queryId); return Response.status(Response.Status.ACCEPTED).build(); + } @POST - @Produces("application/json") - public void doPost( + public Response doPost( @Context HttpServletRequest req, - @Context HttpServletResponse resp + @Context final HttpServletResponse resp ) throws ServletException, IOException { final long start = System.currentTimeMillis(); @@ -111,13 +128,12 @@ public class QueryResource byte[] requestQuery = null; String queryId; - final boolean isSmile = "application/smile".equals(req.getContentType()); + final boolean isSmile = APPLICATION_SMILE.equals(req.getContentType()); ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper; - ObjectWriter jsonWriter = req.getParameter("pretty") == null + final ObjectWriter jsonWriter = req.getParameter("pretty") == null ? objectMapper.writer() : objectMapper.writerWithDefaultPrettyPrinter(); - OutputStream out = null; try { requestQuery = ByteStreams.toByteArray(req.getInputStream()); @@ -132,48 +148,70 @@ public class QueryResource log.debug("Got query [%s]", query); } - Sequence results = query.run(texasRanger); + Sequence results = query.run(texasRanger); if (results == null) { results = Sequences.empty(); } - resp.setStatus(200); - resp.setContentType("application/x-javascript"); - resp.setHeader("X-Druid-Query-Id", query.getId()); - - out = resp.getOutputStream(); - jsonWriter.writeValue(out, results); - -// JsonGenerator jgen = jsonWriter.getFactory().createGenerator(out); - - long requestTime = System.currentTimeMillis() - start; - - emitter.emit( - new ServiceMetricEvent.Builder() - .setUser2(DataSourceUtil.getMetricName(query.getDataSource())) - .setUser4(query.getType()) - .setUser5(COMMA_JOIN.join(query.getIntervals())) - .setUser6(String.valueOf(query.hasFilters())) - .setUser7(req.getRemoteAddr()) - .setUser8(queryId) - .setUser9(query.getDuration().toPeriod().toStandardMinutes().toString()) - .build("request/time", requestTime) - ); - - requestLogger.log( - new RequestLogLine( - new DateTime(), - req.getRemoteAddr(), - query, - new QueryStats( - ImmutableMap.of( - "request/time", requestTime, - "success", true - ) - ) + try ( + final Yielder yielder = results.toYielder( + null, + new YieldingAccumulator() + { + @Override + public Object accumulate(Object accumulated, Object in) + { + yield(); + return in; + } + } ) - ); + ) { + long requestTime = System.currentTimeMillis() - start; + + emitter.emit( + new ServiceMetricEvent.Builder() + .setUser2(DataSourceUtil.getMetricName(query.getDataSource())) + .setUser4(query.getType()) + .setUser5(COMMA_JOIN.join(query.getIntervals())) + .setUser6(String.valueOf(query.hasFilters())) + .setUser7(req.getRemoteAddr()) + .setUser8(queryId) + .setUser9(query.getDuration().toPeriod().toStandardMinutes().toString()) + .build("request/time", requestTime) + ); + + requestLogger.log( + new RequestLogLine( + new DateTime(), + req.getRemoteAddr(), + query, + new QueryStats( + ImmutableMap.of( + "request/time", requestTime, + "success", true + ) + ) + ) + ); + + return Response + .ok( + new StreamingOutput() + { + @Override + public void write(OutputStream outputStream) throws IOException, WebApplicationException + { + jsonWriter.writeValue(outputStream, yielder); + outputStream.close(); + } + }, + isSmile ? APPLICATION_JSON : APPLICATION_SMILE + ) + .header("X-Druid-Query-Id", queryId) + .build(); + } } catch (Exception e) { final String queryString = @@ -183,20 +221,6 @@ public class QueryResource log.warn(e, "Exception occurred on request [%s]", queryString); - if (!resp.isCommitted()) { - resp.setStatus(500); - resp.resetBuffer(); - - if (out == null) { - out = resp.getOutputStream(); - } - - out.write((e.getMessage() == null) ? "Exception null".getBytes(UTF8) : e.getMessage().getBytes(UTF8)); - out.write("\n".getBytes(UTF8)); - } - - resp.flushBuffer(); - try { requestLogger.log( new RequestLogLine( @@ -216,10 +240,14 @@ public class QueryResource .addData("query", queryString) .addData("peer", req.getRemoteAddr()) .emit(); - } - finally { - resp.flushBuffer(); - Closeables.closeQuietly(out); + + return Response.serverError().entity( + jsonWriter.writeValueAsString( + ImmutableMap.of( + "error", (e.getMessage() == null) ? "null Exception" : e.getMessage() + ) + ) + ).build(); } } }