From d04521d58f28119687f6887bb92bfb8cdc189840 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Tue, 1 Aug 2023 10:13:55 +0530 Subject: [PATCH] Improve description field when emitting metric for broadcast failure (#14703) Changes: - Emit descriptions such as `Load queue is full`, `No disk space` etc. instead of `Unknown error` - Rewrite `BroadcastDistributionRuleTest` --- .../server/coordinator/ServerHolder.java | 5 + .../loading/StrategicSegmentAssigner.java | 22 +- .../server/coordinator/ServerHolderTest.java | 1 + .../rules/BroadcastDistributionRuleTest.java | 554 ++++++------------ 4 files changed, 212 insertions(+), 370 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java index 7947f1c1b32..d55ac035d61 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java @@ -196,6 +196,11 @@ public class ServerHolder implements Comparable return isDecommissioning; } + public boolean isLoadQueueFull() + { + return totalAssignmentsInRun >= maxAssignmentsInRun; + } + public long getAvailableSize() { return getMaxSize() - getSizeUsed(); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java index 038707a19f4..a1800ae72b0 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java @@ -355,7 +355,7 @@ public class StrategicSegmentAssigner implements SegmentActionHandler } /** - * Loads the broadcast segment if it is not loaded on the given server. + * Loads the broadcast segment if it is not already loaded on the given server. * Returns true only if the segment was successfully queued for load on the server. */ private boolean loadBroadcastSegment(DataSegment segment, ServerHolder server) @@ -364,19 +364,21 @@ public class StrategicSegmentAssigner implements SegmentActionHandler return false; } else if (server.isDroppingSegment(segment)) { return server.cancelOperation(SegmentAction.DROP, segment); + } else if (server.canLoadSegment(segment)) { + return loadSegment(segment, server); } - if (server.canLoadSegment(segment) && loadSegment(segment, server)) { - return true; + final String skipReason; + if (server.getAvailableSize() < segment.getSize()) { + skipReason = "Not enough disk space"; + } else if (server.isLoadQueueFull()) { + skipReason = "Load queue is full"; } else { - log.makeAlert("Could not assign broadcast segment for datasource [%s]", segment.getDataSource()) - .addData("segmentId", segment.getId()) - .addData("segmentSize", segment.getSize()) - .addData("hostName", server.getServer().getHost()) - .addData("availableSize", server.getAvailableSize()) - .emit(); - return false; + skipReason = "Unknown error"; } + + incrementSkipStat(Stats.Segments.ASSIGN_SKIPPED, skipReason, segment, server.getServer().getTier()); + return false; } /** diff --git a/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java b/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java index fd1fb7c3062..11e36dea183 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java @@ -195,5 +195,6 @@ public class ServerHolderTest Assert.assertFalse(h1.isServingSegment(SEGMENTS.get(1))); Assert.assertTrue(h1.hasSegmentLoaded(SEGMENTS.get(0).getId())); Assert.assertFalse(h1.hasSegmentLoaded(SEGMENTS.get(1).getId())); + Assert.assertFalse(h1.isLoadQueueFull()); } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java b/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java index e21d823f346..b57cfc92914 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java @@ -21,407 +21,203 @@ package org.apache.druid.server.coordinator.rules; import org.apache.druid.client.DruidServer; import org.apache.druid.java.util.common.DateTimes; -import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.server.coordinator.CreateDataSegments; import org.apache.druid.server.coordinator.DruidCluster; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; import org.apache.druid.server.coordinator.ServerHolder; import org.apache.druid.server.coordinator.balancer.RandomBalancerStrategy; +import org.apache.druid.server.coordinator.loading.SegmentAction; import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager; import org.apache.druid.server.coordinator.loading.StrategicSegmentAssigner; import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon; import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; +import org.apache.druid.server.coordinator.stats.Dimension; +import org.apache.druid.server.coordinator.stats.RowKey; import org.apache.druid.server.coordinator.stats.Stats; import org.apache.druid.timeline.DataSegment; -import org.apache.druid.timeline.partition.NoneShardSpec; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; public class BroadcastDistributionRuleTest { - private DruidCluster druidCluster; - private ServerHolder holderOfSmallSegment; - private final List holdersOfLargeSegments = new ArrayList<>(); - private final List holdersOfLargeSegments2 = new ArrayList<>(); - private final List largeSegments = new ArrayList<>(); - private final List largeSegments2 = new ArrayList<>(); - private DataSegment smallSegment; - private DruidCluster secondCluster; - private ServerHolder activeServer; - private ServerHolder decommissioningServer1; - private ServerHolder decommissioningServer2; - private SegmentLoadQueueManager loadQueueManager; + private int serverId = 0; - private static final String DS_SMALL = "small_source"; + private static final String DS_WIKI = "wiki"; private static final String TIER_1 = "tier1"; private static final String TIER_2 = "tier2"; + private final DataSegment wikiSegment + = CreateDataSegments.ofDatasource(DS_WIKI).eachOfSizeInMb(100).get(0); + @Before public void setUp() { - loadQueueManager = new SegmentLoadQueueManager(null, null, null); - smallSegment = new DataSegment( - DS_SMALL, - Intervals.of("0/1000"), - DateTimes.nowUtc().toString(), - new HashMap<>(), - new ArrayList<>(), - new ArrayList<>(), - NoneShardSpec.instance(), - 0, - 0 + serverId = 0; + } + + @Test + public void testSegmentIsBroadcastToAllTiers() + { + // 2 tiers with one server each + final ServerHolder serverT11 = create10gbHistorical(TIER_1); + final ServerHolder serverT21 = create10gbHistorical(TIER_2); + DruidCluster cluster = DruidCluster.builder().add(serverT11).add(serverT21).build(); + DruidCoordinatorRuntimeParams params = makeParamsWithUsedSegments(cluster, wikiSegment); + + ForeverBroadcastDistributionRule rule = new ForeverBroadcastDistributionRule(); + CoordinatorRunStats stats = runRuleOnSegment(rule, wikiSegment, params); + + // Verify that segment is assigned to servers of all tiers + Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, TIER_1, DS_WIKI)); + Assert.assertTrue(serverT11.isLoadingSegment(wikiSegment)); + + Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, TIER_2, DS_WIKI)); + Assert.assertTrue(serverT21.isLoadingSegment(wikiSegment)); + } + + @Test + public void testSegmentIsNotBroadcastToServerIfAlreadyLoaded() + { + // serverT11 is already serving the segment which is being broadcast + final ServerHolder serverT11 = create10gbHistorical(TIER_1, wikiSegment); + final ServerHolder serverT12 = create10gbHistorical(TIER_1); + DruidCluster cluster = DruidCluster.builder().add(serverT11).add(serverT12).build(); + DruidCoordinatorRuntimeParams params = makeParamsWithUsedSegments(cluster, wikiSegment); + + ForeverBroadcastDistributionRule rule = new ForeverBroadcastDistributionRule(); + CoordinatorRunStats stats = runRuleOnSegment(rule, wikiSegment, params); + + // Verify that serverT11 is already serving and serverT12 is loading segment + Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, TIER_1, DS_WIKI)); + Assert.assertFalse(serverT11.isLoadingSegment(wikiSegment)); + Assert.assertTrue(serverT11.isServingSegment(wikiSegment)); + Assert.assertTrue(serverT12.isLoadingSegment(wikiSegment)); + } + + @Test + public void testSegmentIsNotBroadcastToDecommissioningServer() + { + ServerHolder activeServer = create10gbHistorical(TIER_1); + ServerHolder decommissioningServer = createDecommissioningHistorical(TIER_1); + DruidCluster cluster = DruidCluster.builder() + .add(activeServer) + .add(decommissioningServer).build(); + DruidCoordinatorRuntimeParams params = makeParamsWithUsedSegments(cluster, wikiSegment); + + ForeverBroadcastDistributionRule rule = new ForeverBroadcastDistributionRule(); + CoordinatorRunStats stats = runRuleOnSegment(rule, wikiSegment, params); + + Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, TIER_1, DS_WIKI)); + Assert.assertTrue(activeServer.isLoadingSegment(wikiSegment)); + Assert.assertTrue(decommissioningServer.getLoadingSegments().isEmpty()); + } + + @Test + public void testBroadcastSegmentIsDroppedFromDecommissioningServer() + { + // Both active and decommissioning servers are already serving the segment + ServerHolder activeServer = create10gbHistorical(TIER_1, wikiSegment); + ServerHolder decommissioningServer = createDecommissioningHistorical(TIER_1, wikiSegment); + DruidCluster cluster = DruidCluster.builder() + .add(activeServer) + .add(decommissioningServer) + .build(); + DruidCoordinatorRuntimeParams params = makeParamsWithUsedSegments(cluster, wikiSegment); + + ForeverBroadcastDistributionRule rule = new ForeverBroadcastDistributionRule(); + CoordinatorRunStats stats = runRuleOnSegment(rule, wikiSegment, params); + + // Verify that segment is dropped only from the decommissioning server + Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.DROPPED, TIER_1, DS_WIKI)); + Assert.assertTrue(activeServer.getPeon().getSegmentsToDrop().isEmpty()); + Assert.assertTrue(decommissioningServer.getPeon().getSegmentsToDrop().contains(wikiSegment)); + } + + @Test + public void testSegmentIsBroadcastToAllServerTypes() + { + final ServerHolder broker = new ServerHolder( + create10gbServer(ServerType.BROKER, "broker_tier").toImmutableDruidServer(), + new TestLoadQueuePeon() ); + final ServerHolder indexer = new ServerHolder( + create10gbServer(ServerType.INDEXER_EXECUTOR, TIER_2).toImmutableDruidServer(), + new TestLoadQueuePeon() + ); + final ServerHolder historical = create10gbHistorical(TIER_1); - for (int i = 0; i < 3; i++) { - largeSegments.add( - new DataSegment( - "large_source", - Intervals.of((i * 1000) + "/" + ((i + 1) * 1000)), - DateTimes.nowUtc().toString(), - new HashMap<>(), - new ArrayList<>(), - new ArrayList<>(), - NoneShardSpec.instance(), - 0, - 100 - ) - ); - } + DruidCluster cluster = DruidCluster.builder() + .add(broker).add(indexer).add(historical) + .build(); + DruidCoordinatorRuntimeParams params = makeParamsWithUsedSegments(cluster, wikiSegment); - for (int i = 0; i < 2; i++) { - largeSegments2.add( - new DataSegment( - "large_source2", - Intervals.of((i * 1000) + "/" + ((i + 1) * 1000)), - DateTimes.nowUtc().toString(), - new HashMap<>(), - new ArrayList<>(), - new ArrayList<>(), - NoneShardSpec.instance(), - 0, - 100 - ) - ); - } + ForeverBroadcastDistributionRule rule = new ForeverBroadcastDistributionRule(); + final CoordinatorRunStats stats = runRuleOnSegment(rule, wikiSegment, params); - holderOfSmallSegment = new ServerHolder( - new DruidServer( - "serverHot2", - "hostHot2", - null, - 1000, - ServerType.HISTORICAL, - TIER_1, - 0 - ).addDataSegment(smallSegment) - .toImmutableDruidServer(), + // Verify that segment is assigned to historical, broker as well as indexer + Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, TIER_1, DS_WIKI)); + Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, TIER_2, DS_WIKI)); + Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, broker.getServer().getTier(), DS_WIKI)); + + Assert.assertTrue(historical.isLoadingSegment(wikiSegment)); + Assert.assertTrue(indexer.isLoadingSegment(wikiSegment)); + Assert.assertTrue(broker.isLoadingSegment(wikiSegment)); + } + + @Test + public void testReasonForBroadcastFailure() + { + final ServerHolder eligibleServer = create10gbHistorical(TIER_1); + final ServerHolder serverWithNoDiskSpace = new ServerHolder( + new DruidServer("server1", "server1", null, 0L, ServerType.HISTORICAL, TIER_1, 0) + .toImmutableDruidServer(), new TestLoadQueuePeon() ); - holdersOfLargeSegments.add( - new ServerHolder( - new DruidServer( - "serverHot1", - "hostHot1", - null, - 1000, - ServerType.HISTORICAL, - TIER_1, - 0 - ).addDataSegment(largeSegments.get(0)) - .toImmutableDruidServer(), - new TestLoadQueuePeon() - ) - ); - holdersOfLargeSegments.add( - new ServerHolder( - new DruidServer( - "serverNorm1", - "hostNorm1", - null, - 1000, - ServerType.HISTORICAL, - TIER_2, - 0 - ).addDataSegment(largeSegments.get(1)) - .toImmutableDruidServer(), - new TestLoadQueuePeon() - ) - ); - holdersOfLargeSegments.add( - new ServerHolder( - new DruidServer( - "serverNorm2", - "hostNorm2", - null, - 100, - ServerType.HISTORICAL, - TIER_2, - 0 - ).addDataSegment(largeSegments.get(2)) - .toImmutableDruidServer(), - new TestLoadQueuePeon() - ) + // Create a server with full load queue + final int maxSegmentsInLoadQueue = 5; + final ServerHolder serverWithFullQueue = new ServerHolder( + create10gbServer(ServerType.HISTORICAL, TIER_1).toImmutableDruidServer(), + new TestLoadQueuePeon(), false, maxSegmentsInLoadQueue, 100 ); - holdersOfLargeSegments2.add( - new ServerHolder( - new DruidServer( - "serverHot3", - "hostHot3", - null, - 1000, - ServerType.HISTORICAL, - TIER_1, - 0 - ).addDataSegment(largeSegments2.get(0)) - .toImmutableDruidServer(), - new TestLoadQueuePeon() - ) - ); - holdersOfLargeSegments2.add( - new ServerHolder( - new DruidServer( - "serverNorm3", - "hostNorm3", - null, - 100, - ServerType.HISTORICAL, - TIER_2, - 0 - ).addDataSegment(largeSegments2.get(1)) - .toImmutableDruidServer(), - new TestLoadQueuePeon() - ) - ); + List segmentsInQueue + = CreateDataSegments.ofDatasource("koala") + .forIntervals(maxSegmentsInLoadQueue, Granularities.MONTH) + .withNumPartitions(1) + .eachOfSizeInMb(10); + segmentsInQueue.forEach(s -> serverWithFullQueue.startOperation(SegmentAction.LOAD, s)); + Assert.assertTrue(serverWithFullQueue.isLoadQueueFull()); - activeServer = new ServerHolder( - new DruidServer( - "active", - "host1", - null, - 100, - ServerType.HISTORICAL, - TIER_1, - 0 - ).addDataSegment(largeSegments.get(0)) - .toImmutableDruidServer(), - new TestLoadQueuePeon() - ); + DruidCluster cluster = DruidCluster.builder() + .add(eligibleServer) + .add(serverWithNoDiskSpace) + .add(serverWithFullQueue) + .build(); + DruidCoordinatorRuntimeParams params = makeParamsWithUsedSegments(cluster, wikiSegment); - decommissioningServer1 = new ServerHolder( - new DruidServer( - "decommissioning1", - "host2", - null, - 100, - ServerType.HISTORICAL, - TIER_1, - 0 - ).addDataSegment(smallSegment) - .toImmutableDruidServer(), - new TestLoadQueuePeon(), - true - ); + ForeverBroadcastDistributionRule rule = new ForeverBroadcastDistributionRule(); + final CoordinatorRunStats stats = runRuleOnSegment(rule, wikiSegment, params); - decommissioningServer2 = new ServerHolder( - new DruidServer( - "decommissioning2", - "host3", - null, - 100, - ServerType.HISTORICAL, - TIER_1, - 0 - ).addDataSegment(largeSegments.get(1)) - .toImmutableDruidServer(), - new TestLoadQueuePeon(), - true - ); + // Verify that the segment is broadcast only to the eligible server + Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, TIER_1, DS_WIKI)); + RowKey metricKey = RowKey.with(Dimension.DATASOURCE, DS_WIKI) + .with(Dimension.TIER, TIER_1) + .and(Dimension.DESCRIPTION, "Not enough disk space"); + Assert.assertEquals(1L, stats.get(Stats.Segments.ASSIGN_SKIPPED, metricKey)); - druidCluster = DruidCluster - .builder() - .addTier( - TIER_1, - holdersOfLargeSegments.get(0), - holderOfSmallSegment, - holdersOfLargeSegments2.get(0) - ) - .addTier( - TIER_2, - holdersOfLargeSegments.get(1), - holdersOfLargeSegments.get(2), - holdersOfLargeSegments2.get(1) - ) - .build(); - - secondCluster = DruidCluster - .builder() - .addTier( - TIER_1, - activeServer, - decommissioningServer1, - decommissioningServer2 - ) - .build(); + metricKey = RowKey.with(Dimension.DATASOURCE, DS_WIKI) + .with(Dimension.TIER, TIER_1) + .and(Dimension.DESCRIPTION, "Load queue is full"); + Assert.assertEquals(1L, stats.get(Stats.Segments.ASSIGN_SKIPPED, metricKey)); } - @Test - public void testBroadcastToSingleDataSource() - { - final ForeverBroadcastDistributionRule rule = - new ForeverBroadcastDistributionRule(); - - CoordinatorRunStats stats = runRuleAndGetStats( - rule, - smallSegment, - makeCoordinartorRuntimeParams( - druidCluster, - smallSegment, - largeSegments.get(0), - largeSegments.get(1), - largeSegments.get(2), - largeSegments2.get(0), - largeSegments2.get(1) - ) - ); - - Assert.assertEquals(2L, stats.getSegmentStat(Stats.Segments.ASSIGNED, TIER_1, DS_SMALL)); - Assert.assertEquals(3L, stats.getSegmentStat(Stats.Segments.ASSIGNED, TIER_2, DS_SMALL)); - - Assert.assertTrue( - holdersOfLargeSegments.stream().allMatch( - holder -> holder.isLoadingSegment(smallSegment) - ) - ); - Assert.assertTrue( - holdersOfLargeSegments2.stream().allMatch( - holder -> holder.isLoadingSegment(smallSegment) - ) - ); - Assert.assertTrue(holderOfSmallSegment.isServingSegment(smallSegment)); - } - - private DruidCoordinatorRuntimeParams makeCoordinartorRuntimeParams( - DruidCluster druidCluster, - DataSegment... usedSegments - ) - { - return DruidCoordinatorRuntimeParams - .newBuilder(DateTimes.nowUtc()) - .withDruidCluster(druidCluster) - .withUsedSegmentsInTest(usedSegments) - .withBalancerStrategy(new RandomBalancerStrategy()) - .withSegmentAssignerUsing(loadQueueManager) - .build(); - } - - /** - * Servers: - * name | segments - * -----------------+-------------- - * active | large segment - * decommissioning1 | small segment - * decommissioning2 | large segment - *

- * After running the rule for the small segment: - * active | large & small segments - * decommissioning1 | - * decommissionint2 | large segment - */ - @Test - public void testBroadcastDecommissioning() - { - final ForeverBroadcastDistributionRule rule = - new ForeverBroadcastDistributionRule(); - - CoordinatorRunStats stats = runRuleAndGetStats( - rule, - smallSegment, - makeCoordinartorRuntimeParams( - secondCluster, - smallSegment, - largeSegments.get(0), - largeSegments.get(1) - ) - ); - - Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, TIER_1, DS_SMALL)); - Assert.assertEquals(1, activeServer.getPeon().getSegmentsToLoad().size()); - Assert.assertEquals(1, decommissioningServer1.getPeon().getSegmentsToDrop().size()); - Assert.assertEquals(0, decommissioningServer2.getPeon().getSegmentsToLoad().size()); - } - - @Test - public void testBroadcastToMultipleDataSources() - { - final ForeverBroadcastDistributionRule rule = new ForeverBroadcastDistributionRule(); - - CoordinatorRunStats stats = runRuleAndGetStats( - rule, - smallSegment, - makeCoordinartorRuntimeParams( - druidCluster, - smallSegment, - largeSegments.get(0), - largeSegments.get(1), - largeSegments.get(2), - largeSegments2.get(0), - largeSegments2.get(1) - ) - ); - - Assert.assertEquals(2L, stats.getSegmentStat(Stats.Segments.ASSIGNED, TIER_1, DS_SMALL)); - Assert.assertEquals(3L, stats.getSegmentStat(Stats.Segments.ASSIGNED, TIER_2, DS_SMALL)); - - Assert.assertTrue( - holdersOfLargeSegments.stream().allMatch( - holder -> holder.isLoadingSegment(smallSegment) - ) - ); - Assert.assertTrue( - holdersOfLargeSegments2.stream().allMatch( - holder -> holder.isLoadingSegment(smallSegment) - ) - ); - Assert.assertFalse(holderOfSmallSegment.isLoadingSegment(smallSegment)); - } - - @Test - public void testBroadcastToAllServers() - { - final ForeverBroadcastDistributionRule rule = new ForeverBroadcastDistributionRule(); - - CoordinatorRunStats stats = runRuleAndGetStats( - rule, - smallSegment, - makeCoordinartorRuntimeParams( - druidCluster, - smallSegment, - largeSegments.get(0), - largeSegments.get(1), - largeSegments.get(2), - largeSegments2.get(0), - largeSegments2.get(1) - ) - ); - - Assert.assertEquals(2L, stats.getSegmentStat(Stats.Segments.ASSIGNED, TIER_1, DS_SMALL)); - Assert.assertEquals(3L, stats.getSegmentStat(Stats.Segments.ASSIGNED, TIER_2, DS_SMALL)); - - Assert.assertTrue( - druidCluster.getAllServers().stream().allMatch( - holder -> holder.isLoadingSegment(smallSegment) || holder.isServingSegment(smallSegment) - ) - ); - } - - private CoordinatorRunStats runRuleAndGetStats( + private CoordinatorRunStats runRuleOnSegment( Rule rule, DataSegment segment, DruidCoordinatorRuntimeParams params @@ -431,4 +227,42 @@ public class BroadcastDistributionRuleTest rule.run(segment, segmentAssigner); return segmentAssigner.getStats(); } + + private DruidCoordinatorRuntimeParams makeParamsWithUsedSegments( + DruidCluster druidCluster, + DataSegment... usedSegments + ) + { + return DruidCoordinatorRuntimeParams + .newBuilder(DateTimes.nowUtc()) + .withDruidCluster(druidCluster) + .withUsedSegmentsInTest(usedSegments) + .withBalancerStrategy(new RandomBalancerStrategy()) + .withSegmentAssignerUsing(new SegmentLoadQueueManager(null, null, null)) + .build(); + } + + private ServerHolder create10gbHistorical(String tier, DataSegment... segments) + { + DruidServer server = create10gbServer(ServerType.HISTORICAL, tier); + for (DataSegment segment : segments) { + server.addDataSegment(segment); + } + return new ServerHolder(server.toImmutableDruidServer(), new TestLoadQueuePeon()); + } + + private ServerHolder createDecommissioningHistorical(String tier, DataSegment... segments) + { + DruidServer server = create10gbServer(ServerType.HISTORICAL, tier); + for (DataSegment segment : segments) { + server.addDataSegment(segment); + } + return new ServerHolder(server.toImmutableDruidServer(), new TestLoadQueuePeon(), true); + } + + private DruidServer create10gbServer(ServerType type, String tier) + { + final String name = "server_" + serverId++; + return new DruidServer(name, name, null, 10L << 30, type, tier, 0); + } }