serialize DateTime As Long to improve json serde performance (#4038)

This commit is contained in:
kaijianding 2017-06-07 01:08:51 +08:00 committed by Jonathan Wei
parent d22db30db4
commit 551a89bd67
5 changed files with 50 additions and 9 deletions

View File

@ -18,6 +18,8 @@ The query context is used for various query configuration parameters. The follow
|bySegment | `false` | Return "by segment" results. Primarily used for debugging, setting it to `true` returns results associated with the data segment they came from |
|finalize | `true` | Flag indicating whether to "finalize" aggregation results. Primarily used for debugging. For instance, the `hyperUnique` aggregator will return the full HyperLogLog sketch instead of the estimated cardinality when this flag is set to `false` |
|chunkPeriod | `P0D` (off) | At the broker node level, long interval queries (of any type) may be broken into shorter interval queries to parallelize merging more than normal. Broken up queries will use a larger share of cluster resources, but may be able to complete faster as a result. Use ISO 8601 periods. For example, if this property is set to `P1M` (one month), then a query covering a year would be broken into 12 smaller queries. The broker uses its query processing executor service to initiate processing for query chunks, so make sure "druid.processing.numThreads" is configured appropriately on the broker. [groupBy queries](groupbyquery.html) do not support chunkPeriod by default, although they do if using the legacy "v1" engine. |
|serializeDateTimeAsLong| `false` | If true, DateTime is serialized as long in the result returned by broker and the data transportation between broker and compute node|
|serializeDateTimeAsLongInner| `false` | If true, DateTime is serialized as long in the data transportation between broker and compute node|
In addition, some query types offer context parameters specific to that query type.

View File

@ -75,6 +75,16 @@ public class QueryContexts
return parseBoolean(query, "finalize", defaultValue);
}
public static <T> boolean isSerializeDateTimeAsLong(Query<T> query, boolean defaultValue)
{
return parseBoolean(query, "serializeDateTimeAsLong", defaultValue);
}
public static <T> boolean isSerializeDateTimeAsLongInner(Query<T> query, boolean defaultValue)
{
return parseBoolean(query, "serializeDateTimeAsLongInner", defaultValue);
}
public static <T> int getUncoveredIntervalsLimit(Query<T> query)
{
return getUncoveredIntervalsLimit(query, DEFAULT_UNCOVERED_INTERVALS_LIMIT);

View File

@ -52,7 +52,7 @@ public class EventHolder
public DateTime getTimestamp()
{
Object retVal = event.get(timestampKey);
if (retVal instanceof String) {
if (retVal instanceof String || retVal instanceof Long) {
return new DateTime(retVal);
} else if (retVal instanceof DateTime) {
return (DateTime) retVal;

View File

@ -105,7 +105,7 @@ public class TimeBoundaryResultValue
if (val instanceof DateTime) {
return (DateTime) val;
} else if (val instanceof String) {
} else if (val instanceof String || val instanceof Long) {
return new DateTime(val);
} else {
throw new IAE("Cannot get time from type[%s]", val.getClass());

View File

@ -21,6 +21,8 @@ package io.druid.server;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.datatype.joda.ser.DateTimeSerializer;
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
@ -40,6 +42,7 @@ import io.druid.java.util.common.guava.Yielders;
import io.druid.query.DruidMetrics;
import io.druid.query.GenericQueryMetricsFactory;
import io.druid.query.Query;
import io.druid.query.QueryContexts;
import io.druid.query.QueryInterruptedException;
import io.druid.query.QueryMetrics;
import io.druid.query.QueryPlus;
@ -97,6 +100,8 @@ public class QueryResource implements QueryCountStatsProvider
protected final ServerConfig config;
protected final ObjectMapper jsonMapper;
protected final ObjectMapper smileMapper;
protected final ObjectMapper serializeDateTimeAsLongJsonMapper;
protected final ObjectMapper serializeDateTimeAsLongSmileMapper;
protected final QuerySegmentWalker texasRanger;
protected final ServiceEmitter emitter;
protected final RequestLogger requestLogger;
@ -125,6 +130,8 @@ public class QueryResource implements QueryCountStatsProvider
this.config = config;
this.jsonMapper = jsonMapper;
this.smileMapper = smileMapper;
this.serializeDateTimeAsLongJsonMapper = serializeDataTimeAsLong(jsonMapper);
this.serializeDateTimeAsLongSmileMapper = serializeDataTimeAsLong(smileMapper);
this.texasRanger = texasRanger;
this.emitter = emitter;
this.requestLogger = requestLogger;
@ -250,7 +257,11 @@ public class QueryResource implements QueryCountStatsProvider
try {
final Query theQuery = query;
final QueryToolChest theToolChest = toolChest;
final ObjectWriter jsonWriter = context.newOutputWriter();
boolean shouldFinalize = QueryContexts.isFinalize(query, true);
boolean serializeDateTimeAsLong =
QueryContexts.isSerializeDateTimeAsLong(query, false)
|| (!shouldFinalize && QueryContexts.isSerializeDateTimeAsLongInner(query, false));
final ObjectWriter jsonWriter = context.newOutputWriter(serializeDateTimeAsLong);
Response.ResponseBuilder builder = Response
.ok(
new StreamingOutput()
@ -451,24 +462,41 @@ public class QueryResource implements QueryCountStatsProvider
}
}
protected ObjectMapper serializeDataTimeAsLong(ObjectMapper mapper)
{
return mapper.copy().registerModule(new SimpleModule().addSerializer(DateTime.class, new DateTimeSerializer()));
}
protected ResponseContext createContext(String requestType, boolean pretty)
{
boolean isSmile = SmileMediaTypes.APPLICATION_JACKSON_SMILE.equals(requestType) ||
APPLICATION_SMILE.equals(requestType);
String contentType = isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON;
return new ResponseContext(contentType, isSmile ? smileMapper : jsonMapper, pretty);
return new ResponseContext(
contentType,
isSmile ? smileMapper : jsonMapper,
isSmile ? serializeDateTimeAsLongSmileMapper : serializeDateTimeAsLongJsonMapper,
pretty
);
}
protected static class ResponseContext
{
private final String contentType;
private final ObjectMapper inputMapper;
private final ObjectMapper serializeDateTimeAsLongInputMapper;
private final boolean isPretty;
ResponseContext(String contentType, ObjectMapper inputMapper, boolean isPretty)
ResponseContext(
String contentType,
ObjectMapper inputMapper,
ObjectMapper serializeDateTimeAsLongInputMapper,
boolean isPretty
)
{
this.contentType = contentType;
this.inputMapper = inputMapper;
this.serializeDateTimeAsLongInputMapper = serializeDateTimeAsLongInputMapper;
this.isPretty = isPretty;
}
@ -482,21 +510,22 @@ public class QueryResource implements QueryCountStatsProvider
return inputMapper;
}
ObjectWriter newOutputWriter()
ObjectWriter newOutputWriter(boolean serializeDateTimeAsLong)
{
return isPretty ? inputMapper.writerWithDefaultPrettyPrinter() : inputMapper.writer();
ObjectMapper mapper = serializeDateTimeAsLong ? serializeDateTimeAsLongInputMapper : inputMapper;
return isPretty ? mapper.writerWithDefaultPrettyPrinter() : mapper.writer();
}
Response ok(Object object) throws IOException
{
return Response.ok(newOutputWriter().writeValueAsString(object), contentType).build();
return Response.ok(newOutputWriter(false).writeValueAsString(object), contentType).build();
}
Response gotError(Exception e) throws IOException
{
return Response.serverError()
.type(contentType)
.entity(newOutputWriter().writeValueAsBytes(QueryInterruptedException.wrapIfNeeded(e)))
.entity(newOutputWriter(false).writeValueAsBytes(QueryInterruptedException.wrapIfNeeded(e)))
.build();
}
}