diff --git a/benchmarks/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyBenchmark.java b/benchmarks/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyBenchmark.java index 2afd99c8c38..e51cc7da3a9 100644 --- a/benchmarks/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyBenchmark.java @@ -27,7 +27,7 @@ import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordinator.balancer.BalancerSegmentHolder; import org.apache.druid.server.coordinator.balancer.ReservoirSegmentSampler; -import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester; +import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon; import org.apache.druid.timeline.DataSegment; import org.joda.time.Interval; import org.openjdk.jmh.annotations.Benchmark; @@ -105,7 +105,7 @@ public class BalancerStrategyBenchmark ImmutableMap.of("test", new ImmutableDruidDataSource("test", Collections.emptyMap(), segments)), segments.size() ), - new LoadQueuePeonTester() + new TestLoadQueuePeon() ) ); } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java index 4a45e16bf96..6a76a865d85 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java @@ -31,8 +31,8 @@ import org.apache.druid.metadata.MetadataRuleManager; import org.apache.druid.server.coordinator.duty.BalanceSegments; import org.apache.druid.server.coordinator.duty.RunRules; import org.apache.druid.server.coordinator.loading.LoadQueuePeon; -import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester; import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager; +import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon; import org.apache.druid.server.coordinator.rules.PeriodLoadRule; import org.apache.druid.server.coordinator.rules.Rule; import org.apache.druid.timeline.DataSegment; @@ -117,7 +117,7 @@ public class BalanceSegmentsProfiler EasyMock.expect(server.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes(); EasyMock.replay(server); - LoadQueuePeon peon = new LoadQueuePeonTester(); + LoadQueuePeon peon = new TestLoadQueuePeon(); serverHolderList.add(new ServerHolder(server, peon)); } @@ -154,8 +154,8 @@ public class BalanceSegmentsProfiler public void profileRun() { Stopwatch watch = Stopwatch.createUnstarted(); - LoadQueuePeonTester fromPeon = new LoadQueuePeonTester(); - LoadQueuePeonTester toPeon = new LoadQueuePeonTester(); + TestLoadQueuePeon fromPeon = new TestLoadQueuePeon(); + TestLoadQueuePeon toPeon = new TestLoadQueuePeon(); EasyMock.expect(druidServer1.getName()).andReturn("from").atLeastOnce(); EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce(); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidClusterTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidClusterTest.java index 182d58dd387..17a4de1d73f 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidClusterTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidClusterTest.java @@ -22,7 +22,7 @@ package org.apache.druid.server.coordinator; import org.apache.druid.client.DruidServer; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.server.coordination.ServerType; -import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester; +import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon; import org.apache.druid.timeline.DataSegment; import org.junit.Assert; import org.junit.Before; @@ -47,13 +47,13 @@ public class DruidClusterTest private static final ServerHolder NEW_REALTIME = new ServerHolder( new DruidServer("name1", "host2", null, 100L, ServerType.REALTIME, "tier1", 0) .addDataSegment(SEGMENTS.get(0)).toImmutableDruidServer(), - new LoadQueuePeonTester() + new TestLoadQueuePeon() ); private static final ServerHolder NEW_HISTORICAL = new ServerHolder( new DruidServer("name1", "host2", null, 100L, ServerType.HISTORICAL, "tier1", 0) .addDataSegment(SEGMENTS.get(0)).toImmutableDruidServer(), - new LoadQueuePeonTester() + new TestLoadQueuePeon() ); private DruidCluster.Builder clusterBuilder; @@ -67,14 +67,14 @@ public class DruidClusterTest new ServerHolder( new DruidServer("name1", "host1", null, 100L, ServerType.REALTIME, "tier1", 0) .addDataSegment(SEGMENTS.get(0)).toImmutableDruidServer(), - new LoadQueuePeonTester() + new TestLoadQueuePeon() ) ) .add( new ServerHolder( new DruidServer("name1", "host1", null, 100L, ServerType.HISTORICAL, "tier1", 0) .addDataSegment(SEGMENTS.get(0)).toImmutableDruidServer(), - new LoadQueuePeonTester() + new TestLoadQueuePeon() ) ); } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/RoundRobinServerSelectorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/RoundRobinServerSelectorTest.java index 4164bba3649..62f84b631a3 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/RoundRobinServerSelectorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/RoundRobinServerSelectorTest.java @@ -23,8 +23,8 @@ import org.apache.druid.client.DruidServer; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.segment.IndexIO; import org.apache.druid.server.coordination.ServerType; -import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester; import org.apache.druid.server.coordinator.loading.RoundRobinServerSelector; +import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.junit.Assert; @@ -140,7 +140,7 @@ public class RoundRobinServerSelectorTest return new ServerHolder( new DruidServer(name, name, null, size, ServerType.HISTORICAL, TIER, 1) .toImmutableDruidServer(), - new LoadQueuePeonTester() + new TestLoadQueuePeon() ); } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java b/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java index ed9735c6bcc..fd1fb7c3062 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java @@ -26,7 +26,7 @@ import org.apache.druid.client.ImmutableDruidServer; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; -import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester; +import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NoneShardSpec; import org.junit.Assert; @@ -79,7 +79,7 @@ public class ServerHolderTest ImmutableMap.of("src1", DATA_SOURCES.get("src1")), 1 ), - new LoadQueuePeonTester() + new TestLoadQueuePeon() ); // available size of 100 @@ -90,7 +90,7 @@ public class ServerHolderTest ImmutableMap.of("src1", DATA_SOURCES.get("src1")), 1 ), - new LoadQueuePeonTester() + new TestLoadQueuePeon() ); // available size of 10 @@ -101,7 +101,7 @@ public class ServerHolderTest ImmutableMap.of("src1", DATA_SOURCES.get("src1")), 1 ), - new LoadQueuePeonTester() + new TestLoadQueuePeon() ); // available size of 50 @@ -112,7 +112,7 @@ public class ServerHolderTest ImmutableMap.of("src1", DATA_SOURCES.get("src1")), 1 ), - new LoadQueuePeonTester() + new TestLoadQueuePeon() ); Assert.assertEquals(0, h1.compareTo(h2)); @@ -130,7 +130,7 @@ public class ServerHolderTest ImmutableMap.of("src1", DATA_SOURCES.get("src1")), 1 ), - new LoadQueuePeonTester() + new TestLoadQueuePeon() ); final ServerHolder h2 = new ServerHolder( @@ -140,7 +140,7 @@ public class ServerHolderTest ImmutableMap.of("src1", DATA_SOURCES.get("src1")), 1 ), - new LoadQueuePeonTester() + new TestLoadQueuePeon() ); final ServerHolder h3 = new ServerHolder( @@ -150,7 +150,7 @@ public class ServerHolderTest ImmutableMap.of("src1", DATA_SOURCES.get("src1")), 1 ), - new LoadQueuePeonTester() + new TestLoadQueuePeon() ); final ServerHolder h4 = new ServerHolder( @@ -160,7 +160,7 @@ public class ServerHolderTest ImmutableMap.of("src1", DATA_SOURCES.get("src1")), 1 ), - new LoadQueuePeonTester() + new TestLoadQueuePeon() ); final ServerHolder h5 = new ServerHolder( @@ -170,7 +170,7 @@ public class ServerHolderTest ImmutableMap.of("src1", DATA_SOURCES.get("src1")), 1 ), - new LoadQueuePeonTester() + new TestLoadQueuePeon() ); Assert.assertEquals(h1, h2); @@ -189,7 +189,7 @@ public class ServerHolderTest ImmutableMap.of("src1", DATA_SOURCES.get("src1")), 1 ), - new LoadQueuePeonTester() + new TestLoadQueuePeon() ); Assert.assertTrue(h1.isServingSegment(SEGMENTS.get(0))); Assert.assertFalse(h1.isServingSegment(SEGMENTS.get(1))); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/balancer/BalancerStrategyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/balancer/BalancerStrategyTest.java index ad4d7f65aed..6e0e3ddf0b3 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/balancer/BalancerStrategyTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/balancer/BalancerStrategyTest.java @@ -24,7 +24,7 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordinator.ServerHolder; -import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester; +import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NoneShardSpec; import org.junit.Assert; @@ -84,7 +84,7 @@ public class BalancerStrategyTest { final ServerHolder serverHolder = new ServerHolder( new DruidServer("server1", "host1", null, 10L, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0).addDataSegment(proposedDataSegment).toImmutableDruidServer(), - new LoadQueuePeonTester()); + new TestLoadQueuePeon()); Assert.assertFalse( balancerStrategy.findServersToLoadSegment( proposedDataSegment, @@ -99,12 +99,12 @@ public class BalancerStrategyTest final ServerHolder serverHolder1 = new ServerHolder( new DruidServer("server1", "host1", null, 1000L, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0) .addDataSegment(proposedDataSegment).toImmutableDruidServer(), - new LoadQueuePeonTester()); + new TestLoadQueuePeon()); final ServerHolder serverHolder2 = new ServerHolder( new DruidServer("server2", "host2", null, 1000L, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0) .addDataSegment(proposedDataSegment).toImmutableDruidServer(), - new LoadQueuePeonTester()); + new TestLoadQueuePeon()); serverHolders = new ArrayList<>(); serverHolders.add(serverHolder1); @@ -119,7 +119,7 @@ public class BalancerStrategyTest { final ServerHolder serverHolder = new ServerHolder( new DruidServer("server1", "host1", null, 1000L, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0).toImmutableDruidServer(), - new LoadQueuePeonTester()); + new TestLoadQueuePeon()); serverHolders = new ArrayList<>(); serverHolders.add(serverHolder); final ServerHolder foundServerHolder = balancerStrategy diff --git a/server/src/test/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyTest.java index 23975352b33..f0d65199745 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyTest.java @@ -26,7 +26,7 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordinator.ServerHolder; -import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester; +import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon; import org.apache.druid.timeline.DataSegment; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -141,7 +141,7 @@ public class CachingCostBalancerStrategyTest .forEach(druidServer::addDataSegment); return new ServerHolder( druidServer.toImmutableDruidServer(), - new LoadQueuePeonTester() + new TestLoadQueuePeon() ); } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyTest.java index 17d0a8716a1..e4b3715960f 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyTest.java @@ -29,7 +29,7 @@ import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordinator.CreateDataSegments; import org.apache.druid.server.coordinator.ServerHolder; -import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester; +import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon; import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; import org.apache.druid.server.coordinator.stats.Dimension; import org.apache.druid.server.coordinator.stats.RowKey; @@ -283,7 +283,7 @@ public class CostBalancerStrategyTest // Create ServerHolder for each server final List serverHolders = historicals.stream().map( - server -> new ServerHolder(server.toImmutableDruidServer(), new LoadQueuePeonTester()) + server -> new ServerHolder(server.toImmutableDruidServer(), new TestLoadQueuePeon()) ).collect(Collectors.toList()); final ServerHolder serverA = serverHolders.get(0); @@ -321,11 +321,11 @@ public class CostBalancerStrategyTest { final ServerHolder serverA = new ServerHolder( createHistorical().toImmutableDruidServer(), - new LoadQueuePeonTester() + new TestLoadQueuePeon() ); final ServerHolder serverB = new ServerHolder( createHistorical().toImmutableDruidServer(), - new LoadQueuePeonTester() + new TestLoadQueuePeon() ); final DataSegment segment = CreateDataSegments.ofDatasource(DS_WIKI).eachOfSizeInMb(100).get(0); @@ -356,7 +356,7 @@ public class CostBalancerStrategyTest .startingAt("2012-10-24") .eachOfSizeInMb(100).get(0); - final LoadQueuePeonTester peon = new LoadQueuePeonTester(); + final TestLoadQueuePeon peon = new TestLoadQueuePeon(); ServerHolder serverA = new ServerHolder(createHistorical().toImmutableDruidServer(), peon); ServerHolder serverB = new ServerHolder(createHistorical().toImmutableDruidServer(), peon); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategyTest.java index 98f86ec3181..cbc4b8f0f7b 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategyTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategyTest.java @@ -29,7 +29,7 @@ import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordinator.ServerHolder; -import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester; +import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon; import org.apache.druid.timeline.DataSegment; import org.easymock.EasyMock; import org.joda.time.Interval; @@ -57,7 +57,7 @@ public class DiskNormalizedCostBalancerStrategyTest // Create 10 servers with current size being 3K & max size being 10K // Each having having 100 segments for (int i = 0; i < serverCount; i++) { - LoadQueuePeonTester fromPeon = new LoadQueuePeonTester(); + TestLoadQueuePeon fromPeon = new TestLoadQueuePeon(); List segments = IntStream .range(0, maxSegments) @@ -78,7 +78,7 @@ public class DiskNormalizedCostBalancerStrategyTest } // The best server to be available for next segment assignment has greater max Size - LoadQueuePeonTester fromPeon = new LoadQueuePeonTester(); + TestLoadQueuePeon fromPeon = new TestLoadQueuePeon(); ImmutableDruidServer druidServer = EasyMock.createMock(ImmutableDruidServer.class); EasyMock.expect(druidServer.getName()).andReturn("BEST_SERVER").anyTimes(); EasyMock.expect(druidServer.getCurrSize()).andReturn(3000L).anyTimes(); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/balancer/ReservoirSegmentSamplerTest.java b/server/src/test/java/org/apache/druid/server/coordinator/balancer/ReservoirSegmentSamplerTest.java index e6387a5a343..c438219e1b3 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/balancer/ReservoirSegmentSamplerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/balancer/ReservoirSegmentSamplerTest.java @@ -25,8 +25,8 @@ import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordinator.CreateDataSegments; import org.apache.druid.server.coordinator.ServerHolder; -import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester; import org.apache.druid.server.coordinator.loading.SegmentAction; +import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon; import org.apache.druid.timeline.DataSegment; import org.junit.Assert; import org.junit.Before; @@ -175,7 +175,7 @@ public class ReservoirSegmentSamplerTest .addDataSegment(segments.get(2)) .addDataSegment(segments.get(3)) .toImmutableDruidServer(), - new LoadQueuePeonTester() + new TestLoadQueuePeon() ); // Try to pick all the segments on the servers @@ -380,6 +380,6 @@ public class ReservoirSegmentSamplerTest for (DataSegment segment : loadedSegments) { server.addDataSegment(segment); } - return new ServerHolder(server.toImmutableDruidServer(), new LoadQueuePeonTester()); + return new ServerHolder(server.toImmutableDruidServer(), new TestLoadQueuePeon()); } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java index 185f8e82630..c608f58d9a4 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java @@ -31,8 +31,8 @@ import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; import org.apache.druid.server.coordinator.ServerHolder; import org.apache.druid.server.coordinator.balancer.BalancerStrategy; import org.apache.druid.server.coordinator.balancer.CostBalancerStrategyFactory; -import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester; import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager; +import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon; import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; import org.apache.druid.server.coordinator.stats.Stats; import org.apache.druid.timeline.DataSegment; @@ -423,7 +423,7 @@ public class BalanceSegmentsTest return new ServerHolder( server.toImmutableDruidServer(), - new LoadQueuePeonTester(), + new TestLoadQueuePeon(), isDecommissioning, maxSegmentsInLoadQueue, 10 diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegmentsTest.java index 83aa11fb303..ba39c5fc439 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegmentsTest.java @@ -33,8 +33,8 @@ import org.apache.druid.server.coordinator.DruidCluster; import org.apache.druid.server.coordinator.DruidCoordinator; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; import org.apache.druid.server.coordinator.ServerHolder; -import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester; import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager; +import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon; import org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule; import org.apache.druid.server.coordinator.rules.ForeverLoadRule; import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; @@ -61,10 +61,10 @@ public class UnloadUnusedSegmentsTest private ImmutableDruidServer historicalServerTier2; private ImmutableDruidServer brokerServer; private ImmutableDruidServer indexerServer; - private LoadQueuePeonTester historicalPeon; - private LoadQueuePeonTester historicalTier2Peon; - private LoadQueuePeonTester brokerPeon; - private LoadQueuePeonTester indexerPeon; + private TestLoadQueuePeon historicalPeon; + private TestLoadQueuePeon historicalTier2Peon; + private TestLoadQueuePeon brokerPeon; + private TestLoadQueuePeon indexerPeon; private DataSegment segment1; private DataSegment segment2; private List segments; @@ -143,10 +143,10 @@ public class UnloadUnusedSegmentsTest segmentsForRealtime.add(realtimeOnlySegment); segmentsForRealtime.add(broadcastSegment); - historicalPeon = new LoadQueuePeonTester(); - historicalTier2Peon = new LoadQueuePeonTester(); - brokerPeon = new LoadQueuePeonTester(); - indexerPeon = new LoadQueuePeonTester(); + historicalPeon = new TestLoadQueuePeon(); + historicalTier2Peon = new TestLoadQueuePeon(); + brokerPeon = new TestLoadQueuePeon(); + indexerPeon = new TestLoadQueuePeon(); final ImmutableDruidDataSource dataSource1 = new ImmutableDruidDataSource( "datasource1", diff --git a/server/src/test/java/org/apache/druid/server/coordinator/loading/LoadQueuePeonTester.java b/server/src/test/java/org/apache/druid/server/coordinator/loading/LoadQueuePeonTester.java deleted file mode 100644 index e1a22aaa078..00000000000 --- a/server/src/test/java/org/apache/druid/server/coordinator/loading/LoadQueuePeonTester.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.server.coordinator.loading; - -import org.apache.druid.java.util.common.concurrent.Execs; -import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig; -import org.apache.druid.timeline.DataSegment; -import org.joda.time.Duration; - -import javax.annotation.Nullable; -import java.util.concurrent.ConcurrentSkipListSet; - -public class LoadQueuePeonTester extends CuratorLoadQueuePeon -{ - private final ConcurrentSkipListSet segmentsToLoad = new ConcurrentSkipListSet<>(); - - public LoadQueuePeonTester() - { - super( - null, - null, - null, - Execs.scheduledSingleThreaded("LoadQueuePeonTester--%d"), - null, - new TestDruidCoordinatorConfig.Builder() - .withLoadTimeoutDelay(new Duration(1)) - .withCoordinatorKillMaxSegments(10) - .withCoordinatorKillIgnoreDurationToRetain(false) - .build() - ); - } - - @Override - public void loadSegment(DataSegment segment, SegmentAction action, @Nullable LoadPeonCallback callback) - { - segmentsToLoad.add(segment); - } - - @Override - public ConcurrentSkipListSet getSegmentsToLoad() - { - return segmentsToLoad; - } -} diff --git a/server/src/test/java/org/apache/druid/server/coordinator/loading/TestLoadQueuePeon.java b/server/src/test/java/org/apache/druid/server/coordinator/loading/TestLoadQueuePeon.java new file mode 100644 index 00000000000..c496c37e3ec --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/loading/TestLoadQueuePeon.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.loading; + +import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; +import org.apache.druid.timeline.DataSegment; + +import javax.annotation.Nullable; +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.ConcurrentSkipListSet; + +/** + * Test implementation of {@link LoadQueuePeon} that maintains a set of segments + * that have been queued for load or drop. + */ +public class TestLoadQueuePeon implements LoadQueuePeon +{ + private final ConcurrentSkipListSet segmentsToLoad = new ConcurrentSkipListSet<>(); + private final ConcurrentSkipListSet segmentsToDrop = new ConcurrentSkipListSet<>(); + + private final CoordinatorRunStats stats = new CoordinatorRunStats(); + + @Override + public void loadSegment(DataSegment segment, SegmentAction action, @Nullable LoadPeonCallback callback) + { + segmentsToLoad.add(segment); + } + + @Override + public void dropSegment(DataSegment segment, LoadPeonCallback callback) + { + segmentsToDrop.add(segment); + } + + @Override + public long getSizeOfSegmentsToLoad() + { + return 0; + } + + @Override + public CoordinatorRunStats getAndResetStats() + { + return stats; + } + + @Override + public boolean cancelOperation(DataSegment segment) + { + return false; + } + + @Override + public void start() + { + + } + + @Override + public void stop() + { + + } + + @Override + public ConcurrentSkipListSet getSegmentsToLoad() + { + return segmentsToLoad; + } + + @Override + public Set getSegmentsInQueue() + { + return Collections.emptySet(); + } + + @Override + public Set getSegmentsToDrop() + { + return segmentsToDrop; + } + + @Override + public Set getTimedOutSegments() + { + return Collections.emptySet(); + } + + @Override + public void markSegmentToDrop(DataSegment segmentToLoad) + { + + } + + @Override + public void unmarkSegmentToDrop(DataSegment segmentToLoad) + { + + } + + @Override + public Set getSegmentsMarkedToDrop() + { + return Collections.emptySet(); + } +} diff --git a/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java b/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java index acf815089bb..e21d823f346 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java @@ -27,9 +27,9 @@ import org.apache.druid.server.coordinator.DruidCluster; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; import org.apache.druid.server.coordinator.ServerHolder; import org.apache.druid.server.coordinator.balancer.RandomBalancerStrategy; -import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester; import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager; import org.apache.druid.server.coordinator.loading.StrategicSegmentAssigner; +import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon; import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; import org.apache.druid.server.coordinator.stats.Stats; import org.apache.druid.timeline.DataSegment; @@ -120,7 +120,7 @@ public class BroadcastDistributionRuleTest 0 ).addDataSegment(smallSegment) .toImmutableDruidServer(), - new LoadQueuePeonTester() + new TestLoadQueuePeon() ); holdersOfLargeSegments.add( @@ -135,7 +135,7 @@ public class BroadcastDistributionRuleTest 0 ).addDataSegment(largeSegments.get(0)) .toImmutableDruidServer(), - new LoadQueuePeonTester() + new TestLoadQueuePeon() ) ); holdersOfLargeSegments.add( @@ -150,7 +150,7 @@ public class BroadcastDistributionRuleTest 0 ).addDataSegment(largeSegments.get(1)) .toImmutableDruidServer(), - new LoadQueuePeonTester() + new TestLoadQueuePeon() ) ); holdersOfLargeSegments.add( @@ -165,7 +165,7 @@ public class BroadcastDistributionRuleTest 0 ).addDataSegment(largeSegments.get(2)) .toImmutableDruidServer(), - new LoadQueuePeonTester() + new TestLoadQueuePeon() ) ); @@ -181,7 +181,7 @@ public class BroadcastDistributionRuleTest 0 ).addDataSegment(largeSegments2.get(0)) .toImmutableDruidServer(), - new LoadQueuePeonTester() + new TestLoadQueuePeon() ) ); holdersOfLargeSegments2.add( @@ -196,7 +196,7 @@ public class BroadcastDistributionRuleTest 0 ).addDataSegment(largeSegments2.get(1)) .toImmutableDruidServer(), - new LoadQueuePeonTester() + new TestLoadQueuePeon() ) ); @@ -211,7 +211,7 @@ public class BroadcastDistributionRuleTest 0 ).addDataSegment(largeSegments.get(0)) .toImmutableDruidServer(), - new LoadQueuePeonTester() + new TestLoadQueuePeon() ); decommissioningServer1 = new ServerHolder( @@ -225,7 +225,7 @@ public class BroadcastDistributionRuleTest 0 ).addDataSegment(smallSegment) .toImmutableDruidServer(), - new LoadQueuePeonTester(), + new TestLoadQueuePeon(), true ); @@ -240,7 +240,7 @@ public class BroadcastDistributionRuleTest 0 ).addDataSegment(largeSegments.get(1)) .toImmutableDruidServer(), - new LoadQueuePeonTester(), + new TestLoadQueuePeon(), true ); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java b/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java index c7e3b5b239f..7ef07719bef 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java @@ -24,7 +24,6 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import nl.jqno.equalsverifier.EqualsVerifier; import org.apache.druid.client.DruidServer; -import org.apache.druid.client.ImmutableDruidServer; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.concurrent.Execs; @@ -38,18 +37,14 @@ import org.apache.druid.server.coordinator.ServerHolder; import org.apache.druid.server.coordinator.balancer.BalancerStrategy; import org.apache.druid.server.coordinator.balancer.CachingCostBalancerStrategy; import org.apache.druid.server.coordinator.balancer.ClusterCostCache; -import org.apache.druid.server.coordinator.balancer.CostBalancerStrategyFactory; -import org.apache.druid.server.coordinator.loading.LoadQueuePeon; -import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester; -import org.apache.druid.server.coordinator.loading.SegmentAction; -import org.apache.druid.server.coordinator.loading.SegmentHolder; +import org.apache.druid.server.coordinator.balancer.CostBalancerStrategy; import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager; import org.apache.druid.server.coordinator.loading.StrategicSegmentAssigner; +import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon; import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; import org.apache.druid.server.coordinator.stats.Stats; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NoneShardSpec; -import org.easymock.EasyMock; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -62,7 +57,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; /** @@ -96,7 +90,7 @@ public class LoadRuleTest public void setUp() { exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "LoadRuleTest-%d")); - balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec); + balancerStrategy = new CostBalancerStrategy(exec); loadQueueManager = new SegmentLoadQueueManager(null, null, null); } @@ -107,17 +101,15 @@ public class LoadRuleTest } @Test - public void testLoad() + public void testLoadRuleAssignsSegments() { - final LoadQueuePeon mockPeon = createEmptyPeon(); - mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject()); - EasyMock.expectLastCall().atLeastOnce(); - EasyMock.replay(mockPeon); - + // Cluster has 2 tiers with 1 server each + final ServerHolder server1 = createServer(Tier.T1); + final ServerHolder server2 = createServer(Tier.T2); DruidCluster druidCluster = DruidCluster .builder() - .addTier(Tier.T1, createServerHolder(Tier.T1, mockPeon, false)) - .addTier(Tier.T2, createServerHolder(Tier.T2, mockPeon, false)) + .addTier(Tier.T1, server1) + .addTier(Tier.T2, server2) .build(); final DataSegment segment = createDataSegment(DS_WIKI); @@ -126,8 +118,6 @@ public class LoadRuleTest Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, DS_WIKI)); Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T2, DS_WIKI)); - - EasyMock.verify(mockPeon); } private CoordinatorRunStats runRuleAndGetStats(LoadRule rule, DataSegment segment, DruidCluster cluster) @@ -169,99 +159,56 @@ public class LoadRuleTest @Test public void testLoadPrimaryAssignDoesNotOverAssign() { - final LoadQueuePeon mockPeon = createEmptyPeon(); - mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject()); - EasyMock.expectLastCall().atLeastOnce(); - EasyMock.replay(mockPeon); - - ImmutableDruidServer server1 = createServer(Tier.T1).toImmutableDruidServer(); - ImmutableDruidServer server2 = createServer(Tier.T1).toImmutableDruidServer(); + ServerHolder server1 = createServer(Tier.T1); + ServerHolder server2 = createServer(Tier.T1); DruidCluster druidCluster = DruidCluster .builder() - .addTier(Tier.T1, new ServerHolder(server1, mockPeon), new ServerHolder(server2, mockPeon)) + .addTier(Tier.T1, server1, server2) .build(); final LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1)); final DataSegment segment = createDataSegment(DS_WIKI); - CoordinatorRunStats stats = runRuleAndGetStats(rule, segment, druidCluster); - Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, segment.getDataSource())); + CoordinatorRunStats firstRunStats = runRuleAndGetStats(rule, segment, druidCluster); + Assert.assertEquals(1L, firstRunStats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, segment.getDataSource())); + Assert.assertEquals(1, server1.getLoadingSegments().size() + server2.getLoadingSegments().size()); - // ensure multiple runs don't assign primary segment again if at replication count - final LoadQueuePeon loadingPeon = createLoadingPeon(segment, false); - EasyMock.replay(loadingPeon); - - DruidCluster afterLoad = DruidCluster - .builder() - .addTier(Tier.T1, new ServerHolder(server1, loadingPeon), new ServerHolder(server2, mockPeon)) - .build(); - - CoordinatorRunStats statsAfterLoadPrimary = runRuleAndGetStats(rule, segment, afterLoad); - - Assert.assertFalse(statsAfterLoadPrimary.hasStat(Stats.Segments.ASSIGNED)); - - EasyMock.verify(mockPeon); + // Verify that multiple runs don't assign primary segment again if at replication count + CoordinatorRunStats secondRunStats = runRuleAndGetStats(rule, segment, druidCluster); + Assert.assertFalse(secondRunStats.hasStat(Stats.Segments.ASSIGNED)); + Assert.assertEquals(1, server1.getLoadingSegments().size() + server2.getLoadingSegments().size()); } @Test @Ignore("Enable this test when timeout behaviour is fixed") public void testOverAssignForTimedOutSegments() { - final LoadQueuePeon emptyPeon = createEmptyPeon(); - emptyPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject()); - EasyMock.expectLastCall().atLeastOnce(); - EasyMock.replay(emptyPeon); - - ImmutableDruidServer server1 = createServer(Tier.T1).toImmutableDruidServer(); - ImmutableDruidServer server2 = createServer(Tier.T1).toImmutableDruidServer(); + ServerHolder server1 = createServer(Tier.T1); + ServerHolder server2 = createServer(Tier.T1); DruidCluster druidCluster = DruidCluster .builder() - .addTier(Tier.T1, new ServerHolder(server1, emptyPeon), new ServerHolder(server2, emptyPeon)) + .addTier(Tier.T1, server1, server2) .build(); final LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1)); final DataSegment segment = createDataSegment(DS_WIKI); - CoordinatorRunStats stats = runRuleAndGetStats( - rule, - segment, - makeCoordinatorRuntimeParams(druidCluster, segment) - ); + CoordinatorRunStats stats = runRuleAndGetStats(rule, segment, druidCluster); // Ensure that the segment is assigned to one of the historicals Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, segment.getDataSource())); // Ensure that the primary segment is assigned again in case the peon timed out on loading the segment - final LoadQueuePeon slowLoadingPeon = createLoadingPeon(segment, true); - EasyMock.replay(slowLoadingPeon); - - DruidCluster withLoadTimeout = DruidCluster - .builder() - .addTier(Tier.T1, new ServerHolder(server1, slowLoadingPeon), new ServerHolder(server2, emptyPeon)) - .build(); - - CoordinatorRunStats statsAfterLoadPrimary = runRuleAndGetStats( - rule, - segment, - makeCoordinatorRuntimeParams(withLoadTimeout, segment) - ); - + CoordinatorRunStats statsAfterLoadPrimary = runRuleAndGetStats(rule, segment, druidCluster); Assert.assertEquals(1L, statsAfterLoadPrimary.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, DS_WIKI)); - - EasyMock.verify(emptyPeon); } @Test public void testSkipReplicationForTimedOutSegments() { - final LoadQueuePeon emptyPeon = createEmptyPeon(); - emptyPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject()); - EasyMock.expectLastCall().atLeastOnce(); - EasyMock.replay(emptyPeon); - - ImmutableDruidServer server1 = createServer(Tier.T1).toImmutableDruidServer(); - ImmutableDruidServer server2 = createServer(Tier.T1).toImmutableDruidServer(); + ServerHolder server1 = createServer(Tier.T1); + ServerHolder server2 = createServer(Tier.T1); DruidCluster druidCluster = DruidCluster .builder() - .addTier(Tier.T1, new ServerHolder(server1, emptyPeon), new ServerHolder(server2, emptyPeon)) + .addTier(Tier.T1, server1, server2) .build(); final LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1)); @@ -272,20 +219,10 @@ public class LoadRuleTest Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, segment.getDataSource())); // Add the segment to the timed out list to simulate peon timeout on loading the segment - final LoadQueuePeon slowLoadingPeon = createLoadingPeon(segment, true); - EasyMock.replay(slowLoadingPeon); - - DruidCluster withLoadTimeout = DruidCluster - .builder() - .addTier(Tier.T1, new ServerHolder(server1, slowLoadingPeon), new ServerHolder(server2, emptyPeon)) - .build(); - // Default behavior is to not replicate the timed out segments on other servers - CoordinatorRunStats statsAfterLoadPrimary = runRuleAndGetStats(rule, segment, withLoadTimeout); + CoordinatorRunStats statsAfterLoadPrimary = runRuleAndGetStats(rule, segment, druidCluster); Assert.assertFalse(statsAfterLoadPrimary.hasStat(Stats.Segments.ASSIGNED)); - - EasyMock.verify(emptyPeon); } @Test @@ -297,14 +234,10 @@ public class LoadRuleTest .withNumPartitions(2) .eachOfSizeInMb(100); - final LoadQueuePeon loadingPeon = createLoadingPeon(segments.get(0), true); - loadingPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject()); - EasyMock.expectLastCall().once(); - EasyMock.replay(loadingPeon); - + final ServerHolder server1 = createServer(Tier.T1); DruidCluster druidCluster = DruidCluster .builder() - .addTier(Tier.T1, createServerHolder(Tier.T1, loadingPeon, false)) + .addTier(Tier.T1, server1) .build(); balancerStrategy = new CachingCostBalancerStrategy(ClusterCostCache.builder().build(), exec); @@ -316,104 +249,71 @@ public class LoadRuleTest makeCoordinatorRuntimeParams(druidCluster, segments.toArray(new DataSegment[0])) ); Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, DS_WIKI)); - - EasyMock.verify(loadingPeon); } @Test - public void testDrop() + public void testSegmentsAreDroppedIfLoadRuleHasZeroReplicas() { - final LoadQueuePeon mockPeon = createEmptyPeon(); - mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject()); - EasyMock.expectLastCall().atLeastOnce(); - EasyMock.replay(mockPeon); - final DataSegment segment = createDataSegment(DS_WIKI); - DruidServer server1 = createServer(Tier.T1); - server1.addDataSegment(segment); - DruidServer server2 = createServer(Tier.T2); - server2.addDataSegment(segment); - DruidServer server3 = createServer(Tier.T2); + final ServerHolder serverT11 = createServer(Tier.T1, segment); + final ServerHolder serverT12 = createServer(Tier.T2, segment); + final ServerHolder serverT21 = createServer(Tier.T2, segment); DruidCluster druidCluster = DruidCluster .builder() - .addTier(Tier.T1, new ServerHolder(server1.toImmutableDruidServer(), mockPeon)) - .addTier( - Tier.T2, - new ServerHolder(server2.toImmutableDruidServer(), mockPeon), - new ServerHolder(server3.toImmutableDruidServer(), mockPeon) - ) + .addTier(Tier.T1, serverT11) + .addTier(Tier.T2, serverT12, serverT21) .build(); LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 0, Tier.T2, 0)); CoordinatorRunStats stats = runRuleAndGetStats(rule, segment, druidCluster); Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.DROPPED, Tier.T1, DS_WIKI)); - Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.DROPPED, Tier.T2, DS_WIKI)); - - EasyMock.verify(mockPeon); + Assert.assertEquals(2L, stats.getSegmentStat(Stats.Segments.DROPPED, Tier.T2, DS_WIKI)); } @Test - public void testLoadWithNonExistentTier() + public void testLoadIgnoresInvalidTiers() { - final LoadQueuePeon mockPeon = createEmptyPeon(); - mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject()); - EasyMock.expectLastCall().atLeastOnce(); - EasyMock.replay(mockPeon); - + ServerHolder server = createServer(Tier.T1); DruidCluster druidCluster = DruidCluster .builder() - .addTier(Tier.T1, createServerHolder(Tier.T1, mockPeon, false)) + .addTier(Tier.T1, server) .build(); final DataSegment segment = createDataSegment(DS_WIKI); - LoadRule rule = loadForever(ImmutableMap.of("nonExistentTier", 1, Tier.T1, 1)); + LoadRule rule = loadForever(ImmutableMap.of("invalidTier", 1, Tier.T1, 1)); CoordinatorRunStats stats = runRuleAndGetStats(rule, segment, druidCluster); - Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, DS_WIKI)); - - EasyMock.verify(mockPeon); + Assert.assertEquals(0L, stats.getSegmentStat(Stats.Segments.ASSIGNED, "invalidTier", DS_WIKI)); } @Test - public void testDropWithNonExistentTier() + public void testDropIgnoresInvalidTiers() { - final LoadQueuePeon mockPeon = createEmptyPeon(); - mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject()); - EasyMock.expectLastCall().atLeastOnce(); - EasyMock.replay(mockPeon); - final DataSegment segment = createDataSegment(DS_WIKI); - DruidServer server1 = createServer(Tier.T1); - DruidServer server2 = createServer(Tier.T1); - server1.addDataSegment(segment); - server2.addDataSegment(segment); - + // Cluster has 1 tier with 2 servers + ServerHolder server1 = createServer(Tier.T1, segment); + ServerHolder server2 = createServer(Tier.T1, segment); DruidCluster druidCluster = DruidCluster .builder() - .addTier( - Tier.T1, - new ServerHolder(server1.toImmutableDruidServer(), mockPeon), - new ServerHolder(server2.toImmutableDruidServer(), mockPeon) - ) + .addTier(Tier.T1, server1, server2) .build(); - LoadRule rule = loadForever(ImmutableMap.of("nonExistentTier", 1, Tier.T1, 1)); + LoadRule rule = loadForever(ImmutableMap.of("invalidTier", 1, Tier.T1, 1)); CoordinatorRunStats stats = runRuleAndGetStats(rule, segment, druidCluster); Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.DROPPED, Tier.T1, DS_WIKI)); - - EasyMock.verify(mockPeon); + Assert.assertEquals(0L, stats.getSegmentStat(Stats.Segments.DROPPED, "invalidTier", DS_WIKI)); } @Test public void testMaxLoadingQueueSize() { - final LoadQueuePeonTester peon = new LoadQueuePeonTester(); + final TestLoadQueuePeon peon = new TestLoadQueuePeon(); final int maxSegmentsInQueue = 2; DruidCluster druidCluster = DruidCluster @@ -421,7 +321,7 @@ public class LoadRuleTest .addTier( Tier.T1, new ServerHolder( - createServer(Tier.T1).toImmutableDruidServer(), + createDruidServer(Tier.T1).toImmutableDruidServer(), peon, false, maxSegmentsInQueue, 10 ) ) @@ -456,63 +356,55 @@ public class LoadRuleTest Assert.assertEquals(0L, stats3.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, dataSegment3.getDataSource())); } - /** - * 2 servers in different tiers, the first is decommissioning. - * Should not load a segment to the server that is decommissioning - */ @Test - public void testLoadDecommissioning() + public void testSegmentIsAssignedOnlyToActiveServer() { - final LoadQueuePeon mockPeon1 = createEmptyPeon(); - final LoadQueuePeon mockPeon2 = createOneCallPeonMock(); - EasyMock.replay(mockPeon1, mockPeon2); + final ServerHolder decommServerT1 = createDecommissioningServer(Tier.T1); + final ServerHolder serverT2 = createServer(Tier.T2); DruidCluster druidCluster = DruidCluster .builder() - .addTier(Tier.T1, createServerHolder(Tier.T1, mockPeon1, true)) - .addTier(Tier.T2, createServerHolder(Tier.T2, mockPeon2, false)) + .addTier(Tier.T1, decommServerT1) + .addTier(Tier.T2, serverT2) .build(); + // Load rule requires 1 replica on each tier LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1, Tier.T2, 1)); DataSegment segment = createDataSegment(DS_WIKI); CoordinatorRunStats stats = runRuleAndGetStats(rule, segment, druidCluster); + // Verify that segment is not loaded on decommissioning server Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T2, DS_WIKI)); - EasyMock.verify(mockPeon1, mockPeon2); + Assert.assertEquals(0, decommServerT1.getLoadingSegments().size()); + Assert.assertTrue(serverT2.getLoadingSegments().contains(segment)); } - /** - * 2 tiers, 2 servers each, 1 server of the second tier is decommissioning. - * Should not load a segment to the server that is decommssioning. - */ @Test - public void testLoadReplicaDuringDecommissioning() + public void testSegmentIsAssignedOnlyToActiveServers() { - final LoadQueuePeon mockPeon1 = createEmptyPeon(); - final LoadQueuePeon mockPeon2 = createOneCallPeonMock(); - final LoadQueuePeon mockPeon3 = createOneCallPeonMock(); - final LoadQueuePeon mockPeon4 = createOneCallPeonMock(); - EasyMock.replay(mockPeon1, mockPeon2, mockPeon3, mockPeon4); - - ServerHolder holder1 = createServerHolder(Tier.T1, mockPeon1, true); - ServerHolder holder2 = createServerHolder(Tier.T1, mockPeon2, false); - ServerHolder holder3 = createServerHolder(Tier.T2, mockPeon3, false); - ServerHolder holder4 = createServerHolder(Tier.T2, mockPeon4, false); + // 2 tiers with 2 servers each, 1 server is decommissioning + ServerHolder decommServerT11 = createDecommissioningServer(Tier.T1); + ServerHolder serverT12 = createServer(Tier.T1); + ServerHolder serverT21 = createServer(Tier.T2); + ServerHolder serverT22 = createServer(Tier.T2); final DataSegment segment = createDataSegment(DS_WIKI); DruidCluster druidCluster = DruidCluster .builder() - .addTier(Tier.T1, holder1, holder2) - .addTier(Tier.T2, holder3, holder4) + .addTier(Tier.T1, decommServerT11, serverT12) + .addTier(Tier.T2, serverT21, serverT22) .build(); + // Load rule requires 2 replicas on each server LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 2, Tier.T2, 2)); CoordinatorRunStats stats = runRuleAndGetStats(rule, segment, druidCluster); + // Verify that no replica is assigned to decommissioning server Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, DS_WIKI)); - Assert.assertEquals(2L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T2, DS_WIKI)); + Assert.assertTrue(decommServerT11.getLoadingSegments().isEmpty()); + Assert.assertEquals(0, decommServerT11.getLoadingSegments().size()); - EasyMock.verify(mockPeon1, mockPeon2, mockPeon3, mockPeon4); + Assert.assertEquals(2L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T2, DS_WIKI)); } /** @@ -522,26 +414,15 @@ public class LoadRuleTest @Test public void testDropDuringDecommissioning() { - final LoadQueuePeon mockPeon = createEmptyPeon(); - mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject()); - EasyMock.expectLastCall().times(2); - EasyMock.replay(mockPeon); - final DataSegment segment1 = createDataSegment("foo1"); final DataSegment segment2 = createDataSegment("foo2"); - DruidServer server1 = createServer(Tier.T1); - server1.addDataSegment(segment1); - DruidServer server2 = createServer(Tier.T1); - server2.addDataSegment(segment2); + final ServerHolder server1 = createDecommissioningServer(Tier.T1, segment1); + final ServerHolder server2 = createServer(Tier.T1, segment2); DruidCluster druidCluster = DruidCluster .builder() - .addTier( - Tier.T1, - new ServerHolder(server1.toImmutableDruidServer(), mockPeon, true), - new ServerHolder(server2.toImmutableDruidServer(), mockPeon, false) - ) + .addTier(Tier.T1, server1, server2) .build(); DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, segment1, segment2); @@ -549,45 +430,29 @@ public class LoadRuleTest CoordinatorRunStats stats = runRuleAndGetStats(rule, segment1, params); Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.DROPPED, Tier.T1, segment1.getDataSource())); + Assert.assertTrue(server1.getPeon().getSegmentsToDrop().contains(segment1)); stats = runRuleAndGetStats(rule, segment2, params); Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.DROPPED, Tier.T1, segment2.getDataSource())); - - EasyMock.verify(mockPeon); + Assert.assertTrue(server2.getPeon().getSegmentsToDrop().contains(segment2)); } - /** - * 3 servers hosting 3 replicas of the segment. - * 1 servers is decommissioning. - * 1 replica is redundant. - * Should drop from the decommissioning server. - */ @Test - public void testRedundantReplicaDropDuringDecommissioning() + public void testExtraReplicasAreDroppedFromDecommissioningServer() { - final LoadQueuePeon mockPeon1 = new LoadQueuePeonTester(); - final LoadQueuePeon mockPeon2 = new LoadQueuePeonTester(); - final LoadQueuePeon mockPeon3 = new LoadQueuePeonTester(); - final DataSegment segment1 = createDataSegment(DS_WIKI); - DruidServer server1 = createServer(Tier.T1); - server1.addDataSegment(segment1); - DruidServer server2 = createServer(Tier.T1); - server2.addDataSegment(segment1); - DruidServer server3 = createServer(Tier.T1); - server3.addDataSegment(segment1); + // 3 servers, each serving the same segment + final ServerHolder server1 = createServer(Tier.T1, segment1); + final ServerHolder server2 = createDecommissioningServer(Tier.T1, segment1); + final ServerHolder server3 = createServer(Tier.T1, segment1); DruidCluster druidCluster = DruidCluster .builder() - .addTier( - Tier.T1, - new ServerHolder(server1.toImmutableDruidServer(), mockPeon1, false), - new ServerHolder(server2.toImmutableDruidServer(), mockPeon2, true), - new ServerHolder(server3.toImmutableDruidServer(), mockPeon3, false) - ) + .addTier(Tier.T1, server1, server2, server3) .build(); + // Load rule requires 2 replicas LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 2)); CoordinatorRunStats stats = runRuleAndGetStats( rule, @@ -595,10 +460,11 @@ public class LoadRuleTest makeCoordinatorRuntimeParams(druidCluster, segment1) ); + // Verify that the extra replica is dropped from the decommissioning server Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.DROPPED, Tier.T1, DS_WIKI)); - Assert.assertEquals(0, mockPeon1.getSegmentsToDrop().size()); - Assert.assertEquals(1, mockPeon2.getSegmentsToDrop().size()); - Assert.assertEquals(0, mockPeon3.getSegmentsToDrop().size()); + Assert.assertEquals(0, server1.getPeon().getSegmentsToDrop().size()); + Assert.assertEquals(1, server2.getPeon().getSegmentsToDrop().size()); + Assert.assertEquals(0, server3.getPeon().getSegmentsToDrop().size()); } private DataSegment createDataSegment(String dataSource) @@ -621,54 +487,36 @@ public class LoadRuleTest return new ForeverLoadRule(tieredReplicants, null); } - private static LoadQueuePeon createEmptyPeon() - { - final LoadQueuePeon mockPeon = EasyMock.createMock(LoadQueuePeon.class); - EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Collections.emptySet()).anyTimes(); - EasyMock.expect(mockPeon.getSegmentsInQueue()).andReturn(Collections.emptySet()).anyTimes(); - EasyMock.expect(mockPeon.getSegmentsMarkedToDrop()).andReturn(Collections.emptySet()).anyTimes(); - EasyMock.expect(mockPeon.getSizeOfSegmentsToLoad()).andReturn(0L).anyTimes(); - - return mockPeon; - } - - private static LoadQueuePeon createLoadingPeon(DataSegment segment, boolean slowLoading) - { - final Set segs = Collections.singleton(segment); - - final LoadQueuePeon mockPeon = EasyMock.createMock(LoadQueuePeon.class); - EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(segs).anyTimes(); - EasyMock.expect(mockPeon.getSegmentsMarkedToDrop()).andReturn(Collections.emptySet()).anyTimes(); - EasyMock.expect(mockPeon.getSegmentsToDrop()).andReturn(Collections.emptySet()).anyTimes(); - EasyMock.expect(mockPeon.getSegmentsInQueue()) - .andReturn(Collections.singleton(new SegmentHolder(segment, SegmentAction.LOAD, null))).anyTimes(); - - EasyMock.expect(mockPeon.getTimedOutSegments()) - .andReturn(slowLoading ? segs : Collections.emptySet()).anyTimes(); - - return mockPeon; - } - - private DruidServer createServer(String tier) + private DruidServer createDruidServer(String tier) { final String serverName = "hist_" + tier + "_" + serverId.incrementAndGet(); return new DruidServer(serverName, serverName, null, 10L << 30, ServerType.HISTORICAL, tier, 0); } - private static LoadQueuePeon createOneCallPeonMock() + private ServerHolder createServer(String tier, DataSegment... segments) { - final LoadQueuePeon mockPeon2 = createEmptyPeon(); - mockPeon2.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject()); - EasyMock.expectLastCall().once(); - return mockPeon2; + final DruidServer server = createDruidServer(tier); + for (DataSegment segment : segments) { + server.addDataSegment(segment); + } + + return new ServerHolder( + server.toImmutableDruidServer(), + new TestLoadQueuePeon() + ); } - private ServerHolder createServerHolder(String tier, LoadQueuePeon loadQueuePeon, boolean isDecommissioning) + private ServerHolder createDecommissioningServer(String tier, DataSegment... segments) { + final DruidServer server = createDruidServer(tier); + for (DataSegment segment : segments) { + server.addDataSegment(segment); + } + return new ServerHolder( - createServer(tier).toImmutableDruidServer(), - loadQueuePeon, - isDecommissioning + server.toImmutableDruidServer(), + new TestLoadQueuePeon(), + true ); }