mirror of https://github.com/apache/druid.git
Add MetricsVerifier to simplify verification of metric values in tests (#13442)
This commit is contained in:
parent
db7c29c6f9
commit
656b6cdf62
|
@ -0,0 +1,98 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.druid.java.util.metrics;
|
||||||
|
|
||||||
|
import org.junit.Assert;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test utility to extract and verify metric values.
|
||||||
|
*/
|
||||||
|
public interface MetricsVerifier
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* Verifies that no event has been emitted for the given metric.
|
||||||
|
*/
|
||||||
|
default void verifyNotEmitted(String metricName)
|
||||||
|
{
|
||||||
|
verifyEmitted(metricName, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verifies that the metric was emitted the expected number of times.
|
||||||
|
*/
|
||||||
|
default void verifyEmitted(String metricName, int times)
|
||||||
|
{
|
||||||
|
verifyEmitted(metricName, null, times);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verifies that the metric was emitted for the given dimension filters the
|
||||||
|
* expected number of times.
|
||||||
|
*/
|
||||||
|
default void verifyEmitted(String metricName, Map<String, Object> dimensionFilters, int times)
|
||||||
|
{
|
||||||
|
Assert.assertEquals(
|
||||||
|
"Metric was emitted unexpected number of times.",
|
||||||
|
times,
|
||||||
|
getMetricValues(metricName, dimensionFilters).size()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verifies the value of the specified metric emitted in the previous run.
|
||||||
|
*/
|
||||||
|
default void verifyValue(String metricName, Number expectedValue)
|
||||||
|
{
|
||||||
|
verifyValue(metricName, null, expectedValue);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verifies the value of the event corresponding to the specified metric and
|
||||||
|
* dimensionFilters emitted in the previous run.
|
||||||
|
*/
|
||||||
|
default void verifyValue(String metricName, Map<String, Object> dimensionFilters, Number expectedValue)
|
||||||
|
{
|
||||||
|
Assert.assertEquals(expectedValue, getValue(metricName, dimensionFilters));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets the value of the event corresponding to the specified metric and
|
||||||
|
* dimensionFilters.
|
||||||
|
*/
|
||||||
|
default Number getValue(String metricName, Map<String, Object> dimensionFilters)
|
||||||
|
{
|
||||||
|
List<Number> values = getMetricValues(metricName, dimensionFilters);
|
||||||
|
Assert.assertEquals(
|
||||||
|
"Metric must have been emitted exactly once for the given dimensions.",
|
||||||
|
1,
|
||||||
|
values.size()
|
||||||
|
);
|
||||||
|
return values.get(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets the metric values for the specified dimension filters.
|
||||||
|
*/
|
||||||
|
List<Number> getMetricValues(String metricName, Map<String, Object> dimensionFilters);
|
||||||
|
|
||||||
|
}
|
|
@ -24,12 +24,15 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter;
|
||||||
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
|
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
public class StubServiceEmitter extends ServiceEmitter
|
public class StubServiceEmitter extends ServiceEmitter implements MetricsVerifier
|
||||||
{
|
{
|
||||||
private final List<Event> events = new ArrayList<>();
|
private final List<Event> events = new ArrayList<>();
|
||||||
private final List<ServiceMetricEvent> metricEvents = new ArrayList<>();
|
private final Map<String, List<ServiceMetricEvent>> metricEvents = new HashMap<>();
|
||||||
|
|
||||||
public StubServiceEmitter(String service, String host)
|
public StubServiceEmitter(String service, String host)
|
||||||
{
|
{
|
||||||
|
@ -40,7 +43,9 @@ public class StubServiceEmitter extends ServiceEmitter
|
||||||
public void emit(Event event)
|
public void emit(Event event)
|
||||||
{
|
{
|
||||||
if (event instanceof ServiceMetricEvent) {
|
if (event instanceof ServiceMetricEvent) {
|
||||||
metricEvents.add((ServiceMetricEvent) event);
|
ServiceMetricEvent metricEvent = (ServiceMetricEvent) event;
|
||||||
|
metricEvents.computeIfAbsent(metricEvent.getMetric(), name -> new ArrayList<>())
|
||||||
|
.add(metricEvent);
|
||||||
}
|
}
|
||||||
events.add(event);
|
events.add(event);
|
||||||
}
|
}
|
||||||
|
@ -53,12 +58,29 @@ public class StubServiceEmitter extends ServiceEmitter
|
||||||
return events;
|
return events;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
@Override
|
||||||
* Gets all the metric events emitted since the previous {@link #flush()}.
|
public List<Number> getMetricValues(
|
||||||
*/
|
String metricName,
|
||||||
public List<ServiceMetricEvent> getMetricEvents()
|
Map<String, Object> dimensionFilters
|
||||||
|
)
|
||||||
{
|
{
|
||||||
return metricEvents;
|
final List<Number> values = new ArrayList<>();
|
||||||
|
final List<ServiceMetricEvent> events =
|
||||||
|
metricEvents.getOrDefault(metricName, Collections.emptyList());
|
||||||
|
final Map<String, Object> filters =
|
||||||
|
dimensionFilters == null ? Collections.emptyMap() : dimensionFilters;
|
||||||
|
for (ServiceMetricEvent event : events) {
|
||||||
|
final Map<String, Object> userDims = event.getUserDims();
|
||||||
|
boolean match = filters.keySet().stream()
|
||||||
|
.map(d -> filters.get(d).equals(userDims.get(d)))
|
||||||
|
.reduce((a, b) -> a && b)
|
||||||
|
.orElse(true);
|
||||||
|
if (match) {
|
||||||
|
values.add(event.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return values;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -757,8 +757,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
|
||||||
query,
|
query,
|
||||||
serviceEmitter
|
serviceEmitter
|
||||||
);
|
);
|
||||||
Assert.assertEquals(1, serviceEmitter.getEvents().size());
|
serviceEmitter.verifyEmitted("query/wait/time", ImmutableMap.of("vectorized", vectorize), 1);
|
||||||
Assert.assertEquals(vectorize, serviceEmitter.getEvents().get(0).toMap().getOrDefault("vectorized", null));
|
|
||||||
TestHelper.assertExpectedObjects(expectedResults, results, "groupBy");
|
TestHelper.assertExpectedObjects(expectedResults, results, "groupBy");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -230,8 +230,7 @@ public class ScanQueryRunnerTest extends InitializedNullHandlingTest
|
||||||
0,
|
0,
|
||||||
3
|
3
|
||||||
);
|
);
|
||||||
Assert.assertEquals(1, stubServiceEmitter.getEvents().size());
|
stubServiceEmitter.verifyEmitted("query/wait/time", ImmutableMap.of("vectorized", false), 1);
|
||||||
Assert.assertEquals(false, stubServiceEmitter.getEvents().get(0).toMap().getOrDefault("vectorized", null));
|
|
||||||
verify(expectedResults, populateNullColumnAtLastForQueryableIndexCase(results, "null_column"));
|
verify(expectedResults, populateNullColumnAtLastForQueryableIndexCase(results, "null_column"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -317,11 +317,7 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
|
||||||
++count;
|
++count;
|
||||||
}
|
}
|
||||||
|
|
||||||
Assert.assertEquals(1, stubServiceEmitter.getEvents().size());
|
stubServiceEmitter.verifyEmitted("query/wait/time", ImmutableMap.of("vectorized", vectorize), 1);
|
||||||
Assert.assertEquals(
|
|
||||||
vectorize,
|
|
||||||
stubServiceEmitter.getEvents().get(0).toMap().getOrDefault("vectorized", null)
|
|
||||||
);
|
|
||||||
Assert.assertEquals(lastResult.toString(), expectedLast, lastResult.getTimestamp());
|
Assert.assertEquals(lastResult.toString(), expectedLast, lastResult.getTimestamp());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -76,8 +76,8 @@ public class BalancingStrategiesTest extends CoordinatorSimulationBaseTest
|
||||||
runCoordinatorCycle();
|
runCoordinatorCycle();
|
||||||
loadQueuedSegments();
|
loadQueuedSegments();
|
||||||
verifyValue(Metric.ASSIGNED_COUNT, 1000L);
|
verifyValue(Metric.ASSIGNED_COUNT, 1000L);
|
||||||
verifyNoEvent(Metric.MOVED_COUNT);
|
verifyNotEmitted(Metric.MOVED_COUNT);
|
||||||
verifyNoEvent(Metric.UNMOVED_COUNT);
|
verifyNotEmitted(Metric.UNMOVED_COUNT);
|
||||||
|
|
||||||
for (DruidServer historical : historicals) {
|
for (DruidServer historical : historicals) {
|
||||||
Assert.assertEquals(200, historical.getTotalSegments());
|
Assert.assertEquals(200, historical.getTotalSegments());
|
||||||
|
@ -113,8 +113,8 @@ public class BalancingStrategiesTest extends CoordinatorSimulationBaseTest
|
||||||
runCoordinatorCycle();
|
runCoordinatorCycle();
|
||||||
loadQueuedSegments();
|
loadQueuedSegments();
|
||||||
verifyValue(Metric.ASSIGNED_COUNT, 1000L);
|
verifyValue(Metric.ASSIGNED_COUNT, 1000L);
|
||||||
verifyNoEvent(Metric.MOVED_COUNT);
|
verifyNotEmitted(Metric.MOVED_COUNT);
|
||||||
verifyNoEvent(Metric.UNMOVED_COUNT);
|
verifyNotEmitted(Metric.UNMOVED_COUNT);
|
||||||
|
|
||||||
// Verify that each server is equally loaded
|
// Verify that each server is equally loaded
|
||||||
for (DruidServer historical : historicals) {
|
for (DruidServer historical : historicals) {
|
||||||
|
@ -161,8 +161,8 @@ public class BalancingStrategiesTest extends CoordinatorSimulationBaseTest
|
||||||
runCoordinatorCycle();
|
runCoordinatorCycle();
|
||||||
loadQueuedSegments();
|
loadQueuedSegments();
|
||||||
verifyValue(Metric.ASSIGNED_COUNT, 1000L);
|
verifyValue(Metric.ASSIGNED_COUNT, 1000L);
|
||||||
verifyNoEvent(Metric.MOVED_COUNT);
|
verifyNotEmitted(Metric.MOVED_COUNT);
|
||||||
verifyNoEvent(Metric.UNMOVED_COUNT);
|
verifyNotEmitted(Metric.UNMOVED_COUNT);
|
||||||
|
|
||||||
// Verify that each server is equally loaded
|
// Verify that each server is equally loaded
|
||||||
for (DruidServer historical : historicals) {
|
for (DruidServer historical : historicals) {
|
||||||
|
|
|
@ -20,7 +20,7 @@
|
||||||
package org.apache.druid.server.coordinator.simulate;
|
package org.apache.druid.server.coordinator.simulate;
|
||||||
|
|
||||||
import org.apache.druid.client.DruidServer;
|
import org.apache.druid.client.DruidServer;
|
||||||
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
|
import org.apache.druid.java.util.metrics.MetricsVerifier;
|
||||||
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
|
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
|
||||||
import org.apache.druid.timeline.DataSegment;
|
import org.apache.druid.timeline.DataSegment;
|
||||||
|
|
||||||
|
@ -81,9 +81,10 @@ public interface CoordinatorSimulation
|
||||||
DruidServer getInventoryView(String serverName);
|
DruidServer getInventoryView(String serverName);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the metric events emitted in the previous coordinator run.
|
* Returns a MetricsVerifier which can be used to extract and verify the
|
||||||
|
* metric values emitted in the previous coordinator run.
|
||||||
*/
|
*/
|
||||||
List<ServiceMetricEvent> getMetricEvents();
|
MetricsVerifier getMetricsVerifier();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gets the load percentage of the specified datasource as seen by the coordinator.
|
* Gets the load percentage of the specified datasource as seen by the coordinator.
|
||||||
|
|
|
@ -21,7 +21,7 @@ package org.apache.druid.server.coordinator.simulate;
|
||||||
|
|
||||||
import org.apache.druid.client.DruidServer;
|
import org.apache.druid.client.DruidServer;
|
||||||
import org.apache.druid.java.util.common.granularity.Granularities;
|
import org.apache.druid.java.util.common.granularity.Granularities;
|
||||||
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
|
import org.apache.druid.java.util.metrics.MetricsVerifier;
|
||||||
import org.apache.druid.server.coordination.ServerType;
|
import org.apache.druid.server.coordination.ServerType;
|
||||||
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
|
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
|
||||||
import org.apache.druid.server.coordinator.CreateDataSegments;
|
import org.apache.druid.server.coordinator.CreateDataSegments;
|
||||||
|
@ -32,8 +32,6 @@ import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -51,13 +49,15 @@ import java.util.Map;
|
||||||
* leading to flakiness in the tests. The simulation sets this field to true by
|
* leading to flakiness in the tests. The simulation sets this field to true by
|
||||||
* default.
|
* default.
|
||||||
*/
|
*/
|
||||||
public abstract class CoordinatorSimulationBaseTest
|
public abstract class CoordinatorSimulationBaseTest implements
|
||||||
implements CoordinatorSimulation.CoordinatorState, CoordinatorSimulation.ClusterState
|
CoordinatorSimulation.CoordinatorState,
|
||||||
|
CoordinatorSimulation.ClusterState,
|
||||||
|
MetricsVerifier
|
||||||
{
|
{
|
||||||
static final double DOUBLE_DELTA = 10e-9;
|
static final double DOUBLE_DELTA = 10e-9;
|
||||||
|
|
||||||
private CoordinatorSimulation sim;
|
private CoordinatorSimulation sim;
|
||||||
private final Map<String, List<ServiceMetricEvent>> latestMetricEvents = new HashMap<>();
|
private MetricsVerifier metricsVerifier;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public abstract void setUp();
|
public abstract void setUp();
|
||||||
|
@ -78,25 +78,19 @@ public abstract class CoordinatorSimulationBaseTest
|
||||||
{
|
{
|
||||||
this.sim = simulation;
|
this.sim = simulation;
|
||||||
simulation.start();
|
simulation.start();
|
||||||
|
this.metricsVerifier = this.sim.coordinator().getMetricsVerifier();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void runCoordinatorCycle()
|
public void runCoordinatorCycle()
|
||||||
{
|
{
|
||||||
latestMetricEvents.clear();
|
|
||||||
sim.coordinator().runCoordinatorCycle();
|
sim.coordinator().runCoordinatorCycle();
|
||||||
|
|
||||||
// Extract the metric values of this run
|
|
||||||
for (ServiceMetricEvent event : sim.coordinator().getMetricEvents()) {
|
|
||||||
latestMetricEvents.computeIfAbsent(event.getMetric(), m -> new ArrayList<>())
|
|
||||||
.add(event);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<ServiceMetricEvent> getMetricEvents()
|
public MetricsVerifier getMetricsVerifier()
|
||||||
{
|
{
|
||||||
return sim.coordinator().getMetricEvents();
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -153,61 +147,13 @@ public abstract class CoordinatorSimulationBaseTest
|
||||||
Assert.assertEquals(100.0, getLoadPercentage(datasource), DOUBLE_DELTA);
|
Assert.assertEquals(100.0, getLoadPercentage(datasource), DOUBLE_DELTA);
|
||||||
}
|
}
|
||||||
|
|
||||||
void verifyNoEvent(String metricName)
|
@Override
|
||||||
|
public List<Number> getMetricValues(
|
||||||
|
String metricName,
|
||||||
|
Map<String, Object> dimensionFilters
|
||||||
|
)
|
||||||
{
|
{
|
||||||
Assert.assertTrue(getMetricValues(metricName, null).isEmpty());
|
return metricsVerifier.getMetricValues(metricName, dimensionFilters);
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Verifies the value of the specified metric emitted in the previous run.
|
|
||||||
*/
|
|
||||||
void verifyValue(String metricName, Number expectedValue)
|
|
||||||
{
|
|
||||||
verifyValue(metricName, null, expectedValue);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Verifies the value of the event corresponding to the specified metric and
|
|
||||||
* dimensionFilters emitted in the previous run.
|
|
||||||
*/
|
|
||||||
void verifyValue(String metricName, Map<String, String> dimensionFilters, Number expectedValue)
|
|
||||||
{
|
|
||||||
Assert.assertEquals(expectedValue, getValue(metricName, dimensionFilters));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Gets the value of the event corresponding to the specified metric and
|
|
||||||
* dimensionFilters emitted in the previous run.
|
|
||||||
*/
|
|
||||||
Number getValue(String metricName, Map<String, String> dimensionFilters)
|
|
||||||
{
|
|
||||||
List<Number> values = getMetricValues(metricName, dimensionFilters);
|
|
||||||
Assert.assertEquals(
|
|
||||||
"Metric must have been emitted exactly once for the given dimensions.",
|
|
||||||
1,
|
|
||||||
values.size()
|
|
||||||
);
|
|
||||||
return values.get(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
private List<Number> getMetricValues(String metricName, Map<String, String> dimensionFilters)
|
|
||||||
{
|
|
||||||
final List<Number> values = new ArrayList<>();
|
|
||||||
final List<ServiceMetricEvent> events = latestMetricEvents.getOrDefault(metricName, Collections.emptyList());
|
|
||||||
final Map<String, String> filters = dimensionFilters == null
|
|
||||||
? Collections.emptyMap() : dimensionFilters;
|
|
||||||
for (ServiceMetricEvent event : events) {
|
|
||||||
final Map<String, Object> userDims = event.getUserDims();
|
|
||||||
boolean match = filters.keySet().stream()
|
|
||||||
.map(d -> filters.get(d).equals(userDims.get(d)))
|
|
||||||
.reduce((a, b) -> a && b)
|
|
||||||
.orElse(true);
|
|
||||||
if (match) {
|
|
||||||
values.add(event.getValue());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return values;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Utility methods
|
// Utility methods
|
||||||
|
@ -237,13 +183,13 @@ public abstract class CoordinatorSimulationBaseTest
|
||||||
/**
|
/**
|
||||||
* Creates a map containing dimension key-values to filter out metric events.
|
* Creates a map containing dimension key-values to filter out metric events.
|
||||||
*/
|
*/
|
||||||
static Map<String, String> filter(String... dimensionValues)
|
static Map<String, Object> filter(String... dimensionValues)
|
||||||
{
|
{
|
||||||
if (dimensionValues.length < 2 || dimensionValues.length % 2 == 1) {
|
if (dimensionValues.length < 2 || dimensionValues.length % 2 == 1) {
|
||||||
throw new IllegalArgumentException("Dimension key-values must be specified in pairs.");
|
throw new IllegalArgumentException("Dimension key-values must be specified in pairs.");
|
||||||
}
|
}
|
||||||
|
|
||||||
final Map<String, String> filters = new HashMap<>();
|
final Map<String, Object> filters = new HashMap<>();
|
||||||
for (int i = 0; i < dimensionValues.length; ) {
|
for (int i = 0; i < dimensionValues.length; ) {
|
||||||
filters.put(dimensionValues[i], dimensionValues[i + 1]);
|
filters.put(dimensionValues[i], dimensionValues[i + 1]);
|
||||||
i += 2;
|
i += 2;
|
||||||
|
|
|
@ -32,8 +32,8 @@ import org.apache.druid.java.util.common.concurrent.DirectExecutorService;
|
||||||
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
|
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
|
||||||
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
|
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
|
||||||
import org.apache.druid.java.util.emitter.EmittingLogger;
|
import org.apache.druid.java.util.emitter.EmittingLogger;
|
||||||
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
|
|
||||||
import org.apache.druid.java.util.http.client.HttpClient;
|
import org.apache.druid.java.util.http.client.HttpClient;
|
||||||
|
import org.apache.druid.java.util.metrics.MetricsVerifier;
|
||||||
import org.apache.druid.java.util.metrics.StubServiceEmitter;
|
import org.apache.druid.java.util.metrics.StubServiceEmitter;
|
||||||
import org.apache.druid.server.coordinator.BalancerStrategyFactory;
|
import org.apache.druid.server.coordinator.BalancerStrategyFactory;
|
||||||
import org.apache.druid.server.coordinator.CachingCostBalancerStrategyConfig;
|
import org.apache.druid.server.coordinator.CachingCostBalancerStrategyConfig;
|
||||||
|
@ -392,9 +392,9 @@ public class CoordinatorSimulationBuilder
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<ServiceMetricEvent> getMetricEvents()
|
public MetricsVerifier getMetricsVerifier()
|
||||||
{
|
{
|
||||||
return new ArrayList<>(env.serviceEmitter.getMetricEvents());
|
return env.serviceEmitter;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -148,7 +148,7 @@ public class SegmentLoadingTest extends CoordinatorSimulationBaseTest
|
||||||
startSimulation(sim);
|
startSimulation(sim);
|
||||||
runCoordinatorCycle();
|
runCoordinatorCycle();
|
||||||
|
|
||||||
verifyNoEvent(Metric.DROPPED_COUNT);
|
verifyNotEmitted(Metric.DROPPED_COUNT);
|
||||||
int totalAssignedInRun1
|
int totalAssignedInRun1
|
||||||
= getValue(Metric.ASSIGNED_COUNT, filter(DruidMetrics.TIER, Tier.T2)).intValue()
|
= getValue(Metric.ASSIGNED_COUNT, filter(DruidMetrics.TIER, Tier.T2)).intValue()
|
||||||
+ getValue(Metric.ASSIGNED_COUNT, filter(DruidMetrics.TIER, Tier.T3)).intValue();
|
+ getValue(Metric.ASSIGNED_COUNT, filter(DruidMetrics.TIER, Tier.T3)).intValue();
|
||||||
|
@ -158,7 +158,7 @@ public class SegmentLoadingTest extends CoordinatorSimulationBaseTest
|
||||||
runCoordinatorCycle();
|
runCoordinatorCycle();
|
||||||
loadQueuedSegments();
|
loadQueuedSegments();
|
||||||
|
|
||||||
verifyNoEvent(Metric.DROPPED_COUNT);
|
verifyNotEmitted(Metric.DROPPED_COUNT);
|
||||||
int totalLoadedAfterRun2
|
int totalLoadedAfterRun2
|
||||||
= historicalT21.getTotalSegments() + historicalT22.getTotalSegments()
|
= historicalT21.getTotalSegments() + historicalT22.getTotalSegments()
|
||||||
+ historicalT31.getTotalSegments() + historicalT32.getTotalSegments();
|
+ historicalT31.getTotalSegments() + historicalT32.getTotalSegments();
|
||||||
|
|
|
@ -75,15 +75,10 @@ public class TaskCountStatsMonitorTest
|
||||||
final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
|
final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
|
||||||
monitor.doMonitor(emitter);
|
monitor.doMonitor(emitter);
|
||||||
Assert.assertEquals(5, emitter.getEvents().size());
|
Assert.assertEquals(5, emitter.getEvents().size());
|
||||||
Assert.assertEquals("task/success/count", emitter.getEvents().get(0).toMap().get("metric"));
|
emitter.verifyValue("task/success/count", 1L);
|
||||||
Assert.assertEquals(1L, emitter.getEvents().get(0).toMap().get("value"));
|
emitter.verifyValue("task/failed/count", 1L);
|
||||||
Assert.assertEquals("task/failed/count", emitter.getEvents().get(1).toMap().get("metric"));
|
emitter.verifyValue("task/running/count", 1L);
|
||||||
Assert.assertEquals(1L, emitter.getEvents().get(1).toMap().get("value"));
|
emitter.verifyValue("task/pending/count", 1L);
|
||||||
Assert.assertEquals("task/running/count", emitter.getEvents().get(2).toMap().get("metric"));
|
emitter.verifyValue("task/waiting/count", 1L);
|
||||||
Assert.assertEquals(1L, emitter.getEvents().get(2).toMap().get("value"));
|
|
||||||
Assert.assertEquals("task/pending/count", emitter.getEvents().get(3).toMap().get("metric"));
|
|
||||||
Assert.assertEquals(1L, emitter.getEvents().get(3).toMap().get("value"));
|
|
||||||
Assert.assertEquals("task/waiting/count", emitter.getEvents().get(4).toMap().get("metric"));
|
|
||||||
Assert.assertEquals(1L, emitter.getEvents().get(4).toMap().get("value"));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -75,15 +75,10 @@ public class TaskSlotCountStatsMonitorTest
|
||||||
final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
|
final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
|
||||||
monitor.doMonitor(emitter);
|
monitor.doMonitor(emitter);
|
||||||
Assert.assertEquals(5, emitter.getEvents().size());
|
Assert.assertEquals(5, emitter.getEvents().size());
|
||||||
Assert.assertEquals("taskSlot/total/count", emitter.getEvents().get(0).toMap().get("metric"));
|
emitter.verifyValue("taskSlot/total/count", 1L);
|
||||||
Assert.assertEquals(1L, emitter.getEvents().get(0).toMap().get("value"));
|
emitter.verifyValue("taskSlot/idle/count", 1L);
|
||||||
Assert.assertEquals("taskSlot/idle/count", emitter.getEvents().get(1).toMap().get("metric"));
|
emitter.verifyValue("taskSlot/used/count", 1L);
|
||||||
Assert.assertEquals(1L, emitter.getEvents().get(1).toMap().get("value"));
|
emitter.verifyValue("taskSlot/lazy/count", 1L);
|
||||||
Assert.assertEquals("taskSlot/used/count", emitter.getEvents().get(2).toMap().get("metric"));
|
emitter.verifyValue("taskSlot/blacklisted/count", 1L);
|
||||||
Assert.assertEquals(1L, emitter.getEvents().get(2).toMap().get("value"));
|
|
||||||
Assert.assertEquals("taskSlot/lazy/count", emitter.getEvents().get(3).toMap().get("metric"));
|
|
||||||
Assert.assertEquals(1L, emitter.getEvents().get(3).toMap().get("value"));
|
|
||||||
Assert.assertEquals("taskSlot/blacklisted/count", emitter.getEvents().get(4).toMap().get("metric"));
|
|
||||||
Assert.assertEquals(1L, emitter.getEvents().get(4).toMap().get("value"));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,10 +20,10 @@
|
||||||
package org.apache.druid.server.metrics;
|
package org.apache.druid.server.metrics;
|
||||||
|
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
import com.google.common.collect.ImmutableSet;
|
import com.google.common.collect.ImmutableSet;
|
||||||
import com.google.inject.Guice;
|
import com.google.inject.Guice;
|
||||||
import com.google.inject.Injector;
|
import com.google.inject.Injector;
|
||||||
import com.google.inject.Module;
|
|
||||||
import org.apache.druid.discovery.NodeRole;
|
import org.apache.druid.discovery.NodeRole;
|
||||||
import org.apache.druid.java.util.metrics.StubServiceEmitter;
|
import org.apache.druid.java.util.metrics.StubServiceEmitter;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -143,24 +143,18 @@ public class WorkerTaskCountStatsMonitorTest
|
||||||
|
|
||||||
injectorForMiddleManager = Guice.createInjector(
|
injectorForMiddleManager = Guice.createInjector(
|
||||||
ImmutableList.of(
|
ImmutableList.of(
|
||||||
(Module) binder -> {
|
binder -> binder.bind(WorkerTaskCountStatsProvider.class).toInstance(statsProvider)
|
||||||
binder.bind(WorkerTaskCountStatsProvider.class).toInstance(statsProvider);
|
|
||||||
}
|
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
|
||||||
injectorForMiddleManagerNullStats = Guice.createInjector(
|
injectorForMiddleManagerNullStats = Guice.createInjector(
|
||||||
ImmutableList.of(
|
ImmutableList.of(
|
||||||
(Module) binder -> {
|
binder -> binder.bind(WorkerTaskCountStatsProvider.class).toInstance(nullStatsProvider)
|
||||||
binder.bind(WorkerTaskCountStatsProvider.class).toInstance(nullStatsProvider);
|
|
||||||
}
|
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
|
||||||
injectorForPeon = Guice.createInjector(
|
injectorForPeon = Guice.createInjector(
|
||||||
ImmutableList.of(
|
ImmutableList.of(binder -> {})
|
||||||
(Module) binder -> {}
|
|
||||||
)
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -172,26 +166,31 @@ public class WorkerTaskCountStatsMonitorTest
|
||||||
final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
|
final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
|
||||||
monitor.doMonitor(emitter);
|
monitor.doMonitor(emitter);
|
||||||
Assert.assertEquals(5, emitter.getEvents().size());
|
Assert.assertEquals(5, emitter.getEvents().size());
|
||||||
Assert.assertEquals("worker/task/failed/count", emitter.getEvents().get(0).toMap().get("metric"));
|
emitter.verifyValue(
|
||||||
Assert.assertEquals("workerCategory", emitter.getEvents().get(0).toMap().get("category"));
|
"worker/task/failed/count",
|
||||||
Assert.assertEquals("workerVersion", emitter.getEvents().get(0).toMap().get("workerVersion"));
|
ImmutableMap.of("category", "workerCategory", "workerVersion", "workerVersion"),
|
||||||
Assert.assertEquals(4L, emitter.getEvents().get(0).toMap().get("value"));
|
4L
|
||||||
Assert.assertEquals("worker/task/success/count", emitter.getEvents().get(1).toMap().get("metric"));
|
);
|
||||||
Assert.assertEquals("workerCategory", emitter.getEvents().get(1).toMap().get("category"));
|
emitter.verifyValue(
|
||||||
Assert.assertEquals("workerVersion", emitter.getEvents().get(1).toMap().get("workerVersion"));
|
"worker/task/success/count",
|
||||||
Assert.assertEquals(2L, emitter.getEvents().get(1).toMap().get("value"));
|
ImmutableMap.of("category", "workerCategory", "workerVersion", "workerVersion"),
|
||||||
Assert.assertEquals("worker/taskSlot/idle/count", emitter.getEvents().get(2).toMap().get("metric"));
|
2L
|
||||||
Assert.assertEquals("workerCategory", emitter.getEvents().get(2).toMap().get("category"));
|
);
|
||||||
Assert.assertEquals("workerVersion", emitter.getEvents().get(2).toMap().get("workerVersion"));
|
emitter.verifyValue(
|
||||||
Assert.assertEquals(3L, emitter.getEvents().get(2).toMap().get("value"));
|
"worker/taskSlot/idle/count",
|
||||||
Assert.assertEquals("worker/taskSlot/total/count", emitter.getEvents().get(3).toMap().get("metric"));
|
ImmutableMap.of("category", "workerCategory", "workerVersion", "workerVersion"),
|
||||||
Assert.assertEquals("workerCategory", emitter.getEvents().get(3).toMap().get("category"));
|
3L
|
||||||
Assert.assertEquals("workerVersion", emitter.getEvents().get(3).toMap().get("workerVersion"));
|
);
|
||||||
Assert.assertEquals(5L, emitter.getEvents().get(3).toMap().get("value"));
|
emitter.verifyValue(
|
||||||
Assert.assertEquals("worker/taskSlot/used/count", emitter.getEvents().get(4).toMap().get("metric"));
|
"worker/taskSlot/total/count",
|
||||||
Assert.assertEquals("workerCategory", emitter.getEvents().get(4).toMap().get("category"));
|
ImmutableMap.of("category", "workerCategory", "workerVersion", "workerVersion"),
|
||||||
Assert.assertEquals("workerVersion", emitter.getEvents().get(4).toMap().get("workerVersion"));
|
5L
|
||||||
Assert.assertEquals(1L, emitter.getEvents().get(4).toMap().get("value"));
|
);
|
||||||
|
emitter.verifyValue(
|
||||||
|
"worker/taskSlot/used/count",
|
||||||
|
ImmutableMap.of("category", "workerCategory", "workerVersion", "workerVersion"),
|
||||||
|
1L
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -425,15 +425,9 @@ public class SqlResourceTest extends CalciteTestBase
|
||||||
);
|
);
|
||||||
checkSqlRequestLog(true);
|
checkSqlRequestLog(true);
|
||||||
Assert.assertTrue(lifecycleManager.getAll("id").isEmpty());
|
Assert.assertTrue(lifecycleManager.getAll("id").isEmpty());
|
||||||
Set<String> metricNames = ImmutableSet.of("sqlQuery/time", "sqlQuery/bytes", "sqlQuery/planningTimeMs");
|
stubServiceEmitter.verifyEmitted("sqlQuery/time", 1);
|
||||||
Assert.assertEquals(3, stubServiceEmitter.getEvents().size());
|
stubServiceEmitter.verifyValue("sqlQuery/bytes", 27L);
|
||||||
for (String metricName : metricNames) {
|
stubServiceEmitter.verifyEmitted("sqlQuery/planningTimeMs", 1);
|
||||||
Assert.assertTrue(
|
|
||||||
stubServiceEmitter.getEvents()
|
|
||||||
.stream()
|
|
||||||
.anyMatch(event -> event.toMap().containsValue(metricName))
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue