adds new coordinator metrics 'segment/unavailable/count' and (#3176)

'segment/underReplicated/count' (#3173)
This commit is contained in:
michaelschiff 2016-06-23 14:53:15 -07:00 committed by Gian Merlino
parent da660bb592
commit 66d8ad36d7
4 changed files with 37 additions and 2 deletions

View File

@ -142,6 +142,8 @@ These metrics are for the Druid coordinator and are reset each time the coordina
|`segment/size`|Size in bytes of available segments.|dataSource.|Varies.|
|`segment/count`|Number of available segments.|dataSource.|< max|
|`segment/overShadowed/count`|Number of overShadowed segments.||Varies.|
|`segment/unavailable/count`|Number of segments (not including replicas) left to load until segments that should be loaded in the cluster are available for queries.|dataSource.|0|
|`segment/underReplicated/count`|Number of segments (including replicas) left to load until segments that should be loaded in the cluster are available for queries.|tier, dataSource.|0|
## General Health

View File

@ -48,6 +48,7 @@ public class DruidMetrics
public final static String TASK_STATUS = "taskStatus";
public final static String SERVER = "server";
public final static String TIER = "tier";
public static int findNumComplexAggs(List<AggregatorFactory> aggs)
{

View File

@ -799,7 +799,7 @@ public class DruidCoordinator
new DruidCoordinatorCleanupUnneeded(DruidCoordinator.this),
new DruidCoordinatorCleanupOvershadowed(DruidCoordinator.this),
new DruidCoordinatorBalancer(DruidCoordinator.this),
new DruidCoordinatorLogger()
new DruidCoordinatorLogger(DruidCoordinator.this)
),
startingLeaderCounter
);

View File

@ -30,6 +30,7 @@ import io.druid.collections.CountingMap;
import io.druid.query.DruidMetrics;
import io.druid.server.coordinator.CoordinatorStats;
import io.druid.server.coordinator.DruidCluster;
import io.druid.server.coordinator.DruidCoordinator;
import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import io.druid.server.coordinator.LoadQueuePeon;
import io.druid.server.coordinator.ServerHolder;
@ -44,6 +45,11 @@ import java.util.concurrent.atomic.AtomicLong;
public class DruidCoordinatorLogger implements DruidCoordinatorHelper
{
private static final Logger log = new Logger(DruidCoordinatorLogger.class);
private final DruidCoordinator coordinator;
/**
 * Creates a logging helper bound to the given coordinator.
 *
 * @param coordinator coordinator that this helper queries for segment availability and
 *                    replication status when emitting metrics
 */
public DruidCoordinatorLogger(DruidCoordinator coordinator)
{
  this.coordinator = coordinator;
}
private <T extends Number> void emitTieredStats(
final ServiceEmitter emitter,
@ -57,7 +63,7 @@ public class DruidCoordinatorLogger implements DruidCoordinatorHelper
Number value = entry.getValue();
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension("tier", tier)
.setDimension(DruidMetrics.TIER, tier)
.build(
metricName, value.doubleValue()
)
@ -227,6 +233,31 @@ public class DruidCoordinatorLogger implements DruidCoordinatorHelper
)
);
}
// Emit one "segment/unavailable/count" event per datasource, using the counts
// reported by coordinator.getSegmentAvailability() (keyed by datasource name).
for (Map.Entry<String, AtomicLong> entry : coordinator.getSegmentAvailability().entrySet()) {
String datasource = entry.getKey();
Long count = entry.getValue().get();
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DATASOURCE, datasource).build(
"segment/unavailable/count", count
)
);
}
// Emit one "segment/underReplicated/count" event per (tier, datasource) pair.
// coordinator.getReplicationStatus() is keyed by tier; each value is a
// CountingMap keyed by datasource name.
for (Map.Entry<String, CountingMap<String>> entry : coordinator.getReplicationStatus().entrySet()) {
String tier = entry.getKey();
CountingMap<String> datasourceAvailabilities = entry.getValue();
for (Map.Entry<String, AtomicLong> datasourceAvailability : datasourceAvailabilities.entrySet()) {
String datasource = datasourceAvailability.getKey();
Long count = datasourceAvailability.getValue().get();
emitter.emit(
new ServiceMetricEvent.Builder()
// Both dimensions are set so the metric can be broken down by tier and datasource.
.setDimension(DruidMetrics.TIER, tier)
.setDimension(DruidMetrics.DATASOURCE, datasource).build(
"segment/underReplicated/count", count
)
);
}
}
// Emit segment metrics
CountingMap<String> segmentSizes = new CountingMap<String>();
@ -258,6 +289,7 @@ public class DruidCoordinatorLogger implements DruidCoordinatorHelper
);
}
return params;
}
}