add back metrics

This commit is contained in:
fjy 2014-07-29 10:22:55 -07:00
parent 088c2386dc
commit 2ad1bd3f44
3 changed files with 152 additions and 27 deletions

View File

@ -19,10 +19,14 @@
package io.druid.query;
import com.google.common.base.Joiner;
import java.util.List;
public class DataSourceUtil
{
public static final Joiner COMMA_JOIN = Joiner.on(",");
public static String getMetricName(DataSource dataSource)
{
final List<String> names = dataSource.getNames();

View File

@ -22,16 +22,22 @@ package io.druid.server;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.query.DataSourceUtil;
import io.druid.query.Query;
import io.druid.server.log.RequestLogger;
import io.druid.server.router.QueryHostFinder;
import io.druid.server.router.Router;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.BytesContentProvider;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.proxy.AsyncProxyServlet;
import org.joda.time.DateTime;
@ -72,6 +78,7 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet
private final ObjectMapper smileMapper;
private final QueryHostFinder hostFinder;
private final HttpClient httpClient;
private final ServiceEmitter emitter;
private final RequestLogger requestLogger;
public AsyncQueryForwardingServlet(
@ -79,6 +86,7 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet
@Smile ObjectMapper smileMapper,
QueryHostFinder hostFinder,
@Router HttpClient httpClient,
ServiceEmitter emitter,
RequestLogger requestLogger
)
{
@ -86,6 +94,7 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet
this.smileMapper = smileMapper;
this.hostFinder = hostFinder;
this.httpClient = httpClient;
this.emitter = emitter;
this.requestLogger = requestLogger;
}
@ -97,6 +106,11 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet
String host = hostFinder.getDefaultHost();
Query inputQuery = null;
boolean hasContent = request.getContentLength() > 0 || request.getContentType() != null;
long startTime = System.currentTimeMillis();
// queries only exist for POST
if (request.getMethod().equalsIgnoreCase(HttpMethod.POST.asString())) {
try {
inputQuery = objectMapper.readValue(request.getInputStream(), Query.class);
if (inputQuery != null) {
@ -123,6 +137,7 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet
catch (Exception e) {
handleException(response, objectMapper, e);
}
}
final int requestId = getRequestId(request);
@ -148,7 +163,6 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet
.version(HttpVersion.fromString(request.getProtocol()));
// Copy headers
boolean hasContent = request.getContentLength() > 0 || request.getContentType() != null;
for (Enumeration<String> headerNames = request.getHeaderNames(); headerNames.hasMoreElements(); ) {
String headerName = headerNames.nextElement();
@ -176,8 +190,12 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet
getTimeout(), TimeUnit.MILLISECONDS
);
if (hasContent && inputQuery != null) {
if (hasContent) {
if (inputQuery != null) {
proxyRequest.content(new BytesContentProvider(jsonMapper.writeValueAsBytes(inputQuery)));
} else {
proxyRequest.content(proxyRequestContent(proxyRequest, request));
}
}
customizeProxyRequest(proxyRequest, request);
@ -238,4 +256,102 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet
}
return URI.create(uri.toString());
}
private class MetricsEmittingProxyResponseListener extends ProxyResponseListener
{
private final HttpServletRequest req;
private final HttpServletResponse res;
private final Query query;
private final long start;
public MetricsEmittingProxyResponseListener(
HttpServletRequest request,
HttpServletResponse response,
Query query,
long start
)
{
super(request, response);
this.req = request;
this.res = response;
this.query = query;
this.start = start;
}
@Override
public void onComplete(Result result)
{
final long requestTime = System.currentTimeMillis() - start;
emitter.emit(
new ServiceMetricEvent.Builder()
.setUser2(DataSourceUtil.getMetricName(query.getDataSource()))
.setUser3(String.valueOf(query.getContextPriority(0)))
.setUser4(query.getType())
.setUser5(DataSourceUtil.COMMA_JOIN.join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters()))
.setUser7(req.getRemoteAddr())
.setUser8(query.getId())
.setUser9(query.getDuration().toPeriod().toStandardMinutes().toString())
.build("request/time", requestTime)
);
try {
requestLogger.log(
new RequestLogLine(
new DateTime(),
req.getRemoteAddr(),
query,
new QueryStats(
ImmutableMap.<String, Object>of(
"request/time",
requestTime,
"success",
true
)
)
)
);
}
catch (Exception e) {
log.error(e, "Unable to log query [%s]!", query);
}
super.onComplete(result);
}
@Override
public void onFailure(Response response, Throwable failure)
{
try {
final String errorMessage = failure.getMessage();
requestLogger.log(
new RequestLogLine(
new DateTime(),
req.getRemoteAddr(),
query,
new QueryStats(
ImmutableMap.<String, Object>of(
"success",
false,
"exception",
errorMessage == null ? "no message" : errorMessage
)
)
)
);
}
catch (IOException logError) {
log.error(logError, "Unable to log query [%s]!", query);
}
log.makeAlert(failure, "Exception handling request")
.addData("exception", failure.toString())
.addData("query", query)
.addData("peer", req.getRemoteAddr())
.emit();
super.onFailure(response, failure);
}
}
}

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.servlet.GuiceFilter;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.server.AsyncQueryForwardingServlet;
@ -48,6 +49,7 @@ public class RouterJettyServerInitializer implements JettyServerInitializer
private final ObjectMapper smileMapper;
private final QueryHostFinder hostFinder;
private final HttpClient httpClient;
private final ServiceEmitter emitter;
private final RequestLogger requestLogger;
@Inject
@ -56,6 +58,7 @@ public class RouterJettyServerInitializer implements JettyServerInitializer
@Smile ObjectMapper smileMapper,
QueryHostFinder hostFinder,
@Router HttpClient httpClient,
ServiceEmitter emitter,
RequestLogger requestLogger
)
{
@ -63,6 +66,7 @@ public class RouterJettyServerInitializer implements JettyServerInitializer
this.smileMapper = smileMapper;
this.hostFinder = hostFinder;
this.httpClient = httpClient;
this.emitter = emitter;
this.requestLogger = requestLogger;
}
@ -77,6 +81,7 @@ public class RouterJettyServerInitializer implements JettyServerInitializer
smileMapper,
hostFinder,
httpClient,
emitter,
requestLogger
)
), "/druid/v2/*"