diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java index 5d656d643f9..e5e3cb57b12 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java @@ -367,7 +367,8 @@ public class CostBalancerStrategy implements BalancerStrategy final boolean includeCurrentServer ) { - Pair bestServer = Pair.of(Double.POSITIVE_INFINITY, null); + final Pair noServer = Pair.of(Double.POSITIVE_INFINITY, null); + Pair bestServer = noServer; List>> futures = new ArrayList<>(); @@ -391,7 +392,11 @@ public class CostBalancerStrategy implements BalancerStrategy bestServers.add(server); } } - + // If the best server list contains server whose cost of serving the segment is INFINITE then this means + // no usable servers are found so return a null server so that segment assignment does not happen + if (bestServers.get(0).lhs.isInfinite()) { + return noServer; + } // Randomly choose a server from the best servers bestServer = bestServers.get(ThreadLocalRandom.current().nextInt(bestServers.size())); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java index 72fdedf6e45..de3e46e66f4 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java @@ -28,20 +28,22 @@ import java.util.List; import java.util.NavigableSet; import java.util.Set; import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; public class RandomBalancerStrategy implements BalancerStrategy { @Override public ServerHolder findNewSegmentHomeReplicator(DataSegment proposalSegment, List serverHolders) { - if (serverHolders.size() == 1) { + // filter out servers whose avaialable size is less than required for this segment and those already serving this segment + final List usableServerHolders = serverHolders.stream().filter( + serverHolder -> serverHolder.getAvailableSize() >= proposalSegment.getSize() && !serverHolder.isServingSegment( + proposalSegment) + ).collect(Collectors.toList()); + if (usableServerHolders.size() == 0) { return null; } else { - ServerHolder holder = serverHolders.get(ThreadLocalRandom.current().nextInt(serverHolders.size())); - while (holder.isServingSegment(proposalSegment)) { - holder = serverHolders.get(ThreadLocalRandom.current().nextInt(serverHolders.size())); - } - return holder; + return usableServerHolders.get(ThreadLocalRandom.current().nextInt(usableServerHolders.size())); } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyTest.java new file mode 100644 index 00000000000..b4d3ac5d11d --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyTest.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator; + +import org.apache.druid.client.DruidServer; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.NoneShardSpec; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; + +@RunWith(Parameterized.class) +public class BalancerStrategyTest +{ + private final BalancerStrategy balancerStrategy; + private DataSegment proposedDataSegment; + private List serverHolders; + + @Parameterized.Parameters(name = "{index}: BalancerStrategy:{0}") + public static Iterable data() + { + return Arrays.asList( + new Object[][]{ + {new CostBalancerStrategy(Execs.directExecutor())}, + {new RandomBalancerStrategy()} + } + ); + } + + public BalancerStrategyTest(BalancerStrategy balancerStrategy) + { + this.balancerStrategy = balancerStrategy; + } + + @Before + public void setUp() + { + this.proposedDataSegment = new DataSegment( + "datasource1", + Intervals.utc(0, 1), + "", + new HashMap<>(), + new ArrayList<>(), + new ArrayList<>(), + NoneShardSpec.instance(), + 0, + 11L + ); + } + + + @Test + public void findNewSegmentHomeReplicatorNotEnoughSpace() + { + final ServerHolder serverHolder = new ServerHolder( + new DruidServer("server1", "host1", null, 10L, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0).addDataSegment(proposedDataSegment).toImmutableDruidServer(), + new LoadQueuePeonTester()); + serverHolders = new ArrayList<>(); + serverHolders.add(serverHolder); + final ServerHolder foundServerHolder = balancerStrategy.findNewSegmentHomeReplicator(proposedDataSegment, serverHolders); + // since there is not enough space on server having available size 10L to host a segment of size 11L, it should be null + Assert.assertNull(foundServerHolder); + } + + @Test(timeout = 5000L) + public void findNewSegmentHomeReplicatorNotEnoughNodesForReplication() + { + final ServerHolder serverHolder1 = new ServerHolder( + new DruidServer("server1", "host1", null, 1000L, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0).addDataSegment(proposedDataSegment).toImmutableDruidServer(), + new LoadQueuePeonTester()); + + final ServerHolder serverHolder2 = new ServerHolder( + new DruidServer("server2", "host2", null, 1000L, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0).addDataSegment(proposedDataSegment).toImmutableDruidServer(), + new LoadQueuePeonTester()); + + serverHolders = new ArrayList<>(); + serverHolders.add(serverHolder1); + serverHolders.add(serverHolder2); + + final ServerHolder foundServerHolder = balancerStrategy.findNewSegmentHomeReplicator(proposedDataSegment, serverHolders); + // since there is not enough nodes to load 3 replicas of segment + Assert.assertNull(foundServerHolder); + } + + @Test + public void findNewSegmentHomeReplicatorEnoughSpace() + { + final ServerHolder serverHolder = new ServerHolder( + new DruidServer("server1", "host1", null, 1000L, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0).toImmutableDruidServer(), + new LoadQueuePeonTester()); + serverHolders = new ArrayList<>(); + serverHolders.add(serverHolder); + final ServerHolder foundServerHolder = balancerStrategy.findNewSegmentHomeReplicator(proposedDataSegment, serverHolders); + // since there is enough space on server it should be selected + Assert.assertEquals(serverHolder, foundServerHolder); + } +} diff --git a/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java b/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java index 96f38c3a852..e138111c76e 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java @@ -37,6 +37,7 @@ import org.apache.druid.server.coordinator.duty.RunRules; import org.apache.druid.server.coordinator.rules.ForeverLoadRule; import org.apache.druid.server.coordinator.rules.IntervalDropRule; import org.apache.druid.server.coordinator.rules.IntervalLoadRule; +import org.apache.druid.server.coordinator.rules.LoadRule; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NoneShardSpec; import org.easymock.EasyMock; @@ -193,17 +194,31 @@ public class RunRulesTest BalancerStrategy balancerStrategy ) { - return createCoordinatorRuntimeParams(druidCluster) - .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster())) + return makeCoordinatorRuntimeParams(druidCluster, balancerStrategy, usedSegments); + } + + private DruidCoordinatorRuntimeParams.Builder makeCoordinatorRuntimeParams( + DruidCluster druidCluster, + BalancerStrategy balancerStrategy, + List dataSegments + ) + { + return createCoordinatorRuntimeParams(druidCluster, dataSegments) + .withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster)) .withBalancerStrategy(balancerStrategy); } private DruidCoordinatorRuntimeParams.Builder createCoordinatorRuntimeParams(DruidCluster druidCluster) + { + return createCoordinatorRuntimeParams(druidCluster, usedSegments); + } + + private DruidCoordinatorRuntimeParams.Builder createCoordinatorRuntimeParams(DruidCluster druidCluster, List dataSegments) { return CoordinatorRuntimeParamsTestHelpers .newBuilder() .withDruidCluster(druidCluster) - .withUsedSegmentsInTest(usedSegments) + .withUsedSegmentsInTest(dataSegments) .withDatabaseRuleManager(databaseRuleManager); } @@ -1067,6 +1082,255 @@ public class RunRulesTest exec.shutdown(); } + /** + * Tier - __default_tier + * Nodes - 2 + * Replicants - 3 + * Random balancer strategy should not assign anything and not get into loop as there are not enough nodes for replication + */ + @Test(timeout = 5000L) + public void testTwoNodesOneTierThreeReplicantsRandomStrategyNotEnoughNodes() + { + mockCoordinator(); + mockEmptyPeon(); + + EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn( + Collections.singletonList( + new ForeverLoadRule( + ImmutableMap.of(DruidServer.DEFAULT_TIER, 3) + ) + )).atLeastOnce(); + EasyMock.replay(databaseRuleManager); + + DataSegment dataSegment = new DataSegment( + "test", + Intervals.utc(0, 1), + DateTimes.nowUtc().toString(), + new HashMap<>(), + new ArrayList<>(), + new ArrayList<>(), + NoneShardSpec.instance(), + IndexIO.CURRENT_VERSION_ID, + 1 + ); + + DruidCluster druidCluster = DruidClusterBuilder + .newBuilder() + .addTier( + DruidServer.DEFAULT_TIER, + new ServerHolder( + new DruidServer("server1", "host1", null, 1000, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0).addDataSegment(dataSegment) + .toImmutableDruidServer(), + mockPeon + ), + new ServerHolder( + new DruidServer("server2", "host2", null, 1000, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0).addDataSegment(dataSegment) + .toImmutableDruidServer(), + mockPeon + ) + ) + .build(); + + RandomBalancerStrategy balancerStrategy = new RandomBalancerStrategy(); + + DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy, Collections.singletonList(dataSegment)) + .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(5).build()) + .build(); + + DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params); + CoordinatorStats stats = afterParams.getCoordinatorStats(); + + Assert.assertEquals(0L, stats.getTieredStat("assignedCount", DruidServer.DEFAULT_TIER)); + Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty()); + Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty()); + + EasyMock.verify(mockPeon); + } + + + /** + * Tier - __default_tier + * Nodes - 1 + * Replicants - 1 + * Random balancer strategy should select the only node + */ + @Test(timeout = 5000L) + public void testOneNodesOneTierOneReplicantRandomStrategyEnoughSpace() + { + mockCoordinator(); + mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject()); + EasyMock.expectLastCall().atLeastOnce(); + mockEmptyPeon(); + + EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn( + Collections.singletonList( + new ForeverLoadRule( + ImmutableMap.of(DruidServer.DEFAULT_TIER, 1) + ) + )).atLeastOnce(); + EasyMock.replay(databaseRuleManager); + + DataSegment dataSegment = new DataSegment( + "test", + Intervals.utc(0, 1), + DateTimes.nowUtc().toString(), + new HashMap<>(), + new ArrayList<>(), + new ArrayList<>(), + NoneShardSpec.instance(), + IndexIO.CURRENT_VERSION_ID, + 1 + ); + + DruidCluster druidCluster = DruidClusterBuilder + .newBuilder() + .addTier( + DruidServer.DEFAULT_TIER, + new ServerHolder( + new DruidServer("server1", "host1", null, 1000, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0) + .toImmutableDruidServer(), + mockPeon + ) + ) + .build(); + + RandomBalancerStrategy balancerStrategy = new RandomBalancerStrategy(); + + DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy, Collections.singletonList(dataSegment)) + .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(5).build()) + .build(); + + DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params); + CoordinatorStats stats = afterParams.getCoordinatorStats(); + Assert.assertEquals(1L, stats.getTieredStat("assignedCount", DruidServer.DEFAULT_TIER)); + Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty()); + Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty()); + + EasyMock.verify(mockPeon); + } + + /** + * Tier - __default_tier + * Nodes - 1 + * Replicants - 1 + * Random balancer strategy should not assign anything as there is not enough space + */ + @Test(timeout = 5000L) + public void testOneNodesOneTierOneReplicantRandomStrategyNotEnoughSpace() + { + mockCoordinator(); + mockEmptyPeon(); + int numReplicants = 1; + EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn( + Collections.singletonList( + new ForeverLoadRule( + ImmutableMap.of(DruidServer.DEFAULT_TIER, numReplicants) + ) + )).atLeastOnce(); + EasyMock.replay(databaseRuleManager); + + DataSegment dataSegment = new DataSegment( + "test", + Intervals.utc(0, 1), + DateTimes.nowUtc().toString(), + new HashMap<>(), + new ArrayList<>(), + new ArrayList<>(), + NoneShardSpec.instance(), + IndexIO.CURRENT_VERSION_ID, + 11 + ); + + DruidCluster druidCluster = DruidClusterBuilder + .newBuilder() + .addTier( + DruidServer.DEFAULT_TIER, + new ServerHolder( + new DruidServer("server1", "host1", null, 10, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0) + .toImmutableDruidServer(), + mockPeon + ) + ) + .build(); + + RandomBalancerStrategy balancerStrategy = new RandomBalancerStrategy(); + + DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy, Collections.singletonList(dataSegment)) + .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(5).build()) + .build(); + + DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params); + CoordinatorStats stats = afterParams.getCoordinatorStats(); + Assert.assertEquals(dataSegment.getSize() * numReplicants, stats.getTieredStat(LoadRule.REQUIRED_CAPACITY, DruidServer.DEFAULT_TIER)); + Assert.assertTrue(stats.getTiers("assignedCount").isEmpty()); // since primary assignment failed + Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty()); + Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty()); + + EasyMock.verify(mockPeon); + } + + /** + * Tier - __default_tier + * Nodes - 1 + * Replicants - 1 + * Cost balancer strategy should not assign anything as there is not enough space + */ + @Test + public void testOneNodesOneTierOneReplicantCostBalancerStrategyNotEnoughSpace() + { + mockCoordinator(); + mockEmptyPeon(); + int numReplicants = 1; + EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn( + Collections.singletonList( + new ForeverLoadRule( + ImmutableMap.of(DruidServer.DEFAULT_TIER, numReplicants) + ) + )).atLeastOnce(); + EasyMock.replay(databaseRuleManager); + + DataSegment dataSegment = new DataSegment( + "test", + Intervals.utc(0, 1), + DateTimes.nowUtc().toString(), + new HashMap<>(), + new ArrayList<>(), + new ArrayList<>(), + NoneShardSpec.instance(), + IndexIO.CURRENT_VERSION_ID, + 11 + ); + + DruidCluster druidCluster = DruidClusterBuilder + .newBuilder() + .addTier( + DruidServer.DEFAULT_TIER, + new ServerHolder( + new DruidServer("server1", "host1", null, 10, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0) + .toImmutableDruidServer(), + mockPeon + ) + ) + .build(); + + ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)); + CostBalancerStrategy balancerStrategy = new CostBalancerStrategy(exec); + + DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy, Collections.singletonList(dataSegment)) + .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(5).build()) + .build(); + + DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params); + CoordinatorStats stats = afterParams.getCoordinatorStats(); + Assert.assertEquals(dataSegment.getSize() * numReplicants, stats.getTieredStat(LoadRule.REQUIRED_CAPACITY, DruidServer.DEFAULT_TIER)); + Assert.assertTrue(stats.getTiers("assignedCount").isEmpty()); // since primary assignment should fail + Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty()); + Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty()); + + exec.shutdown(); + EasyMock.verify(mockPeon); + } + private void mockCoordinator() { EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(createCoordinatorDynamicConfig()).anyTimes();