mirror of https://github.com/apache/druid.git
add node time metrics to DirectDruidClient
This commit is contained in:
parent
f883ff2dab
commit
28fa1642b9
|
@ -18,6 +18,7 @@
|
|||
package io.druid.query;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.metamx.common.guava.Accumulator;
|
||||
|
@ -103,11 +104,7 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
|
|||
builder.setDimension(userDimension.getKey(), userDimension.getValue());
|
||||
}
|
||||
|
||||
String queryId = query.getId();
|
||||
if (queryId == null) {
|
||||
queryId = "";
|
||||
}
|
||||
builder.setDimension(DruidMetrics.ID, queryId);
|
||||
builder.setDimension(DruidMetrics.ID, Strings.nullToEmpty(query.getId()));
|
||||
|
||||
return new Sequence<T>()
|
||||
{
|
||||
|
|
|
@ -24,6 +24,7 @@ import com.google.common.collect.Maps;
|
|||
import com.google.common.collect.Ordering;
|
||||
import com.google.inject.Inject;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import com.metamx.emitter.service.ServiceEmitter;
|
||||
import com.metamx.http.client.HttpClient;
|
||||
import io.druid.client.selector.QueryableDruidServer;
|
||||
import io.druid.client.selector.ServerSelector;
|
||||
|
@ -67,6 +68,7 @@ public class BrokerServerView implements TimelineServerView
|
|||
private final HttpClient httpClient;
|
||||
private final ServerInventoryView baseView;
|
||||
private final TierSelectorStrategy tierSelectorStrategy;
|
||||
private final ServiceEmitter emitter;
|
||||
|
||||
private volatile boolean initialized = false;
|
||||
|
||||
|
@ -77,7 +79,8 @@ public class BrokerServerView implements TimelineServerView
|
|||
@Smile ObjectMapper smileMapper,
|
||||
@Client HttpClient httpClient,
|
||||
ServerInventoryView baseView,
|
||||
TierSelectorStrategy tierSelectorStrategy
|
||||
TierSelectorStrategy tierSelectorStrategy,
|
||||
ServiceEmitter emitter
|
||||
)
|
||||
{
|
||||
this.warehouse = warehouse;
|
||||
|
@ -86,6 +89,7 @@ public class BrokerServerView implements TimelineServerView
|
|||
this.httpClient = httpClient;
|
||||
this.baseView = baseView;
|
||||
this.tierSelectorStrategy = tierSelectorStrategy;
|
||||
this.emitter = emitter;
|
||||
|
||||
this.clients = Maps.newConcurrentMap();
|
||||
this.selectors = Maps.newHashMap();
|
||||
|
@ -173,7 +177,7 @@ public class BrokerServerView implements TimelineServerView
|
|||
|
||||
private DirectDruidClient makeDirectClient(DruidServer server)
|
||||
{
|
||||
return new DirectDruidClient(warehouse, queryWatcher, smileMapper, httpClient, server.getHost());
|
||||
return new DirectDruidClient(warehouse, queryWatcher, smileMapper, httpClient, server.getHost(), emitter);
|
||||
}
|
||||
|
||||
private QueryableDruidServer removeServer(DruidServer server)
|
||||
|
|
|
@ -27,6 +27,7 @@ import com.fasterxml.jackson.databind.type.TypeFactory;
|
|||
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
|
||||
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.io.ByteSource;
|
||||
|
@ -41,6 +42,8 @@ import com.metamx.common.guava.CloseQuietly;
|
|||
import com.metamx.common.guava.Sequence;
|
||||
import com.metamx.common.guava.Sequences;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import com.metamx.emitter.service.ServiceEmitter;
|
||||
import com.metamx.emitter.service.ServiceMetricEvent;
|
||||
import com.metamx.http.client.HttpClient;
|
||||
import com.metamx.http.client.Request;
|
||||
import com.metamx.http.client.response.ClientResponse;
|
||||
|
@ -48,6 +51,7 @@ import com.metamx.http.client.response.HttpResponseHandler;
|
|||
import com.metamx.http.client.response.StatusResponseHandler;
|
||||
import com.metamx.http.client.response.StatusResponseHolder;
|
||||
import io.druid.query.BySegmentResultValueClass;
|
||||
import io.druid.query.DruidMetrics;
|
||||
import io.druid.query.Query;
|
||||
import io.druid.query.QueryInterruptedException;
|
||||
import io.druid.query.QueryRunner;
|
||||
|
@ -94,6 +98,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
|
|||
private final ObjectMapper objectMapper;
|
||||
private final HttpClient httpClient;
|
||||
private final String host;
|
||||
private final ServiceEmitter emitter;
|
||||
|
||||
private final AtomicInteger openConnections;
|
||||
private final boolean isSmile;
|
||||
|
@ -103,7 +108,8 @@ public class DirectDruidClient<T> implements QueryRunner<T>
|
|||
QueryWatcher queryWatcher,
|
||||
ObjectMapper objectMapper,
|
||||
HttpClient httpClient,
|
||||
String host
|
||||
String host,
|
||||
ServiceEmitter emitter
|
||||
)
|
||||
{
|
||||
this.warehouse = warehouse;
|
||||
|
@ -111,6 +117,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
|
|||
this.objectMapper = objectMapper;
|
||||
this.httpClient = httpClient;
|
||||
this.host = host;
|
||||
this.emitter = emitter;
|
||||
|
||||
this.isSmile = this.objectMapper.getFactory() instanceof SmileFactory;
|
||||
this.openConnections = new AtomicInteger();
|
||||
|
@ -152,9 +159,16 @@ public class DirectDruidClient<T> implements QueryRunner<T>
|
|||
try {
|
||||
log.debug("Querying url[%s]", url);
|
||||
|
||||
final long requestStartTime = System.currentTimeMillis();
|
||||
|
||||
final ServiceMetricEvent.Builder builder = toolChest.makeMetricBuilder(query);
|
||||
builder.setDimension("server", host);
|
||||
builder.setDimension(DruidMetrics.ID, Strings.nullToEmpty(query.getId()));
|
||||
|
||||
|
||||
final HttpResponseHandler<InputStream, InputStream> responseHandler = new HttpResponseHandler<InputStream, InputStream>()
|
||||
{
|
||||
private long startTime;
|
||||
private long responseStartTime;
|
||||
private final AtomicLong byteCount = new AtomicLong(0);
|
||||
private final BlockingQueue<InputStream> queue = new LinkedBlockingQueue<>();
|
||||
private final AtomicBoolean done = new AtomicBoolean(false);
|
||||
|
@ -163,7 +177,9 @@ public class DirectDruidClient<T> implements QueryRunner<T>
|
|||
public ClientResponse<InputStream> handleResponse(HttpResponse response)
|
||||
{
|
||||
log.debug("Initial response from url[%s]", url);
|
||||
startTime = System.currentTimeMillis();
|
||||
responseStartTime = System.currentTimeMillis();
|
||||
emitter.emit(builder.build("query/node/ttfb", responseStartTime - requestStartTime));
|
||||
|
||||
try {
|
||||
final String responseContext = response.headers().get("X-Druid-Response-Context");
|
||||
// context may be null in case of error or query timeout
|
||||
|
@ -256,9 +272,10 @@ public class DirectDruidClient<T> implements QueryRunner<T>
|
|||
"Completed request to url[%s] with %,d bytes returned in %,d millis [%,f b/s].",
|
||||
url,
|
||||
byteCount.get(),
|
||||
stopTime - startTime,
|
||||
byteCount.get() / (0.0001 * (stopTime - startTime))
|
||||
stopTime - responseStartTime,
|
||||
byteCount.get() / (0.0001 * (stopTime - responseStartTime))
|
||||
);
|
||||
emitter.emit(builder.build("query/node/time", stopTime - requestStartTime));
|
||||
synchronized (done) {
|
||||
try {
|
||||
// An empty byte array is put at the end to give the SequenceInputStream.close() as something to close out
|
||||
|
|
|
@ -30,6 +30,7 @@ import com.google.common.collect.ImmutableSet;
|
|||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.metamx.common.Pair;
|
||||
import com.metamx.emitter.service.ServiceEmitter;
|
||||
import com.metamx.http.client.HttpClient;
|
||||
import io.druid.client.selector.HighestPriorityTierSelectorStrategy;
|
||||
import io.druid.client.selector.RandomServerSelectorStrategy;
|
||||
|
@ -41,6 +42,7 @@ import io.druid.query.QueryWatcher;
|
|||
import io.druid.query.TableDataSource;
|
||||
import io.druid.server.coordination.DruidServerMetadata;
|
||||
import io.druid.server.initialization.ZkPathsConfig;
|
||||
import io.druid.server.metrics.NoopServiceEmitter;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import io.druid.timeline.TimelineLookup;
|
||||
import io.druid.timeline.TimelineObjectHolder;
|
||||
|
@ -364,7 +366,8 @@ public class BrokerServerViewTest extends CuratorTestBase
|
|||
getSmileMapper(),
|
||||
EasyMock.createMock(HttpClient.class),
|
||||
baseView,
|
||||
new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy())
|
||||
new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()),
|
||||
new NoopServiceEmitter()
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -39,6 +39,7 @@ import io.druid.query.QueryRunnerTestHelper;
|
|||
import io.druid.query.ReflectionQueryToolChestWarehouse;
|
||||
import io.druid.query.Result;
|
||||
import io.druid.query.timeboundary.TimeBoundaryQuery;
|
||||
import io.druid.server.metrics.NoopServiceEmitter;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import io.druid.timeline.partition.NoneShardSpec;
|
||||
import org.easymock.Capture;
|
||||
|
@ -117,14 +118,16 @@ public class DirectDruidClientTest
|
|||
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
|
||||
new DefaultObjectMapper(),
|
||||
httpClient,
|
||||
"foo"
|
||||
"foo",
|
||||
new NoopServiceEmitter()
|
||||
);
|
||||
DirectDruidClient client2 = new DirectDruidClient(
|
||||
new ReflectionQueryToolChestWarehouse(),
|
||||
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
|
||||
new DefaultObjectMapper(),
|
||||
httpClient,
|
||||
"foo2"
|
||||
"foo2",
|
||||
new NoopServiceEmitter()
|
||||
);
|
||||
|
||||
QueryableDruidServer queryableDruidServer1 = new QueryableDruidServer(
|
||||
|
@ -225,7 +228,8 @@ public class DirectDruidClientTest
|
|||
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
|
||||
new DefaultObjectMapper(),
|
||||
httpClient,
|
||||
"foo"
|
||||
"foo",
|
||||
new NoopServiceEmitter()
|
||||
);
|
||||
|
||||
QueryableDruidServer queryableDruidServer1 = new QueryableDruidServer(
|
||||
|
|
Loading…
Reference in New Issue