Add tier-based usage metrics for historical nodes to help with autoscaling (#8636)

* Add tier-based usage metrics for historical nodes to help with Druid Historical autoscaling

Add tier-based usage metrics for historical nodes so that Druid cluster orchestration systems can understand Historical node usage and requirements. The following metrics should be helpful (a sketch of how an autoscaler might consume them follows the list):

tier/required/capacity - total capacity in bytes required in each tier. Dimension - tier
tier/total/capacity - total capacity in bytes available in each tier. Dimension - tier
tier/historical/count - number of historical nodes available in each tier. Dimension - tier
tier/replication/factor - configured maximum replication factor in each tier. Dimension - tier
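
A minimal sketch of how an autoscaler might combine these four metrics into a scale-up decision. Everything in it (class, method, and the sizing policy itself) is hypothetical and illustrative, not part of this change:

```java
public class TierCapacityPlanner
{
  // How many historicals a tier needs beyond what it has, given the four
  // emitted metrics. All names and the policy here are illustrative.
  public static long historicalsToAdd(
      final long requiredCapacityBytes,  // tier/required/capacity
      final long totalCapacityBytes,     // tier/total/capacity
      final long historicalCount,        // tier/historical/count
      final long maxReplicationFactor    // tier/replication/factor
  )
  {
    // Replicas of a segment land on distinct servers, so a tier needs at
    // least as many historicals as its maximum configured replication factor.
    final long byReplication = maxReplicationFactor - historicalCount;

    // Size the byte deficit using the average capacity of the live nodes.
    long byCapacity = 0;
    if (requiredCapacityBytes > totalCapacityBytes) {
      final long perNodeBytes = historicalCount > 0 ? totalCapacityBytes / historicalCount : 0;
      byCapacity = perNodeBytes > 0
          ? (requiredCapacityBytes - totalCapacityBytes + perNodeBytes - 1) / perNodeBytes
          : 1; // no live node to estimate from; add one and re-evaluate next cycle
    }
    return Math.max(Math.max(byCapacity, byReplication), 0);
  }

  public static void main(String[] args)
  {
    // 10 GB required, 8 GB present across 4 nodes (2 GB each), replication factor 2:
    // a 2 GB deficit at 2 GB per node => add 1 historical.
    System.out.println(historicalsToAdd(10L << 30, 8L << 30, 4, 2)); // 1
  }
}
```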

* fix unit test failures
Authored by Nishant Bangarwa on 2019-10-09 08:25:32 +05:30; committed by Fangjin Yang
parent 18758f5228
commit 0853273091
6 changed files with 97 additions and 1 deletion

docs/operations/metrics.md

@@ -209,6 +209,10 @@ These metrics are for the Druid Coordinator and are reset each time the Coordinator runs the coordination logic.
|`segment/overShadowed/count`|Number of overShadowed segments.||Varies.|
|`segment/unavailable/count`|Number of segments (not including replicas) left to load until segments that should be loaded in the cluster are available for queries.|datasource.|0|
|`segment/underReplicated/count`|Number of segments (including replicas) left to load until segments that should be loaded in the cluster are available for queries.|tier, datasource.|0|
|`tier/historical/count`|Number of available historical nodes in each tier.|tier.|Varies.|
|`tier/replication/factor`|Configured maximum replication factor in each tier.|tier.|Varies.|
|`tier/required/capacity`|Total capacity in bytes required in each tier.|tier.|Varies.|
|`tier/total/capacity`|Total capacity in bytes available in each tier.|tier.|Varies.|
If `emitBalancingStats` is set to `true` in the Coordinator [dynamic configuration](../configuration/index.html#dynamic-configuration), then [log entries](../configuration/logging.md) for class `org.apache.druid.server.coordinator.helper.DruidCoordinatorLogger` will have extra information on balancing decisions.

server/src/main/java/org/apache/druid/server/coordinator/CoordinatorStats.java

@@ -120,6 +120,12 @@ public class CoordinatorStats
.addTo(tier, value);
}
public void accumulateMaxTieredStat(final String statName, final String tier, final long value)
{
perTierStats.computeIfAbsent(statName, ignored -> new Object2LongOpenHashMap<>())
.mergeLong(tier, value, Math::max);
}
public void addToDataSourceStat(String statName, String dataSource, long value)
{
perDataSourceStats.computeIfAbsent(statName, k -> new Object2LongOpenHashMap<>())
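
The new `accumulateMaxTieredStat` relies on fastutil's `mergeLong`, which stores the given value when the key is absent and otherwise applies the remapping function to the old and new values; with `Math::max` this keeps a running per-tier maximum. A minimal standalone sketch, assuming fastutil on the classpath (the class name is illustrative):

```java
import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;

public class MergeMaxDemo
{
  public static void main(String[] args)
  {
    final Object2LongOpenHashMap<String> perTier = new Object2LongOpenHashMap<>();
    perTier.mergeLong("hot", 2, Math::max); // key absent: stores 2
    perTier.mergeLong("hot", 6, Math::max); // max(2, 6) = 6
    perTier.mergeLong("hot", 5, Math::max); // max(6, 5) = 6
    System.out.println(perTier.getLong("hot")); // prints 6
  }
}
```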

server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorLogger.java

@@ -32,6 +32,7 @@ import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.LoadQueuePeon;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.rules.LoadRule;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.PartitionChunk;
@@ -46,6 +47,9 @@ public class DruidCoordinatorLogger implements DruidCoordinatorHelper
{
private static final Logger log = new Logger(DruidCoordinatorLogger.class);
private final DruidCoordinator coordinator;
public static final String TOTAL_CAPACITY = "totalCapacity";
public static final String TOTAL_HISTORICAL_COUNT = "totalHistoricalCount";
public static final String MAX_REPLICATION_FACTOR = "maxReplicationFactor";
public DruidCoordinatorLogger(DruidCoordinator coordinator)
{
@@ -66,6 +70,20 @@ public class DruidCoordinatorLogger implements DruidCoordinatorHelper
);
}
private void emitTieredStat(
final ServiceEmitter emitter,
final String metricName,
final String tier,
final long value
)
{
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.TIER, tier)
.build(metricName, value)
);
}
private void emitTieredStats(
final ServiceEmitter emitter,
final String metricName,
@@ -182,9 +200,36 @@ public class DruidCoordinatorLogger implements DruidCoordinatorHelper
log.debug("Segment to drop[%s]", segment);
}
}
stats.addToTieredStat(TOTAL_CAPACITY, server.getTier(), server.getMaxSize());
stats.addToTieredStat(TOTAL_HISTORICAL_COUNT, server.getTier(), 1);
}
}
params.getDatabaseRuleManager()
.getAllRules()
.values()
.forEach(
rules -> rules.forEach(
rule -> {
if (rule instanceof LoadRule) {
((LoadRule) rule).getTieredReplicants()
.forEach(
(tier, replica) -> stats.accumulateMaxTieredStat(
MAX_REPLICATION_FACTOR,
tier,
replica
));
}
}
));
emitTieredStats(emitter, "tier/required/capacity", stats, LoadRule.REQUIRED_CAPACITY);
emitTieredStats(emitter, "tier/total/capacity", stats, TOTAL_CAPACITY);
emitTieredStats(emitter, "tier/replication/factor", stats, MAX_REPLICATION_FACTOR);
emitTieredStats(emitter, "tier/historical/count", stats, TOTAL_HISTORICAL_COUNT);
// Emit coordinator metrics
params
.getLoadManagementPeons()
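
The hunk above truncates the body of the pre-existing `emitTieredStats` helper that the four new calls go through. A plausible reconstruction of what it does, assuming only the `CoordinatorStats` accessors exercised by the tests below (`getTiers`, `getTieredStat`); the committed body may differ:

```java
private void emitTieredStats(
    final ServiceEmitter emitter,
    final String metricName,
    final CoordinatorStats stats,
    final String statName
)
{
  // One event per tier that recorded this stat, tagged with the tier dimension.
  for (String tier : stats.getTiers(statName)) {
    emitTieredStat(emitter, metricName, tier, stats.getTieredStat(statName, tier));
  }
}
```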

server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java

@@ -54,6 +54,7 @@ public abstract class LoadRule implements Rule
private static final EmittingLogger log = new EmittingLogger(LoadRule.class);
static final String ASSIGNED_COUNT = "assignedCount";
static final String DROPPED_COUNT = "droppedCount";
public static final String REQUIRED_CAPACITY = "requiredCapacity";
private final Object2IntMap<String> targetReplicants = new Object2IntOpenHashMap<>();
private final Object2IntMap<String> currentReplicants = new Object2IntOpenHashMap<>();
@@ -77,7 +78,9 @@ public abstract class LoadRule implements Rule
assign(params, segment, stats);
drop(params, segment, stats);
for (String tier : targetReplicants.keySet()) {
stats.addToTieredStat(REQUIRED_CAPACITY, tier, segment.getSize() * targetReplicants.getInt(tier));
}
return stats;
}
finally {

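Each segment thus contributes its size times the tier's target replicant count to `REQUIRED_CAPACITY`, summed across segments. A standalone sketch of that arithmetic (class and helper names are illustrative, not from this commit):

```java
import java.util.HashMap;
import java.util.Map;

public class RequiredCapacityDemo
{
  public static void main(String[] args)
  {
    final Map<String, Long> requiredCapacity = new HashMap<>();

    // Hypothetical inputs: a 500 MB segment with 2 replicas in "hot" and 1 in
    // "cold", plus a 200 MB segment with 2 replicas in "hot".
    accumulate(requiredCapacity, 500_000_000L, Map.of("hot", 2, "cold", 1));
    accumulate(requiredCapacity, 200_000_000L, Map.of("hot", 2));

    // hot = 500M*2 + 200M*2 = 1,400,000,000 bytes; cold = 500M*1 = 500,000,000 bytes
    System.out.println(requiredCapacity);
  }

  // Mirrors the LoadRule change: segment size times target replicants, summed per tier.
  private static void accumulate(Map<String, Long> required, long segmentSize, Map<String, Integer> replicants)
  {
    replicants.forEach((tier, replicas) -> required.merge(tier, segmentSize * replicas, Long::sum));
  }
}
```
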
server/src/test/java/org/apache/druid/server/coordinator/CoordinatorStatsTest.java

@@ -133,4 +133,38 @@ public class CoordinatorStatsTest
Assert.assertEquals(1, stats.getTieredStat("stat2", "tier2"));
Assert.assertEquals(1, stats.getTieredStat("stat3", "tier1"));
}
@Test
public void testAccumulateMaxToTieredStat()
{
Assert.assertFalse(stats.hasPerTierStats());
stats.accumulateMaxTieredStat("stat1", "tier1", 2);
stats.accumulateMaxTieredStat("stat1", "tier1", 6);
stats.accumulateMaxTieredStat("stat1", "tier1", 5);
stats.accumulateMaxTieredStat("stat2", "tier1", 5);
stats.accumulateMaxTieredStat("stat2", "tier1", 4);
stats.accumulateMaxTieredStat("stat2", "tier1", 5);
stats.accumulateMaxTieredStat("stat1", "tier2", 7);
stats.accumulateMaxTieredStat("stat1", "tier2", 9);
stats.accumulateMaxTieredStat("stat1", "tier2", 10);
Assert.assertTrue(stats.hasPerTierStats());
Assert.assertEquals(
Sets.newHashSet("tier1", "tier2"),
stats.getTiers("stat1")
);
Assert.assertEquals(
Sets.newHashSet("tier1"),
stats.getTiers("stat2")
);
Assert.assertTrue(stats.getTiers("stat3").isEmpty());
Assert.assertEquals(6, stats.getTieredStat("stat1", "tier1"));
Assert.assertEquals(5, stats.getTieredStat("stat2", "tier1"));
Assert.assertEquals(10, stats.getTieredStat("stat1", "tier2"));
}
}

server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java

@@ -328,6 +328,8 @@ public class DruidCoordinatorTest extends CuratorTestBase
Rule foreverLoadRule = new ForeverLoadRule(ImmutableMap.of(tier, 2));
EasyMock.expect(metadataRuleManager.getRulesWithDefault(EasyMock.anyString()))
.andReturn(ImmutableList.of(foreverLoadRule)).atLeastOnce();
EasyMock.expect(metadataRuleManager.getAllRules())
.andReturn(ImmutableMap.of(dataSource, ImmutableList.of(foreverLoadRule))).atLeastOnce();
metadataRuleManager.stop();
EasyMock.expectLastCall().once();
@@ -481,6 +483,8 @@ public class DruidCoordinatorTest extends CuratorTestBase
EasyMock.expect(metadataRuleManager.getRulesWithDefault(EasyMock.anyString()))
.andReturn(ImmutableList.of(hotTier, coldTier)).atLeastOnce();
EasyMock.expect(metadataRuleManager.getAllRules())
.andReturn(ImmutableMap.of(dataSource, ImmutableList.of(hotTier, coldTier))).atLeastOnce();
EasyMock.expect(serverInventoryView.getInventory())
.andReturn(ImmutableList.of(hotServer, coldServer))