Internal: Flatten Allocation modules and add back ability to plugin ShardsAllocators

There were two submodules of AllocationModule. This combines them into a
single module, adds a base test case for module testing, and adds back
the ability for plugins to provide custom ShardsAllocators.

closes #12781
This commit is contained in:
Ryan Ernst 2015-08-10 15:00:07 -07:00
parent 152178f6b0
commit 50ba3bcdd6
8 changed files with 286 additions and 175 deletions

View File

@ -19,35 +19,120 @@
package org.elasticsearch.cluster.routing.allocation;
import com.google.common.collect.ImmutableList;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocatorModule;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecidersModule;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ConcurrentRebalanceAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.DisableAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.RebalanceOnlyWhenActiveAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.SpawnModules;
import org.elasticsearch.common.inject.multibindings.Multibinder;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.GatewayAllocator;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* The {@link AllocationModule} manages several
* modules related to the allocation process. To do so
* it manages a {@link ShardsAllocatorModule} and an {@link AllocationDecidersModule}.
* A module to setup classes related to shard allocation.
*
* There are two basic concepts for allocation.
* <ul>
* <li>An {@link AllocationDecider} decides *when* an allocation should be attempted.</li>
* <li>A {@link ShardsAllocator} determins *how* an allocation takes place</li>
* </ul>
*/
public class AllocationModule extends AbstractModule implements SpawnModules {
public class AllocationModule extends AbstractModule {
public static final String EVEN_SHARD_COUNT_ALLOCATOR = "even_shard";
public static final String BALANCED_ALLOCATOR = "balanced"; // default
public static final String SHARDS_ALLOCATOR_TYPE_KEY = "cluster.routing.allocation.type";
public static final List<Class<? extends AllocationDecider>> DEFAULT_ALLOCATION_DECIDERS =
Collections.unmodifiableList(Arrays.asList(
SameShardAllocationDecider.class,
FilterAllocationDecider.class,
ReplicaAfterPrimaryActiveAllocationDecider.class,
ThrottlingAllocationDecider.class,
RebalanceOnlyWhenActiveAllocationDecider.class,
ClusterRebalanceAllocationDecider.class,
ConcurrentRebalanceAllocationDecider.class,
EnableAllocationDecider.class, // new enable allocation logic should proceed old disable allocation logic
DisableAllocationDecider.class,
AwarenessAllocationDecider.class,
ShardsLimitAllocationDecider.class,
NodeVersionAllocationDecider.class,
DiskThresholdDecider.class,
SnapshotInProgressAllocationDecider.class));
private final Settings settings;
private final Map<String, Class<? extends ShardsAllocator>> shardsAllocators = new HashMap<>();
private final Set<Class<? extends AllocationDecider>> allocationDeciders = new HashSet<>();
//TODO: Documentation
public AllocationModule(Settings settings) {
this.settings = settings;
this.allocationDeciders.addAll(DEFAULT_ALLOCATION_DECIDERS);
registerShardAllocator(BALANCED_ALLOCATOR, BalancedShardsAllocator.class);
registerShardAllocator(EVEN_SHARD_COUNT_ALLOCATOR, BalancedShardsAllocator.class);
}
@Override
public Iterable<? extends Module> spawnModules() {
return ImmutableList.of(new ShardsAllocatorModule(settings), new AllocationDecidersModule(settings));
/** Register a custom allocation decider */
public void registerAllocationDecider(Class<? extends AllocationDecider> allocationDecider) {
boolean isNew = allocationDeciders.add(allocationDecider);
if (isNew == false) {
throw new IllegalArgumentException("Cannot register AllocationDecider " + allocationDecider.getName() + " twice");
}
}
/** Register a custom shard allocator with the given name */
public void registerShardAllocator(String name, Class<? extends ShardsAllocator> clazz) {
Class<? extends ShardsAllocator> existing = shardsAllocators.put(name, clazz);
if (existing != null) {
throw new IllegalArgumentException("Cannot register ShardAllocator [" + name + "] to " + clazz.getName() + ", already registered to " + existing.getName());
}
}
@Override
protected void configure() {
// bind ShardsAllocator
final String shardsAllocatorType = settings.get(AllocationModule.SHARDS_ALLOCATOR_TYPE_KEY, AllocationModule.BALANCED_ALLOCATOR);
final Class<? extends ShardsAllocator> shardsAllocator = shardsAllocators.get(shardsAllocatorType);
if (shardsAllocator == null) {
throw new IllegalArgumentException("Unknown ShardsAllocator type [" + shardsAllocatorType + "]");
} else if (shardsAllocatorType.equals(EVEN_SHARD_COUNT_ALLOCATOR)) {
final ESLogger logger = Loggers.getLogger(getClass(), settings);
logger.warn("{} allocator has been removed in 2.0 using {} instead", AllocationModule.EVEN_SHARD_COUNT_ALLOCATOR, AllocationModule.BALANCED_ALLOCATOR);
}
bind(ShardsAllocator.class).to(shardsAllocator).asEagerSingleton();
// bind AllocationDeciders
Multibinder<AllocationDecider> allocationMultibinder = Multibinder.newSetBinder(binder(), AllocationDecider.class);
for (Class<? extends AllocationDecider> allocation : allocationDeciders) {
allocationMultibinder.addBinding().to(allocation).asEagerSingleton();
}
bind(GatewayAllocator.class).asEagerSingleton();
bind(AllocationDeciders.class).asEagerSingleton();
bind(AllocationService.class).asEagerSingleton();
}
}

View File

@ -1,72 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation.allocator;
import org.elasticsearch.common.Classes;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.GatewayAllocator;
/**
*/
public class ShardsAllocatorModule extends AbstractModule {
private static final String EVEN_SHARD_COUNT_ALLOCATOR_KEY = "even_shard";
public static final String BALANCED_ALLOCATOR_KEY = "balanced"; // default
public static final String TYPE_KEY = "cluster.routing.allocation.type";
private Settings settings;
private Class<? extends ShardsAllocator> shardsAllocator;
public ShardsAllocatorModule(Settings settings) {
this.settings = settings;
shardsAllocator = loadShardsAllocator(settings);
}
@Override
protected void configure() {
if (shardsAllocator == null) {
shardsAllocator = loadShardsAllocator(settings);
}
bind(GatewayAllocator.class).asEagerSingleton();
bind(ShardsAllocator.class).to(shardsAllocator).asEagerSingleton();
}
private Class<? extends ShardsAllocator> loadShardsAllocator(Settings settings) {
final Class<? extends ShardsAllocator> shardsAllocator;
final String type = settings.get(TYPE_KEY, BALANCED_ALLOCATOR_KEY);
if (BALANCED_ALLOCATOR_KEY.equals(type)) {
shardsAllocator = BalancedShardsAllocator.class;
} else if (EVEN_SHARD_COUNT_ALLOCATOR_KEY.equals(type)) {
final ESLogger logger = Loggers.getLogger(getClass(), settings);
logger.warn("{} allocator has been removed in 2.0 using {} instead", EVEN_SHARD_COUNT_ALLOCATOR_KEY, BALANCED_ALLOCATOR_KEY);
shardsAllocator = BalancedShardsAllocator.class;
} else {
throw new IllegalArgumentException("Unknown ShardsAllocator type [" + type + "]");
}
return shardsAllocator;
}
}

View File

@ -1,80 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation.decider;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.Multibinder;
import org.elasticsearch.common.settings.Settings;
import java.util.List;
/**
* This module configures several {@link AllocationDecider}s
* that make configuration specific decisions if shards can be allocated on certain nodes.
*
* @see Decision
* @see AllocationDecider
*/
public class AllocationDecidersModule extends AbstractModule {
private final Settings settings;
private List<Class<? extends AllocationDecider>> allocations = Lists.newArrayList();
public AllocationDecidersModule(Settings settings) {
this.settings = settings;
}
public AllocationDecidersModule add(Class<? extends AllocationDecider> allocationDecider) {
this.allocations.add(allocationDecider);
return this;
}
@Override
protected void configure() {
Multibinder<AllocationDecider> allocationMultibinder = Multibinder.newSetBinder(binder(), AllocationDecider.class);
for (Class<? extends AllocationDecider> deciderClass : DEFAULT_ALLOCATION_DECIDERS) {
allocationMultibinder.addBinding().to(deciderClass).asEagerSingleton();
}
for (Class<? extends AllocationDecider> allocation : allocations) {
allocationMultibinder.addBinding().to(allocation).asEagerSingleton();
}
bind(AllocationDeciders.class).asEagerSingleton();
}
public static final ImmutableSet<Class<? extends AllocationDecider>> DEFAULT_ALLOCATION_DECIDERS = ImmutableSet.<Class<? extends AllocationDecider>>builder().
add(SameShardAllocationDecider.class).
add(FilterAllocationDecider.class).
add(ReplicaAfterPrimaryActiveAllocationDecider.class).
add(ThrottlingAllocationDecider.class).
add(RebalanceOnlyWhenActiveAllocationDecider.class).
add(ClusterRebalanceAllocationDecider.class).
add(ConcurrentRebalanceAllocationDecider.class).
add(EnableAllocationDecider.class). // new enable allocation logic should proceed old disable allocation logic
add(DisableAllocationDecider.class).
add(AwarenessAllocationDecider.class).
add(ShardsLimitAllocationDecider.class).
add(NodeVersionAllocationDecider.class).
add(DiskThresholdDecider.class).
add(SnapshotInProgressAllocationDecider.class).build();
}

View File

@ -211,6 +211,9 @@ public final class Elements {
try {
module.configure(binder);
} catch (IllegalArgumentException e) {
// NOTE: This is not in the original guice. We rethrow here to expose any explicit errors in configure()
throw e;
} catch (RuntimeException e) {
Collection<Message> messages = Errors.getMessagesFromThrowable(e);
if (!messages.isEmpty()) {

View File

@ -19,9 +19,9 @@
package org.elasticsearch.cluster.allocation;
import org.elasticsearch.cluster.routing.allocation.AllocationModule;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocatorModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
@ -40,10 +40,10 @@ public class ShardsAllocatorModuleIT extends ESIntegTestCase {
}
public void testLoadByShortKeyShardsAllocator() throws IOException {
Settings build = settingsBuilder().put(ShardsAllocatorModule.TYPE_KEY, "even_shard") // legacy just to make sure we don't barf
Settings build = settingsBuilder().put(AllocationModule.SHARDS_ALLOCATOR_TYPE_KEY, "even_shard") // legacy just to make sure we don't barf
.build();
assertAllocatorInstance(build, BalancedShardsAllocator.class);
build = settingsBuilder().put(ShardsAllocatorModule.TYPE_KEY, ShardsAllocatorModule.BALANCED_ALLOCATOR_KEY).build();
build = settingsBuilder().put(AllocationModule.SHARDS_ALLOCATOR_TYPE_KEY, AllocationModule.BALANCED_ALLOCATOR).build();
assertAllocatorInstance(build, BalancedShardsAllocator.class);
}

View File

@ -0,0 +1,83 @@
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.common.inject.ModuleTestCase;
import org.elasticsearch.common.settings.Settings;
public class AllocationModuleTests extends ModuleTestCase {
public static class FakeAllocationDecider extends AllocationDecider {
protected FakeAllocationDecider(Settings settings) {
super(settings);
}
}
public static class FakeShardsAllocator implements ShardsAllocator {
@Override
public void applyStartedShards(StartedRerouteAllocation allocation) {}
@Override
public void applyFailedShards(FailedRerouteAllocation allocation) {}
@Override
public boolean allocateUnassigned(RoutingAllocation allocation) {
return false;
}
@Override
public boolean rebalance(RoutingAllocation allocation) {
return false;
}
@Override
public boolean move(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return false;
}
}
public void testRegisterAllocationDeciderDuplicate() {
AllocationModule module = new AllocationModule(Settings.EMPTY);
try {
module.registerAllocationDecider(EnableAllocationDecider.class);
} catch (IllegalArgumentException e) {
assertTrue(e.getMessage().contains("Cannot register AllocationDecider"));
assertTrue(e.getMessage().contains("twice"));
}
}
public void testRegisterAllocationDecider() {
AllocationModule module = new AllocationModule(Settings.EMPTY);
module.registerAllocationDecider(FakeAllocationDecider.class);
assertSetMultiBinding(module, AllocationDecider.class, FakeAllocationDecider.class);
}
public void testRegisterShardsAllocator() {
Settings settings = Settings.builder().put(AllocationModule.SHARDS_ALLOCATOR_TYPE_KEY, "custom").build();
AllocationModule module = new AllocationModule(settings);
module.registerShardAllocator("custom", FakeShardsAllocator.class);
assertBinding(module, ShardsAllocator.class, FakeShardsAllocator.class);
}
public void testRegisterShardsAllocatorAlreadyRegistered() {
AllocationModule module = new AllocationModule(Settings.EMPTY);
try {
module.registerShardAllocator(AllocationModule.BALANCED_ALLOCATOR, FakeShardsAllocator.class);
} catch (IllegalArgumentException e) {
assertTrue(e.getMessage().contains("already registered"));
}
}
public void testUnknownShardsAllocator() {
Settings settings = Settings.builder().put(AllocationModule.SHARDS_ALLOCATOR_TYPE_KEY, "dne").build();
AllocationModule module = new AllocationModule(settings);
assertBindingFailure(module, "Unknown ShardsAllocator");
}
public void testEvenShardsAllocatorBackcompat() {
Settings settings = Settings.builder()
.put(AllocationModule.SHARDS_ALLOCATOR_TYPE_KEY, AllocationModule.EVEN_SHARD_COUNT_ALLOCATOR).build();
AllocationModule module = new AllocationModule(settings);
assertBinding(module, ShardsAllocator.class, BalancedShardsAllocator.class);
}
}

View File

@ -0,0 +1,89 @@
package org.elasticsearch.common.inject;
import org.elasticsearch.common.inject.spi.Element;
import org.elasticsearch.common.inject.spi.Elements;
import org.elasticsearch.common.inject.spi.LinkedKeyBinding;
import org.elasticsearch.common.inject.spi.ProviderInstanceBinding;
import org.elasticsearch.test.ESTestCase;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* Base testcase for testing {@link Module} implementations.
*/
public class ModuleTestCase extends ESTestCase {
/** Configures the module and asserts "clazz" is bound to "to". */
public void assertBinding(Module module, Class to, Class clazz) {
List<Element> elements = Elements.getElements(module);
for (Element element : elements) {
if (element instanceof LinkedKeyBinding) {
LinkedKeyBinding binding = (LinkedKeyBinding)element;
if (to.getName().equals(binding.getKey().getTypeLiteral().getType().getTypeName())) {
assertEquals(clazz.getName(), binding.getLinkedKey().getTypeLiteral().getType().getTypeName());
return;
}
}
}
StringBuilder s = new StringBuilder();
for (Element element : elements) {
s.append(element + "\n");
}
fail("Did not find any binding to " + to.getName() + ". Found these bindings:\n" + s);
}
/**
* Attempts to configure the module, and asserts an {@link IllegalArgumentException} is
* caught, containing the given messages
*/
public void assertBindingFailure(Module module, String... msgs) {
try {
List<Element> elements = Elements.getElements(module);
StringBuilder s = new StringBuilder();
for (Element element : elements) {
s.append(element + "\n");
}
fail("Expected exception from configuring module. Found these bindings:\n" + s);
} catch (IllegalArgumentException e) {
for (String msg : msgs) {
assertTrue(e.getMessage().contains(msg));
}
}
}
/**
* Configures the module and checks a Set of the "to" class
* is bound to "classes". There may be more classes bound
* to "to" than just "classes".
*/
public void assertSetMultiBinding(Module module, Class to, Class... classes) {
List<Element> elements = Elements.getElements(module);
Set<String> bindings = new HashSet<>();
boolean providerFound = false;
for (Element element : elements) {
if (element instanceof LinkedKeyBinding) {
LinkedKeyBinding binding = (LinkedKeyBinding)element;
if (to.getName().equals(binding.getKey().getTypeLiteral().getType().getTypeName())) {
bindings.add(binding.getLinkedKey().getTypeLiteral().getType().getTypeName());
}
} else if (element instanceof ProviderInstanceBinding) {
ProviderInstanceBinding binding = (ProviderInstanceBinding)element;
String setType = binding.getKey().getTypeLiteral().getType().getTypeName();
if (setType.equals("java.util.Set<" + to.getName() + ">")) {
providerFound = true;
}
}
}
for (Class clazz : classes) {
if (bindings.contains(clazz.getName()) == false) {
fail("Expected to find " + clazz.getName() + " as set binding to " + to.getName() + ", found these classes:\n" + bindings);
}
}
assertTrue("Did not find provider for set of " + to.getName(), providerFound);
}
// TODO: add assert for map multibinding
}

View File

@ -18,29 +18,32 @@
*/
package org.elasticsearch.test;
import com.google.common.collect.ImmutableSet;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.EmptyClusterInfoService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.AllocationModule;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecidersModule;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.test.gateway.NoopGatewayAllocator;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.test.gateway.NoopGatewayAllocator;
import java.lang.reflect.Constructor;
import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import static com.google.common.collect.Lists.newArrayList;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
@ -71,9 +74,9 @@ public abstract class ESAllocationTestCase extends ESTestCase {
public static AllocationDeciders randomAllocationDeciders(Settings settings, NodeSettingsService nodeSettingsService, Random random) {
final ImmutableSet<Class<? extends AllocationDecider>> defaultAllocationDeciders = AllocationDecidersModule.DEFAULT_ALLOCATION_DECIDERS;
final List<Class<? extends AllocationDecider>> defaultAllocationDeciders = AllocationModule.DEFAULT_ALLOCATION_DECIDERS;
final List<AllocationDecider> list = new ArrayList<>();
for (Class<? extends AllocationDecider> deciderClass : defaultAllocationDeciders) {
for (Class<? extends AllocationDecider> deciderClass : AllocationModule.DEFAULT_ALLOCATION_DECIDERS) {
try {
try {
Constructor<? extends AllocationDecider> constructor = deciderClass.getConstructor(Settings.class, NodeSettingsService.class);