From 551a89bd678e506103e309a4b7619cbcebe3993b Mon Sep 17 00:00:00 2001 From: kaijianding Date: Wed, 7 Jun 2017 01:08:51 +0800 Subject: [PATCH] serialize DateTime As Long to improve json serde performance (#4038) --- docs/content/querying/query-context.md | 2 + .../java/io/druid/query/QueryContexts.java | 10 +++++ .../io/druid/query/select/EventHolder.java | 2 +- .../timeboundary/TimeBoundaryResultValue.java | 2 +- .../java/io/druid/server/QueryResource.java | 43 ++++++++++++++++--- 5 files changed, 50 insertions(+), 9 deletions(-) diff --git a/docs/content/querying/query-context.md b/docs/content/querying/query-context.md index 440c6d8c87b..aa05cfce8a1 100644 --- a/docs/content/querying/query-context.md +++ b/docs/content/querying/query-context.md @@ -18,6 +18,8 @@ The query context is used for various query configuration parameters. The follow |bySegment | `false` | Return "by segment" results. Primarily used for debugging, setting it to `true` returns results associated with the data segment they came from | |finalize | `true` | Flag indicating whether to "finalize" aggregation results. Primarily used for debugging. For instance, the `hyperUnique` aggregator will return the full HyperLogLog sketch instead of the estimated cardinality when this flag is set to `false` | |chunkPeriod | `P0D` (off) | At the broker node level, long interval queries (of any type) may be broken into shorter interval queries to parallelize merging more than normal. Broken up queries will use a larger share of cluster resources, but may be able to complete faster as a result. Use ISO 8601 periods. For example, if this property is set to `P1M` (one month), then a query covering a year would be broken into 12 smaller queries. The broker uses its query processing executor service to initiate processing for query chunks, so make sure "druid.processing.numThreads" is configured appropriately on the broker. 
[groupBy queries](groupbyquery.html) do not support chunkPeriod by default, although they do if using the legacy "v1" engine. | +|serializeDateTimeAsLong| `false` | If true, DateTime values are serialized as longs both in the results returned by the broker and in the data transferred between the broker and compute nodes. | +|serializeDateTimeAsLongInner| `false` | If true, DateTime values are serialized as longs in the data transferred between the broker and compute nodes. | In addition, some query types offer context parameters specific to that query type. diff --git a/processing/src/main/java/io/druid/query/QueryContexts.java b/processing/src/main/java/io/druid/query/QueryContexts.java index 473c3c50318..2e9dfdd9b68 100644 --- a/processing/src/main/java/io/druid/query/QueryContexts.java +++ b/processing/src/main/java/io/druid/query/QueryContexts.java @@ -75,6 +75,16 @@ public class QueryContexts return parseBoolean(query, "finalize", defaultValue); } + public static boolean isSerializeDateTimeAsLong(Query query, boolean defaultValue) + { + return parseBoolean(query, "serializeDateTimeAsLong", defaultValue); + } + + public static boolean isSerializeDateTimeAsLongInner(Query query, boolean defaultValue) + { + return parseBoolean(query, "serializeDateTimeAsLongInner", defaultValue); + } + public static int getUncoveredIntervalsLimit(Query query) { return getUncoveredIntervalsLimit(query, DEFAULT_UNCOVERED_INTERVALS_LIMIT); diff --git a/processing/src/main/java/io/druid/query/select/EventHolder.java b/processing/src/main/java/io/druid/query/select/EventHolder.java index 7a7c619f9e3..9120b2075df 100644 --- a/processing/src/main/java/io/druid/query/select/EventHolder.java +++ b/processing/src/main/java/io/druid/query/select/EventHolder.java @@ -52,7 +52,7 @@ public class EventHolder public DateTime getTimestamp() { Object retVal = event.get(timestampKey); - if (retVal instanceof String) { + if (retVal instanceof String || retVal instanceof Long) { return new DateTime(retVal); } else if (retVal instanceof 
DateTime) { return (DateTime) retVal; diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryResultValue.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryResultValue.java index d7784115379..18982199787 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryResultValue.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryResultValue.java @@ -105,7 +105,7 @@ public class TimeBoundaryResultValue if (val instanceof DateTime) { return (DateTime) val; - } else if (val instanceof String) { + } else if (val instanceof String || val instanceof Long) { return new DateTime(val); } else { throw new IAE("Cannot get time from type[%s]", val.getClass()); diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index 2ff2766b6c3..b1f1013f47e 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -21,6 +21,8 @@ package io.druid.server; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.datatype.joda.ser.DateTimeSerializer; import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; @@ -40,6 +42,7 @@ import io.druid.java.util.common.guava.Yielders; import io.druid.query.DruidMetrics; import io.druid.query.GenericQueryMetricsFactory; import io.druid.query.Query; +import io.druid.query.QueryContexts; import io.druid.query.QueryInterruptedException; import io.druid.query.QueryMetrics; import io.druid.query.QueryPlus; @@ -97,6 +100,8 @@ public class QueryResource implements QueryCountStatsProvider protected final ServerConfig config; protected final ObjectMapper jsonMapper; protected final ObjectMapper smileMapper; + protected 
final ObjectMapper serializeDateTimeAsLongJsonMapper; + protected final ObjectMapper serializeDateTimeAsLongSmileMapper; protected final QuerySegmentWalker texasRanger; protected final ServiceEmitter emitter; protected final RequestLogger requestLogger; @@ -125,6 +130,8 @@ public class QueryResource implements QueryCountStatsProvider this.config = config; this.jsonMapper = jsonMapper; this.smileMapper = smileMapper; + this.serializeDateTimeAsLongJsonMapper = serializeDataTimeAsLong(jsonMapper); + this.serializeDateTimeAsLongSmileMapper = serializeDataTimeAsLong(smileMapper); this.texasRanger = texasRanger; this.emitter = emitter; this.requestLogger = requestLogger; @@ -250,7 +257,11 @@ public class QueryResource implements QueryCountStatsProvider try { final Query theQuery = query; final QueryToolChest theToolChest = toolChest; - final ObjectWriter jsonWriter = context.newOutputWriter(); + boolean shouldFinalize = QueryContexts.isFinalize(query, true); + boolean serializeDateTimeAsLong = + QueryContexts.isSerializeDateTimeAsLong(query, false) + || (!shouldFinalize && QueryContexts.isSerializeDateTimeAsLongInner(query, false)); + final ObjectWriter jsonWriter = context.newOutputWriter(serializeDateTimeAsLong); Response.ResponseBuilder builder = Response .ok( new StreamingOutput() @@ -451,24 +462,41 @@ public class QueryResource implements QueryCountStatsProvider } } + protected ObjectMapper serializeDataTimeAsLong(ObjectMapper mapper) + { + return mapper.copy().registerModule(new SimpleModule().addSerializer(DateTime.class, new DateTimeSerializer())); + } + protected ResponseContext createContext(String requestType, boolean pretty) { boolean isSmile = SmileMediaTypes.APPLICATION_JACKSON_SMILE.equals(requestType) || APPLICATION_SMILE.equals(requestType); String contentType = isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON; - return new ResponseContext(contentType, isSmile ? 
smileMapper : jsonMapper, pretty); + return new ResponseContext( + contentType, + isSmile ? smileMapper : jsonMapper, + isSmile ? serializeDateTimeAsLongSmileMapper : serializeDateTimeAsLongJsonMapper, + pretty + ); } protected static class ResponseContext { private final String contentType; private final ObjectMapper inputMapper; + private final ObjectMapper serializeDateTimeAsLongInputMapper; private final boolean isPretty; - ResponseContext(String contentType, ObjectMapper inputMapper, boolean isPretty) + ResponseContext( + String contentType, + ObjectMapper inputMapper, + ObjectMapper serializeDateTimeAsLongInputMapper, + boolean isPretty + ) { this.contentType = contentType; this.inputMapper = inputMapper; + this.serializeDateTimeAsLongInputMapper = serializeDateTimeAsLongInputMapper; this.isPretty = isPretty; } @@ -482,21 +510,22 @@ public class QueryResource implements QueryCountStatsProvider return inputMapper; } - ObjectWriter newOutputWriter() + ObjectWriter newOutputWriter(boolean serializeDateTimeAsLong) { - return isPretty ? inputMapper.writerWithDefaultPrettyPrinter() : inputMapper.writer(); + ObjectMapper mapper = serializeDateTimeAsLong ? serializeDateTimeAsLongInputMapper : inputMapper; + return isPretty ? mapper.writerWithDefaultPrettyPrinter() : mapper.writer(); } Response ok(Object object) throws IOException { - return Response.ok(newOutputWriter().writeValueAsString(object), contentType).build(); + return Response.ok(newOutputWriter(false).writeValueAsString(object), contentType).build(); } Response gotError(Exception e) throws IOException { return Response.serverError() .type(contentType) - .entity(newOutputWriter().writeValueAsBytes(QueryInterruptedException.wrapIfNeeded(e))) + .entity(newOutputWriter(false).writeValueAsBytes(QueryInterruptedException.wrapIfNeeded(e))) .build(); } }