mirror of https://github.com/apache/druid.git
Merge pull request #475 from metamx/better-request-logs
Add more meaningful request logging
This commit is contained in:
commit
4d5cd7b087
|
@ -21,6 +21,8 @@ package io.druid.server;
|
|||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.api.client.repackaged.com.google.common.base.Throwables;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import com.metamx.emitter.service.ServiceEmitter;
|
||||
import com.metamx.emitter.service.ServiceMetricEvent;
|
||||
|
@ -46,8 +48,10 @@ import javax.servlet.http.HttpServletResponse;
|
|||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* This class does async query processing and should be merged with QueryResource at some point
|
||||
*/
|
||||
@WebServlet(asyncSupported = true)
|
||||
public class AsyncQueryForwardingServlet extends HttpServlet
|
||||
|
@ -55,6 +59,7 @@ public class AsyncQueryForwardingServlet extends HttpServlet
|
|||
private static final EmittingLogger log = new EmittingLogger(AsyncQueryForwardingServlet.class);
|
||||
private static final Charset UTF8 = Charset.forName("UTF-8");
|
||||
private static final String DISPATCHED = "dispatched";
|
||||
private static final Joiner COMMA_JOIN = Joiner.on(",");
|
||||
|
||||
private final ObjectMapper jsonMapper;
|
||||
private final ObjectMapper smileMapper;
|
||||
|
@ -62,7 +67,6 @@ public class AsyncQueryForwardingServlet extends HttpServlet
|
|||
private final RoutingDruidClient routingDruidClient;
|
||||
private final ServiceEmitter emitter;
|
||||
private final RequestLogger requestLogger;
|
||||
private final QueryIDProvider idProvider;
|
||||
|
||||
public AsyncQueryForwardingServlet(
|
||||
@Json ObjectMapper jsonMapper,
|
||||
|
@ -70,8 +74,7 @@ public class AsyncQueryForwardingServlet extends HttpServlet
|
|||
QueryHostFinder hostFinder,
|
||||
RoutingDruidClient routingDruidClient,
|
||||
ServiceEmitter emitter,
|
||||
RequestLogger requestLogger,
|
||||
QueryIDProvider idProvider
|
||||
RequestLogger requestLogger
|
||||
)
|
||||
{
|
||||
this.jsonMapper = jsonMapper;
|
||||
|
@ -80,7 +83,6 @@ public class AsyncQueryForwardingServlet extends HttpServlet
|
|||
this.routingDruidClient = routingDruidClient;
|
||||
this.emitter = emitter;
|
||||
this.requestLogger = requestLogger;
|
||||
this.idProvider = idProvider;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -94,7 +96,7 @@ public class AsyncQueryForwardingServlet extends HttpServlet
|
|||
|
||||
final boolean isSmile = "application/smile".equals(req.getContentType());
|
||||
|
||||
ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper;
|
||||
final ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper;
|
||||
|
||||
OutputStream out = null;
|
||||
|
||||
|
@ -110,13 +112,14 @@ public class AsyncQueryForwardingServlet extends HttpServlet
|
|||
query = objectMapper.readValue(req.getInputStream(), Query.class);
|
||||
queryId = query.getId();
|
||||
if (queryId == null) {
|
||||
queryId = idProvider.next(query);
|
||||
queryId = UUID.randomUUID().toString();
|
||||
query = query.withId(queryId);
|
||||
}
|
||||
|
||||
requestLogger.log(
|
||||
new RequestLogLine(new DateTime(), req.getRemoteAddr(), query)
|
||||
);
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Got query [%s]", query);
|
||||
}
|
||||
|
||||
out = resp.getOutputStream();
|
||||
final OutputStream outputStream = out;
|
||||
|
||||
|
@ -167,13 +170,13 @@ public class AsyncQueryForwardingServlet extends HttpServlet
|
|||
{
|
||||
final long requestTime = System.currentTimeMillis() - start;
|
||||
|
||||
log.info("Request time: %d", requestTime);
|
||||
log.debug("Request time: %d", requestTime);
|
||||
|
||||
emitter.emit(
|
||||
new ServiceMetricEvent.Builder()
|
||||
.setUser2(theQuery.getDataSource().getName())
|
||||
.setUser4(theQuery.getType())
|
||||
.setUser5(theQuery.getIntervals().get(0).toString())
|
||||
.setUser5(COMMA_JOIN.join(theQuery.getIntervals()))
|
||||
.setUser6(String.valueOf(theQuery.hasFilters()))
|
||||
.setUser7(req.getRemoteAddr())
|
||||
.setUser8(theQueryId)
|
||||
|
@ -183,6 +186,15 @@ public class AsyncQueryForwardingServlet extends HttpServlet
|
|||
|
||||
final OutputStream obj = clientResponse.getObj();
|
||||
try {
|
||||
requestLogger.log(
|
||||
new RequestLogLine(
|
||||
new DateTime(),
|
||||
req.getRemoteAddr(),
|
||||
theQuery,
|
||||
new QueryStats(ImmutableMap.<String, Object>of("request/time", requestTime, "success", true))
|
||||
)
|
||||
);
|
||||
|
||||
resp.flushBuffer();
|
||||
outputStream.close();
|
||||
}
|
||||
|
@ -230,6 +242,20 @@ public class AsyncQueryForwardingServlet extends HttpServlet
|
|||
|
||||
resp.flushBuffer();
|
||||
|
||||
try {
|
||||
requestLogger.log(
|
||||
new RequestLogLine(
|
||||
new DateTime(),
|
||||
req.getRemoteAddr(),
|
||||
query,
|
||||
new QueryStats(ImmutableMap.<String, Object>of("success", false, "exception", e.toString()))
|
||||
)
|
||||
);
|
||||
}
|
||||
catch (Exception e2) {
|
||||
log.error(e2, "Unable to log query [%s]!", query);
|
||||
}
|
||||
|
||||
log.makeAlert(e, "Exception handling request")
|
||||
.addData("query", query)
|
||||
.addData("peer", req.getRemoteAddr())
|
||||
|
|
|
@ -22,6 +22,7 @@ package io.druid.server;
|
|||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.ObjectWriter;
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.io.ByteStreams;
|
||||
import com.google.common.io.Closeables;
|
||||
|
@ -49,6 +50,7 @@ import javax.ws.rs.core.Context;
|
|||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -57,12 +59,13 @@ public class QueryResource
|
|||
{
|
||||
private static final Logger log = new Logger(QueryResource.class);
|
||||
private static final Charset UTF8 = Charset.forName("UTF-8");
|
||||
private static final Joiner COMMA_JOIN = Joiner.on(",");
|
||||
|
||||
private final ObjectMapper jsonMapper;
|
||||
private final ObjectMapper smileMapper;
|
||||
private final QuerySegmentWalker texasRanger;
|
||||
private final ServiceEmitter emitter;
|
||||
private final RequestLogger requestLogger;
|
||||
private final QueryIDProvider idProvider;
|
||||
|
||||
@Inject
|
||||
public QueryResource(
|
||||
|
@ -70,8 +73,7 @@ public class QueryResource
|
|||
@Smile ObjectMapper smileMapper,
|
||||
QuerySegmentWalker texasRanger,
|
||||
ServiceEmitter emitter,
|
||||
RequestLogger requestLogger,
|
||||
QueryIDProvider idProvider
|
||||
RequestLogger requestLogger
|
||||
)
|
||||
{
|
||||
this.jsonMapper = jsonMapper;
|
||||
|
@ -79,7 +81,6 @@ public class QueryResource
|
|||
this.texasRanger = texasRanger;
|
||||
this.emitter = emitter;
|
||||
this.requestLogger = requestLogger;
|
||||
this.idProvider = idProvider;
|
||||
}
|
||||
|
||||
@POST
|
||||
|
@ -107,13 +108,13 @@ public class QueryResource
|
|||
query = objectMapper.readValue(requestQuery, Query.class);
|
||||
queryId = query.getId();
|
||||
if (queryId == null) {
|
||||
queryId = idProvider.next(query);
|
||||
queryId = UUID.randomUUID().toString();
|
||||
query = query.withId(queryId);
|
||||
}
|
||||
|
||||
requestLogger.log(
|
||||
new RequestLogLine(new DateTime(), req.getRemoteAddr(), query)
|
||||
);
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Got query [%s]", query);
|
||||
}
|
||||
|
||||
Sequence<?> results = query.run(texasRanger);
|
||||
|
||||
|
@ -133,13 +134,27 @@ public class QueryResource
|
|||
new ServiceMetricEvent.Builder()
|
||||
.setUser2(query.getDataSource().toString())
|
||||
.setUser4(query.getType())
|
||||
.setUser5(query.getIntervals().get(0).toString())
|
||||
.setUser5(COMMA_JOIN.join(query.getIntervals()))
|
||||
.setUser6(String.valueOf(query.hasFilters()))
|
||||
.setUser7(req.getRemoteAddr())
|
||||
.setUser8(queryId)
|
||||
.setUser9(query.getDuration().toPeriod().toStandardMinutes().toString())
|
||||
.build("request/time", requestTime)
|
||||
);
|
||||
|
||||
requestLogger.log(
|
||||
new RequestLogLine(
|
||||
new DateTime(),
|
||||
req.getRemoteAddr(),
|
||||
query,
|
||||
new QueryStats(
|
||||
ImmutableMap.<String, Object>of(
|
||||
"request/time", requestTime,
|
||||
"success", true
|
||||
)
|
||||
)
|
||||
)
|
||||
);
|
||||
}
|
||||
catch (Exception e) {
|
||||
final String queryString =
|
||||
|
@ -163,6 +178,20 @@ public class QueryResource
|
|||
|
||||
resp.flushBuffer();
|
||||
|
||||
try {
|
||||
requestLogger.log(
|
||||
new RequestLogLine(
|
||||
new DateTime(),
|
||||
req.getRemoteAddr(),
|
||||
query,
|
||||
new QueryStats(ImmutableMap.<String, Object>of("success", false, "exception", e.toString()))
|
||||
)
|
||||
);
|
||||
}
|
||||
catch (Exception e2) {
|
||||
log.error(e2, "Unable to log query [%s]!", queryString);
|
||||
}
|
||||
|
||||
emitter.emit(
|
||||
new AlertEvent.Builder().build(
|
||||
"Exception handling request",
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
|
||||
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
|
@ -19,35 +19,24 @@
|
|||
|
||||
package io.druid.server;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Singleton;
|
||||
import io.druid.guice.annotations.Self;
|
||||
import io.druid.query.Query;
|
||||
import org.joda.time.DateTime;
|
||||
import com.fasterxml.jackson.annotation.JsonValue;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.Map;
|
||||
|
||||
@Singleton
|
||||
public class QueryIDProvider
|
||||
/**
|
||||
*/
|
||||
public class QueryStats
|
||||
{
|
||||
private final String host;
|
||||
private final AtomicLong id = new AtomicLong();
|
||||
private final Map<String, Object> stats;
|
||||
|
||||
@Inject
|
||||
public QueryIDProvider(@Self DruidNode node)
|
||||
public QueryStats(Map<String, Object> stats)
|
||||
{
|
||||
host = node.getHost();
|
||||
this.stats = stats;
|
||||
}
|
||||
|
||||
public String next(Query query)
|
||||
@JsonValue
|
||||
public Map<String, Object> getStats()
|
||||
{
|
||||
return String.format(
|
||||
"%s_%s_%s_%s_%s",
|
||||
query.getDataSource(),
|
||||
query.getIntervals(),
|
||||
host,
|
||||
new DateTime(),
|
||||
id.incrementAndGet()
|
||||
);
|
||||
return stats;
|
||||
}
|
||||
}
|
|
@ -35,12 +35,14 @@ public class RequestLogLine
|
|||
private final DateTime timestamp;
|
||||
private final String remoteAddr;
|
||||
private final Query query;
|
||||
private final QueryStats queryStats;
|
||||
|
||||
public RequestLogLine(DateTime timestamp, String remoteAddr, Query query)
|
||||
public RequestLogLine(DateTime timestamp, String remoteAddr, Query query, QueryStats queryStats)
|
||||
{
|
||||
this.timestamp = timestamp;
|
||||
this.remoteAddr = remoteAddr;
|
||||
this.query = query;
|
||||
this.queryStats = queryStats;
|
||||
}
|
||||
|
||||
public String getLine(ObjectMapper objectMapper) throws JsonProcessingException
|
||||
|
@ -49,7 +51,8 @@ public class RequestLogLine
|
|||
Arrays.asList(
|
||||
timestamp,
|
||||
remoteAddr,
|
||||
objectMapper.writeValueAsString(query)
|
||||
objectMapper.writeValueAsString(query),
|
||||
objectMapper.writeValueAsString(queryStats)
|
||||
)
|
||||
);
|
||||
}
|
||||
|
@ -71,4 +74,10 @@ public class RequestLogLine
|
|||
{
|
||||
return remoteAddr;
|
||||
}
|
||||
|
||||
@JsonProperty("queryStats")
|
||||
public QueryStats getQueryStats()
|
||||
{
|
||||
return queryStats;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,7 +28,6 @@ import io.druid.client.RoutingDruidClient;
|
|||
import io.druid.guice.annotations.Json;
|
||||
import io.druid.guice.annotations.Smile;
|
||||
import io.druid.server.AsyncQueryForwardingServlet;
|
||||
import io.druid.server.QueryIDProvider;
|
||||
import io.druid.server.initialization.JettyServerInitializer;
|
||||
import io.druid.server.log.RequestLogger;
|
||||
import io.druid.server.router.QueryHostFinder;
|
||||
|
@ -51,7 +50,6 @@ public class RouterJettyServerInitializer implements JettyServerInitializer
|
|||
private final RoutingDruidClient routingDruidClient;
|
||||
private final ServiceEmitter emitter;
|
||||
private final RequestLogger requestLogger;
|
||||
private final QueryIDProvider idProvider;
|
||||
|
||||
@Inject
|
||||
public RouterJettyServerInitializer(
|
||||
|
@ -60,8 +58,7 @@ public class RouterJettyServerInitializer implements JettyServerInitializer
|
|||
QueryHostFinder hostFinder,
|
||||
RoutingDruidClient routingDruidClient,
|
||||
ServiceEmitter emitter,
|
||||
RequestLogger requestLogger,
|
||||
QueryIDProvider idProvider
|
||||
RequestLogger requestLogger
|
||||
)
|
||||
{
|
||||
this.jsonMapper = jsonMapper;
|
||||
|
@ -70,7 +67,6 @@ public class RouterJettyServerInitializer implements JettyServerInitializer
|
|||
this.routingDruidClient = routingDruidClient;
|
||||
this.emitter = emitter;
|
||||
this.requestLogger = requestLogger;
|
||||
this.idProvider = idProvider;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -85,8 +81,7 @@ public class RouterJettyServerInitializer implements JettyServerInitializer
|
|||
hostFinder,
|
||||
routingDruidClient,
|
||||
emitter,
|
||||
requestLogger,
|
||||
idProvider
|
||||
requestLogger
|
||||
)
|
||||
), "/druid/v2/*"
|
||||
);
|
||||
|
|
Loading…
Reference in New Issue