Fix balancer strategy (#10070)

* fix server overassignment

* fix random balancer strategy, add more tests

* comment

* added more tests

* fix forbidden apis

* fix typo
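
In short: CostBalancerStrategy now returns a "no server" sentinel when even the cheapest candidate has an infinite cost of serving the segment, so nothing is assigned to servers that cannot actually hold it, and RandomBalancerStrategy now filters candidates by free space and by whether they already serve the segment before picking one at random, instead of retrying random picks, which could overassign full servers and spin indefinitely when every server already holds the segment. New parameterized tests exercise both strategies, and RunRulesTest gains coordinator-level coverage for both.
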
Parag Jain 2020-06-25 16:45:00 +05:30 committed by GitHub
parent ec1f443a5c
commit 422a8af14e
4 changed files with 406 additions and 11 deletions

CostBalancerStrategy.java

@ -367,7 +367,8 @@ public class CostBalancerStrategy implements BalancerStrategy
final boolean includeCurrentServer
)
{
Pair<Double, ServerHolder> bestServer = Pair.of(Double.POSITIVE_INFINITY, null);
final Pair<Double, ServerHolder> noServer = Pair.of(Double.POSITIVE_INFINITY, null);
Pair<Double, ServerHolder> bestServer = noServer;
List<ListenableFuture<Pair<Double, ServerHolder>>> futures = new ArrayList<>();
@ -391,7 +392,11 @@ public class CostBalancerStrategy implements BalancerStrategy
bestServers.add(server);
}
}
// If even the best candidates have an infinite cost of serving the segment, no usable server was found,
// so return the "no server" sentinel and skip assigning this segment
if (bestServers.get(0).lhs.isInfinite()) {
return noServer;
}
// Randomly choose a server from the best servers
bestServer = bestServers.get(ThreadLocalRandom.current().nextInt(bestServers.size()));
}
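
The essence of the guard above, as a standalone sketch (the Candidate type and pickCheapest helper are illustrative stand-ins, not Druid's Pair<Double, ServerHolder> / CostBalancerStrategy API; records require a recent JDK):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

final class CheapestServerPicker
{
  record Candidate(double cost, String server) {}

  // Sentinel meaning "no usable server": infinite cost, no server attached.
  static final Candidate NO_SERVER = new Candidate(Double.POSITIVE_INFINITY, null);

  static Candidate pickCheapest(List<Candidate> candidates)
  {
    double bestCost = Double.POSITIVE_INFINITY;
    List<Candidate> best = new ArrayList<>();
    for (Candidate c : candidates) {
      if (c.cost() < bestCost) {
        bestCost = c.cost();
        best.clear();
        best.add(c);
      } else if (c.cost() == bestCost) {
        best.add(c); // keep ties so the final pick is random among equally cheap servers
      }
    }
    // If even the cheapest candidate has an infinite cost, every server is unusable
    // (e.g. full), so report "no server" rather than handing back one of them.
    if (best.isEmpty() || Double.isInfinite(best.get(0).cost())) {
      return NO_SERVER;
    }
    return best.get(ThreadLocalRandom.current().nextInt(best.size()));
  }
}
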

RandomBalancerStrategy.java

@ -28,20 +28,22 @@ import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
public class RandomBalancerStrategy implements BalancerStrategy
{
@Override
public ServerHolder findNewSegmentHomeReplicator(DataSegment proposalSegment, List<ServerHolder> serverHolders)
{
if (serverHolders.size() == 1) {
// filter out servers whose available size is less than required for this segment and those already serving this segment
final List<ServerHolder> usableServerHolders = serverHolders.stream().filter(
serverHolder -> serverHolder.getAvailableSize() >= proposalSegment.getSize() && !serverHolder.isServingSegment(
proposalSegment)
).collect(Collectors.toList());
if (usableServerHolders.size() == 0) {
return null;
} else {
ServerHolder holder = serverHolders.get(ThreadLocalRandom.current().nextInt(serverHolders.size()));
while (holder.isServingSegment(proposalSegment)) {
holder = serverHolders.get(ThreadLocalRandom.current().nextInt(serverHolders.size()));
}
return holder;
return usableServerHolders.get(ThreadLocalRandom.current().nextInt(usableServerHolders.size()));
}
}
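
For contrast with the removed retry loop, a standalone sketch of the new selection logic (Server here is a hypothetical stand-in for Druid's ServerHolder, not its real interface):

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;

final class RandomPickSketch
{
  interface Server
  {
    long availableSize();
    boolean isServing(String segmentId);
  }

  // Returns a random server that has room for the segment and does not already
  // serve it, or null when no such server exists.
  static Server pick(List<Server> servers, String segmentId, long segmentSize)
  {
    List<Server> usable = servers.stream()
        .filter(s -> s.availableSize() >= segmentSize && !s.isServing(segmentId))
        .collect(Collectors.toList());
    if (usable.isEmpty()) {
      // The old code retried random picks until it hit a server not serving the
      // segment; that never terminates when every server already holds it, and it
      // ignored free space entirely. Returning null lets the caller skip assignment.
      return null;
    }
    return usable.get(ThreadLocalRandom.current().nextInt(usable.size()));
  }
}
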

BalancerStrategyTest.java (new file)

@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator;
import org.apache.druid.client.DruidServer;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@RunWith(Parameterized.class)
public class BalancerStrategyTest
{
private final BalancerStrategy balancerStrategy;
private DataSegment proposedDataSegment;
private List<ServerHolder> serverHolders;
@Parameterized.Parameters(name = "{index}: BalancerStrategy:{0}")
public static Iterable<Object[]> data()
{
return Arrays.asList(
new Object[][]{
{new CostBalancerStrategy(Execs.directExecutor())},
{new RandomBalancerStrategy()}
}
);
}
public BalancerStrategyTest(BalancerStrategy balancerStrategy)
{
this.balancerStrategy = balancerStrategy;
}
@Before
public void setUp()
{
this.proposedDataSegment = new DataSegment(
"datasource1",
Intervals.utc(0, 1),
"",
new HashMap<>(),
new ArrayList<>(),
new ArrayList<>(),
NoneShardSpec.instance(),
0,
11L
);
}
@Test
public void findNewSegmentHomeReplicatorNotEnoughSpace()
{
final ServerHolder serverHolder = new ServerHolder(
new DruidServer("server1", "host1", null, 10L, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0).addDataSegment(proposedDataSegment).toImmutableDruidServer(),
new LoadQueuePeonTester());
serverHolders = new ArrayList<>();
serverHolders.add(serverHolder);
final ServerHolder foundServerHolder = balancerStrategy.findNewSegmentHomeReplicator(proposedDataSegment, serverHolders);
// the only server has 10 bytes available but the segment is 11 bytes, so no server should be found
Assert.assertNull(foundServerHolder);
}
@Test(timeout = 5000L)
public void findNewSegmentHomeReplicatorNotEnoughNodesForReplication()
{
final ServerHolder serverHolder1 = new ServerHolder(
new DruidServer("server1", "host1", null, 1000L, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0).addDataSegment(proposedDataSegment).toImmutableDruidServer(),
new LoadQueuePeonTester());
final ServerHolder serverHolder2 = new ServerHolder(
new DruidServer("server2", "host2", null, 1000L, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0).addDataSegment(proposedDataSegment).toImmutableDruidServer(),
new LoadQueuePeonTester());
serverHolders = new ArrayList<>();
serverHolders.add(serverHolder1);
serverHolders.add(serverHolder2);
final ServerHolder foundServerHolder = balancerStrategy.findNewSegmentHomeReplicator(proposedDataSegment, serverHolders);
// there are not enough nodes to load 3 replicas of the segment, so no server should be found
Assert.assertNull(foundServerHolder);
}
@Test
public void findNewSegmentHomeReplicatorEnoughSpace()
{
final ServerHolder serverHolder = new ServerHolder(
new DruidServer("server1", "host1", null, 1000L, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0).toImmutableDruidServer(),
new LoadQueuePeonTester());
serverHolders = new ArrayList<>();
serverHolders.add(serverHolder);
final ServerHolder foundServerHolder = balancerStrategy.findNewSegmentHomeReplicator(proposedDataSegment, serverHolders);
// the server has enough free space, so it should be selected
Assert.assertEquals(serverHolder, foundServerHolder);
}
}

RunRulesTest.java

@ -37,6 +37,7 @@ import org.apache.druid.server.coordinator.duty.RunRules;
import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
import org.apache.druid.server.coordinator.rules.IntervalDropRule;
import org.apache.druid.server.coordinator.rules.IntervalLoadRule;
import org.apache.druid.server.coordinator.rules.LoadRule;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock;
@ -193,17 +194,31 @@ public class RunRulesTest
BalancerStrategy balancerStrategy
)
{
return createCoordinatorRuntimeParams(druidCluster)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
return makeCoordinatorRuntimeParams(druidCluster, balancerStrategy, usedSegments);
}
private DruidCoordinatorRuntimeParams.Builder makeCoordinatorRuntimeParams(
DruidCluster druidCluster,
BalancerStrategy balancerStrategy,
List<DataSegment> dataSegments
)
{
return createCoordinatorRuntimeParams(druidCluster, dataSegments)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster))
.withBalancerStrategy(balancerStrategy);
}
private DruidCoordinatorRuntimeParams.Builder createCoordinatorRuntimeParams(DruidCluster druidCluster)
{
return createCoordinatorRuntimeParams(druidCluster, usedSegments);
}
private DruidCoordinatorRuntimeParams.Builder createCoordinatorRuntimeParams(DruidCluster druidCluster, List<DataSegment> dataSegments)
{
return CoordinatorRuntimeParamsTestHelpers
.newBuilder()
.withDruidCluster(druidCluster)
.withUsedSegmentsInTest(usedSegments)
.withUsedSegmentsInTest(dataSegments)
.withDatabaseRuleManager(databaseRuleManager);
}
@ -1067,6 +1082,255 @@ public class RunRulesTest
exec.shutdown();
}
/**
* Tier - __default_tier
* Nodes - 2
* Replicants - 3
* Random balancer strategy should not assign anything and should not get stuck in a loop, as there are not enough nodes for replication
*/
@Test(timeout = 5000L)
public void testTwoNodesOneTierThreeReplicantsRandomStrategyNotEnoughNodes()
{
mockCoordinator();
mockEmptyPeon();
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn(
Collections.singletonList(
new ForeverLoadRule(
ImmutableMap.of(DruidServer.DEFAULT_TIER, 3)
)
)).atLeastOnce();
EasyMock.replay(databaseRuleManager);
DataSegment dataSegment = new DataSegment(
"test",
Intervals.utc(0, 1),
DateTimes.nowUtc().toString(),
new HashMap<>(),
new ArrayList<>(),
new ArrayList<>(),
NoneShardSpec.instance(),
IndexIO.CURRENT_VERSION_ID,
1
);
DruidCluster druidCluster = DruidClusterBuilder
.newBuilder()
.addTier(
DruidServer.DEFAULT_TIER,
new ServerHolder(
new DruidServer("server1", "host1", null, 1000, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0).addDataSegment(dataSegment)
.toImmutableDruidServer(),
mockPeon
),
new ServerHolder(
new DruidServer("server2", "host2", null, 1000, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0).addDataSegment(dataSegment)
.toImmutableDruidServer(),
mockPeon
)
)
.build();
RandomBalancerStrategy balancerStrategy = new RandomBalancerStrategy();
DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy, Collections.singletonList(dataSegment))
.withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(5).build())
.build();
DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
CoordinatorStats stats = afterParams.getCoordinatorStats();
Assert.assertEquals(0L, stats.getTieredStat("assignedCount", DruidServer.DEFAULT_TIER));
Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty());
Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty());
EasyMock.verify(mockPeon);
}
/**
* Tier - __default_tier
* Nodes - 1
* Replicants - 1
* Random balancer strategy should select the only node
*/
@Test(timeout = 5000L)
public void testOneNodesOneTierOneReplicantRandomStrategyEnoughSpace()
{
mockCoordinator();
mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
mockEmptyPeon();
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn(
Collections.singletonList(
new ForeverLoadRule(
ImmutableMap.of(DruidServer.DEFAULT_TIER, 1)
)
)).atLeastOnce();
EasyMock.replay(databaseRuleManager);
DataSegment dataSegment = new DataSegment(
"test",
Intervals.utc(0, 1),
DateTimes.nowUtc().toString(),
new HashMap<>(),
new ArrayList<>(),
new ArrayList<>(),
NoneShardSpec.instance(),
IndexIO.CURRENT_VERSION_ID,
1
);
DruidCluster druidCluster = DruidClusterBuilder
.newBuilder()
.addTier(
DruidServer.DEFAULT_TIER,
new ServerHolder(
new DruidServer("server1", "host1", null, 1000, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0)
.toImmutableDruidServer(),
mockPeon
)
)
.build();
RandomBalancerStrategy balancerStrategy = new RandomBalancerStrategy();
DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy, Collections.singletonList(dataSegment))
.withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(5).build())
.build();
DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
CoordinatorStats stats = afterParams.getCoordinatorStats();
Assert.assertEquals(1L, stats.getTieredStat("assignedCount", DruidServer.DEFAULT_TIER));
Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty());
Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty());
EasyMock.verify(mockPeon);
}
/**
* Tier - __default_tier
* Nodes - 1
* Replicants - 1
* Random balancer strategy should not assign anything as there is not enough space
*/
@Test(timeout = 5000L)
public void testOneNodesOneTierOneReplicantRandomStrategyNotEnoughSpace()
{
mockCoordinator();
mockEmptyPeon();
int numReplicants = 1;
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn(
Collections.singletonList(
new ForeverLoadRule(
ImmutableMap.of(DruidServer.DEFAULT_TIER, numReplicants)
)
)).atLeastOnce();
EasyMock.replay(databaseRuleManager);
DataSegment dataSegment = new DataSegment(
"test",
Intervals.utc(0, 1),
DateTimes.nowUtc().toString(),
new HashMap<>(),
new ArrayList<>(),
new ArrayList<>(),
NoneShardSpec.instance(),
IndexIO.CURRENT_VERSION_ID,
11
);
DruidCluster druidCluster = DruidClusterBuilder
.newBuilder()
.addTier(
DruidServer.DEFAULT_TIER,
new ServerHolder(
new DruidServer("server1", "host1", null, 10, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0)
.toImmutableDruidServer(),
mockPeon
)
)
.build();
RandomBalancerStrategy balancerStrategy = new RandomBalancerStrategy();
DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy, Collections.singletonList(dataSegment))
.withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(5).build())
.build();
DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
CoordinatorStats stats = afterParams.getCoordinatorStats();
Assert.assertEquals(dataSegment.getSize() * numReplicants, stats.getTieredStat(LoadRule.REQUIRED_CAPACITY, DruidServer.DEFAULT_TIER));
Assert.assertTrue(stats.getTiers("assignedCount").isEmpty()); // since primary assignment failed
Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty());
Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty());
EasyMock.verify(mockPeon);
}
/**
* Tier - __default_tier
* Nodes - 1
* Replicants - 1
* Cost balancer strategy should not assign anything as there is not enough space
*/
@Test
public void testOneNodesOneTierOneReplicantCostBalancerStrategyNotEnoughSpace()
{
mockCoordinator();
mockEmptyPeon();
int numReplicants = 1;
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn(
Collections.singletonList(
new ForeverLoadRule(
ImmutableMap.of(DruidServer.DEFAULT_TIER, numReplicants)
)
)).atLeastOnce();
EasyMock.replay(databaseRuleManager);
DataSegment dataSegment = new DataSegment(
"test",
Intervals.utc(0, 1),
DateTimes.nowUtc().toString(),
new HashMap<>(),
new ArrayList<>(),
new ArrayList<>(),
NoneShardSpec.instance(),
IndexIO.CURRENT_VERSION_ID,
11
);
DruidCluster druidCluster = DruidClusterBuilder
.newBuilder()
.addTier(
DruidServer.DEFAULT_TIER,
new ServerHolder(
new DruidServer("server1", "host1", null, 10, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0)
.toImmutableDruidServer(),
mockPeon
)
)
.build();
ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
CostBalancerStrategy balancerStrategy = new CostBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy, Collections.singletonList(dataSegment))
.withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(5).build())
.build();
DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
CoordinatorStats stats = afterParams.getCoordinatorStats();
Assert.assertEquals(dataSegment.getSize() * numReplicants, stats.getTieredStat(LoadRule.REQUIRED_CAPACITY, DruidServer.DEFAULT_TIER));
Assert.assertTrue(stats.getTiers("assignedCount").isEmpty()); // since primary assignment should fail
Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty());
Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty());
exec.shutdown();
EasyMock.verify(mockPeon);
}
private void mockCoordinator()
{
EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(createCoordinatorDynamicConfig()).anyTimes();