mirror of https://github.com/apache/druid.git
Merge pull request #1471 from metamx/direct-client-metrics
add query/node/time metrics to DirectDruidClient
This commit is contained in:
commit
f2919b98b1
|
@ -29,6 +29,8 @@ Available Metrics
|
||||||
|Metric|Description|Dimensions|Normal Value|
|
|Metric|Description|Dimensions|Normal Value|
|
||||||
|------|-----------|----------|------------|
|
|------|-----------|----------|------------|
|
||||||
|`query/time`|Milliseconds taken to complete a query.|Common: dataSource, type, interval, hasFilters, duration, context, remoteAddress, id. Aggregation Queries: numMetrics, numComplexMetrics. GroupBy: numDimensions. TopN: threshold, dimension.|< 1s|
|
|`query/time`|Milliseconds taken to complete a query.|Common: dataSource, type, interval, hasFilters, duration, context, remoteAddress, id. Aggregation Queries: numMetrics, numComplexMetrics. GroupBy: numDimensions. TopN: threshold, dimension.|< 1s|
|
||||||
|
|`query/node/time`|Milliseconds taken to query individual historical/realtime nodes.|id, status, server.|< 1s|
|
||||||
|
|`query/node/ttfb`|Time to first byte. Milliseconds elapsed until broker starts receiving the response from individual historical/realtime nodes.|id, status, server.|< 1s|
|
||||||
|`query/intervalChunk/time`|Only emitted if interval chunking is enabled. Milliseconds required to query an interval chunk.|id, status, chunkInterval (if interval chunking is enabled).|< 1s|
|
|`query/intervalChunk/time`|Only emitted if interval chunking is enabled. Milliseconds required to query an interval chunk.|id, status, chunkInterval (if interval chunking is enabled).|< 1s|
|
||||||
|
|
||||||
### Historical
|
### Historical
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
package io.druid.query;
|
package io.druid.query;
|
||||||
|
|
||||||
import com.google.common.base.Function;
|
import com.google.common.base.Function;
|
||||||
|
import com.google.common.base.Strings;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
import com.metamx.common.guava.Accumulator;
|
import com.metamx.common.guava.Accumulator;
|
||||||
|
@ -103,11 +104,7 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
|
||||||
builder.setDimension(userDimension.getKey(), userDimension.getValue());
|
builder.setDimension(userDimension.getKey(), userDimension.getValue());
|
||||||
}
|
}
|
||||||
|
|
||||||
String queryId = query.getId();
|
builder.setDimension(DruidMetrics.ID, Strings.nullToEmpty(query.getId()));
|
||||||
if (queryId == null) {
|
|
||||||
queryId = "";
|
|
||||||
}
|
|
||||||
builder.setDimension(DruidMetrics.ID, queryId);
|
|
||||||
|
|
||||||
return new Sequence<T>()
|
return new Sequence<T>()
|
||||||
{
|
{
|
||||||
|
|
|
@ -24,6 +24,7 @@ import com.google.common.collect.Maps;
|
||||||
import com.google.common.collect.Ordering;
|
import com.google.common.collect.Ordering;
|
||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
import com.metamx.common.logger.Logger;
|
import com.metamx.common.logger.Logger;
|
||||||
|
import com.metamx.emitter.service.ServiceEmitter;
|
||||||
import com.metamx.http.client.HttpClient;
|
import com.metamx.http.client.HttpClient;
|
||||||
import io.druid.client.selector.QueryableDruidServer;
|
import io.druid.client.selector.QueryableDruidServer;
|
||||||
import io.druid.client.selector.ServerSelector;
|
import io.druid.client.selector.ServerSelector;
|
||||||
|
@ -67,6 +68,7 @@ public class BrokerServerView implements TimelineServerView
|
||||||
private final HttpClient httpClient;
|
private final HttpClient httpClient;
|
||||||
private final ServerInventoryView baseView;
|
private final ServerInventoryView baseView;
|
||||||
private final TierSelectorStrategy tierSelectorStrategy;
|
private final TierSelectorStrategy tierSelectorStrategy;
|
||||||
|
private final ServiceEmitter emitter;
|
||||||
|
|
||||||
private volatile boolean initialized = false;
|
private volatile boolean initialized = false;
|
||||||
|
|
||||||
|
@ -77,7 +79,8 @@ public class BrokerServerView implements TimelineServerView
|
||||||
@Smile ObjectMapper smileMapper,
|
@Smile ObjectMapper smileMapper,
|
||||||
@Client HttpClient httpClient,
|
@Client HttpClient httpClient,
|
||||||
ServerInventoryView baseView,
|
ServerInventoryView baseView,
|
||||||
TierSelectorStrategy tierSelectorStrategy
|
TierSelectorStrategy tierSelectorStrategy,
|
||||||
|
ServiceEmitter emitter
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
this.warehouse = warehouse;
|
this.warehouse = warehouse;
|
||||||
|
@ -86,6 +89,7 @@ public class BrokerServerView implements TimelineServerView
|
||||||
this.httpClient = httpClient;
|
this.httpClient = httpClient;
|
||||||
this.baseView = baseView;
|
this.baseView = baseView;
|
||||||
this.tierSelectorStrategy = tierSelectorStrategy;
|
this.tierSelectorStrategy = tierSelectorStrategy;
|
||||||
|
this.emitter = emitter;
|
||||||
|
|
||||||
this.clients = Maps.newConcurrentMap();
|
this.clients = Maps.newConcurrentMap();
|
||||||
this.selectors = Maps.newHashMap();
|
this.selectors = Maps.newHashMap();
|
||||||
|
@ -173,7 +177,7 @@ public class BrokerServerView implements TimelineServerView
|
||||||
|
|
||||||
private DirectDruidClient makeDirectClient(DruidServer server)
|
private DirectDruidClient makeDirectClient(DruidServer server)
|
||||||
{
|
{
|
||||||
return new DirectDruidClient(warehouse, queryWatcher, smileMapper, httpClient, server.getHost());
|
return new DirectDruidClient(warehouse, queryWatcher, smileMapper, httpClient, server.getHost(), emitter);
|
||||||
}
|
}
|
||||||
|
|
||||||
private QueryableDruidServer removeServer(DruidServer server)
|
private QueryableDruidServer removeServer(DruidServer server)
|
||||||
|
|
|
@ -27,6 +27,7 @@ import com.fasterxml.jackson.databind.type.TypeFactory;
|
||||||
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
|
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
|
||||||
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
|
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
|
||||||
import com.google.common.base.Charsets;
|
import com.google.common.base.Charsets;
|
||||||
|
import com.google.common.base.Strings;
|
||||||
import com.google.common.base.Throwables;
|
import com.google.common.base.Throwables;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
import com.google.common.io.ByteSource;
|
import com.google.common.io.ByteSource;
|
||||||
|
@ -41,6 +42,8 @@ import com.metamx.common.guava.CloseQuietly;
|
||||||
import com.metamx.common.guava.Sequence;
|
import com.metamx.common.guava.Sequence;
|
||||||
import com.metamx.common.guava.Sequences;
|
import com.metamx.common.guava.Sequences;
|
||||||
import com.metamx.common.logger.Logger;
|
import com.metamx.common.logger.Logger;
|
||||||
|
import com.metamx.emitter.service.ServiceEmitter;
|
||||||
|
import com.metamx.emitter.service.ServiceMetricEvent;
|
||||||
import com.metamx.http.client.HttpClient;
|
import com.metamx.http.client.HttpClient;
|
||||||
import com.metamx.http.client.Request;
|
import com.metamx.http.client.Request;
|
||||||
import com.metamx.http.client.response.ClientResponse;
|
import com.metamx.http.client.response.ClientResponse;
|
||||||
|
@ -48,6 +51,7 @@ import com.metamx.http.client.response.HttpResponseHandler;
|
||||||
import com.metamx.http.client.response.StatusResponseHandler;
|
import com.metamx.http.client.response.StatusResponseHandler;
|
||||||
import com.metamx.http.client.response.StatusResponseHolder;
|
import com.metamx.http.client.response.StatusResponseHolder;
|
||||||
import io.druid.query.BySegmentResultValueClass;
|
import io.druid.query.BySegmentResultValueClass;
|
||||||
|
import io.druid.query.DruidMetrics;
|
||||||
import io.druid.query.Query;
|
import io.druid.query.Query;
|
||||||
import io.druid.query.QueryInterruptedException;
|
import io.druid.query.QueryInterruptedException;
|
||||||
import io.druid.query.QueryRunner;
|
import io.druid.query.QueryRunner;
|
||||||
|
@ -94,6 +98,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
|
||||||
private final ObjectMapper objectMapper;
|
private final ObjectMapper objectMapper;
|
||||||
private final HttpClient httpClient;
|
private final HttpClient httpClient;
|
||||||
private final String host;
|
private final String host;
|
||||||
|
private final ServiceEmitter emitter;
|
||||||
|
|
||||||
private final AtomicInteger openConnections;
|
private final AtomicInteger openConnections;
|
||||||
private final boolean isSmile;
|
private final boolean isSmile;
|
||||||
|
@ -103,7 +108,8 @@ public class DirectDruidClient<T> implements QueryRunner<T>
|
||||||
QueryWatcher queryWatcher,
|
QueryWatcher queryWatcher,
|
||||||
ObjectMapper objectMapper,
|
ObjectMapper objectMapper,
|
||||||
HttpClient httpClient,
|
HttpClient httpClient,
|
||||||
String host
|
String host,
|
||||||
|
ServiceEmitter emitter
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
this.warehouse = warehouse;
|
this.warehouse = warehouse;
|
||||||
|
@ -111,6 +117,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
|
||||||
this.objectMapper = objectMapper;
|
this.objectMapper = objectMapper;
|
||||||
this.httpClient = httpClient;
|
this.httpClient = httpClient;
|
||||||
this.host = host;
|
this.host = host;
|
||||||
|
this.emitter = emitter;
|
||||||
|
|
||||||
this.isSmile = this.objectMapper.getFactory() instanceof SmileFactory;
|
this.isSmile = this.objectMapper.getFactory() instanceof SmileFactory;
|
||||||
this.openConnections = new AtomicInteger();
|
this.openConnections = new AtomicInteger();
|
||||||
|
@ -152,9 +159,16 @@ public class DirectDruidClient<T> implements QueryRunner<T>
|
||||||
try {
|
try {
|
||||||
log.debug("Querying url[%s]", url);
|
log.debug("Querying url[%s]", url);
|
||||||
|
|
||||||
|
final long requestStartTime = System.currentTimeMillis();
|
||||||
|
|
||||||
|
final ServiceMetricEvent.Builder builder = toolChest.makeMetricBuilder(query);
|
||||||
|
builder.setDimension("server", host);
|
||||||
|
builder.setDimension(DruidMetrics.ID, Strings.nullToEmpty(query.getId()));
|
||||||
|
|
||||||
|
|
||||||
final HttpResponseHandler<InputStream, InputStream> responseHandler = new HttpResponseHandler<InputStream, InputStream>()
|
final HttpResponseHandler<InputStream, InputStream> responseHandler = new HttpResponseHandler<InputStream, InputStream>()
|
||||||
{
|
{
|
||||||
private long startTime;
|
private long responseStartTime;
|
||||||
private final AtomicLong byteCount = new AtomicLong(0);
|
private final AtomicLong byteCount = new AtomicLong(0);
|
||||||
private final BlockingQueue<InputStream> queue = new LinkedBlockingQueue<>();
|
private final BlockingQueue<InputStream> queue = new LinkedBlockingQueue<>();
|
||||||
private final AtomicBoolean done = new AtomicBoolean(false);
|
private final AtomicBoolean done = new AtomicBoolean(false);
|
||||||
|
@ -163,7 +177,9 @@ public class DirectDruidClient<T> implements QueryRunner<T>
|
||||||
public ClientResponse<InputStream> handleResponse(HttpResponse response)
|
public ClientResponse<InputStream> handleResponse(HttpResponse response)
|
||||||
{
|
{
|
||||||
log.debug("Initial response from url[%s]", url);
|
log.debug("Initial response from url[%s]", url);
|
||||||
startTime = System.currentTimeMillis();
|
responseStartTime = System.currentTimeMillis();
|
||||||
|
emitter.emit(builder.build("query/node/ttfb", responseStartTime - requestStartTime));
|
||||||
|
|
||||||
try {
|
try {
|
||||||
final String responseContext = response.headers().get("X-Druid-Response-Context");
|
final String responseContext = response.headers().get("X-Druid-Response-Context");
|
||||||
// context may be null in case of error or query timeout
|
// context may be null in case of error or query timeout
|
||||||
|
@ -256,9 +272,10 @@ public class DirectDruidClient<T> implements QueryRunner<T>
|
||||||
"Completed request to url[%s] with %,d bytes returned in %,d millis [%,f b/s].",
|
"Completed request to url[%s] with %,d bytes returned in %,d millis [%,f b/s].",
|
||||||
url,
|
url,
|
||||||
byteCount.get(),
|
byteCount.get(),
|
||||||
stopTime - startTime,
|
stopTime - responseStartTime,
|
||||||
byteCount.get() / (0.0001 * (stopTime - startTime))
|
byteCount.get() / (0.0001 * (stopTime - responseStartTime))
|
||||||
);
|
);
|
||||||
|
emitter.emit(builder.build("query/node/time", stopTime - requestStartTime));
|
||||||
synchronized (done) {
|
synchronized (done) {
|
||||||
try {
|
try {
|
||||||
// An empty byte array is put at the end to give the SequenceInputStream.close() as something to close out
|
// An empty byte array is put at the end to give the SequenceInputStream.close() as something to close out
|
||||||
|
|
|
@ -30,6 +30,7 @@ import com.google.common.collect.ImmutableSet;
|
||||||
import com.google.common.collect.Iterables;
|
import com.google.common.collect.Iterables;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.metamx.common.Pair;
|
import com.metamx.common.Pair;
|
||||||
|
import com.metamx.emitter.service.ServiceEmitter;
|
||||||
import com.metamx.http.client.HttpClient;
|
import com.metamx.http.client.HttpClient;
|
||||||
import io.druid.client.selector.HighestPriorityTierSelectorStrategy;
|
import io.druid.client.selector.HighestPriorityTierSelectorStrategy;
|
||||||
import io.druid.client.selector.RandomServerSelectorStrategy;
|
import io.druid.client.selector.RandomServerSelectorStrategy;
|
||||||
|
@ -41,6 +42,7 @@ import io.druid.query.QueryWatcher;
|
||||||
import io.druid.query.TableDataSource;
|
import io.druid.query.TableDataSource;
|
||||||
import io.druid.server.coordination.DruidServerMetadata;
|
import io.druid.server.coordination.DruidServerMetadata;
|
||||||
import io.druid.server.initialization.ZkPathsConfig;
|
import io.druid.server.initialization.ZkPathsConfig;
|
||||||
|
import io.druid.server.metrics.NoopServiceEmitter;
|
||||||
import io.druid.timeline.DataSegment;
|
import io.druid.timeline.DataSegment;
|
||||||
import io.druid.timeline.TimelineLookup;
|
import io.druid.timeline.TimelineLookup;
|
||||||
import io.druid.timeline.TimelineObjectHolder;
|
import io.druid.timeline.TimelineObjectHolder;
|
||||||
|
@ -364,7 +366,8 @@ public class BrokerServerViewTest extends CuratorTestBase
|
||||||
getSmileMapper(),
|
getSmileMapper(),
|
||||||
EasyMock.createMock(HttpClient.class),
|
EasyMock.createMock(HttpClient.class),
|
||||||
baseView,
|
baseView,
|
||||||
new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy())
|
new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()),
|
||||||
|
new NoopServiceEmitter()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -39,6 +39,7 @@ import io.druid.query.QueryRunnerTestHelper;
|
||||||
import io.druid.query.ReflectionQueryToolChestWarehouse;
|
import io.druid.query.ReflectionQueryToolChestWarehouse;
|
||||||
import io.druid.query.Result;
|
import io.druid.query.Result;
|
||||||
import io.druid.query.timeboundary.TimeBoundaryQuery;
|
import io.druid.query.timeboundary.TimeBoundaryQuery;
|
||||||
|
import io.druid.server.metrics.NoopServiceEmitter;
|
||||||
import io.druid.timeline.DataSegment;
|
import io.druid.timeline.DataSegment;
|
||||||
import io.druid.timeline.partition.NoneShardSpec;
|
import io.druid.timeline.partition.NoneShardSpec;
|
||||||
import org.easymock.Capture;
|
import org.easymock.Capture;
|
||||||
|
@ -117,14 +118,16 @@ public class DirectDruidClientTest
|
||||||
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
|
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
|
||||||
new DefaultObjectMapper(),
|
new DefaultObjectMapper(),
|
||||||
httpClient,
|
httpClient,
|
||||||
"foo"
|
"foo",
|
||||||
|
new NoopServiceEmitter()
|
||||||
);
|
);
|
||||||
DirectDruidClient client2 = new DirectDruidClient(
|
DirectDruidClient client2 = new DirectDruidClient(
|
||||||
new ReflectionQueryToolChestWarehouse(),
|
new ReflectionQueryToolChestWarehouse(),
|
||||||
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
|
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
|
||||||
new DefaultObjectMapper(),
|
new DefaultObjectMapper(),
|
||||||
httpClient,
|
httpClient,
|
||||||
"foo2"
|
"foo2",
|
||||||
|
new NoopServiceEmitter()
|
||||||
);
|
);
|
||||||
|
|
||||||
QueryableDruidServer queryableDruidServer1 = new QueryableDruidServer(
|
QueryableDruidServer queryableDruidServer1 = new QueryableDruidServer(
|
||||||
|
@ -225,7 +228,8 @@ public class DirectDruidClientTest
|
||||||
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
|
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
|
||||||
new DefaultObjectMapper(),
|
new DefaultObjectMapper(),
|
||||||
httpClient,
|
httpClient,
|
||||||
"foo"
|
"foo",
|
||||||
|
new NoopServiceEmitter()
|
||||||
);
|
);
|
||||||
|
|
||||||
QueryableDruidServer queryableDruidServer1 = new QueryableDruidServer(
|
QueryableDruidServer queryableDruidServer1 = new QueryableDruidServer(
|
||||||
|
|
Loading…
Reference in New Issue