Merge pull request #20071 from rjernst/pull_shards_allocator
Plugins: Switch custom ShardsAllocators to pull based model
This commit is contained in:
commit
1a7a9d3c62
|
@ -69,25 +69,26 @@ import java.util.Collection;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Configures classes and services that affect the entire cluster.
|
* Configures classes and services that affect the entire cluster.
|
||||||
*/
|
*/
|
||||||
public class ClusterModule extends AbstractModule {
|
public class ClusterModule extends AbstractModule {
|
||||||
|
|
||||||
public static final String EVEN_SHARD_COUNT_ALLOCATOR = "even_shard";
|
|
||||||
public static final String BALANCED_ALLOCATOR = "balanced"; // default
|
public static final String BALANCED_ALLOCATOR = "balanced"; // default
|
||||||
public static final Setting<String> SHARDS_ALLOCATOR_TYPE_SETTING =
|
public static final Setting<String> SHARDS_ALLOCATOR_TYPE_SETTING =
|
||||||
new Setting<>("cluster.routing.allocation.type", BALANCED_ALLOCATOR, Function.identity(), Property.NodeScope);
|
new Setting<>("cluster.routing.allocation.type", BALANCED_ALLOCATOR, Function.identity(), Property.NodeScope);
|
||||||
|
|
||||||
private final Settings settings;
|
private final Settings settings;
|
||||||
private final ExtensionPoint.SelectedType<ShardsAllocator> shardsAllocators = new ExtensionPoint.SelectedType<>("shards_allocator", ShardsAllocator.class);
|
|
||||||
private final ExtensionPoint.ClassSet<IndexTemplateFilter> indexTemplateFilters = new ExtensionPoint.ClassSet<>("index_template_filter", IndexTemplateFilter.class);
|
private final ExtensionPoint.ClassSet<IndexTemplateFilter> indexTemplateFilters = new ExtensionPoint.ClassSet<>("index_template_filter", IndexTemplateFilter.class);
|
||||||
private final ClusterService clusterService;
|
private final ClusterService clusterService;
|
||||||
private final IndexNameExpressionResolver indexNameExpressionResolver;
|
private final IndexNameExpressionResolver indexNameExpressionResolver;
|
||||||
// pkg private for tests
|
// pkg private for tests
|
||||||
final Collection<AllocationDecider> allocationDeciders;
|
final Collection<AllocationDecider> allocationDeciders;
|
||||||
|
final ShardsAllocator shardsAllocator;
|
||||||
|
|
||||||
// pkg private so tests can mock
|
// pkg private so tests can mock
|
||||||
Class<? extends ClusterInfoService> clusterInfoServiceImpl = InternalClusterInfoService.class;
|
Class<? extends ClusterInfoService> clusterInfoServiceImpl = InternalClusterInfoService.class;
|
||||||
|
@ -95,16 +96,11 @@ public class ClusterModule extends AbstractModule {
|
||||||
public ClusterModule(Settings settings, ClusterService clusterService, List<ClusterPlugin> clusterPlugins) {
|
public ClusterModule(Settings settings, ClusterService clusterService, List<ClusterPlugin> clusterPlugins) {
|
||||||
this.settings = settings;
|
this.settings = settings;
|
||||||
this.allocationDeciders = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins);
|
this.allocationDeciders = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins);
|
||||||
registerShardsAllocator(ClusterModule.BALANCED_ALLOCATOR, BalancedShardsAllocator.class);
|
this.shardsAllocator = createShardsAllocator(settings, clusterService.getClusterSettings(), clusterPlugins);
|
||||||
registerShardsAllocator(ClusterModule.EVEN_SHARD_COUNT_ALLOCATOR, BalancedShardsAllocator.class);
|
|
||||||
this.clusterService = clusterService;
|
this.clusterService = clusterService;
|
||||||
indexNameExpressionResolver = new IndexNameExpressionResolver(settings);
|
indexNameExpressionResolver = new IndexNameExpressionResolver(settings);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void registerShardsAllocator(String name, Class<? extends ShardsAllocator> clazz) {
|
|
||||||
shardsAllocators.registerExtension(name, clazz);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void registerIndexTemplateFilter(Class<? extends IndexTemplateFilter> indexTemplateFilter) {
|
public void registerIndexTemplateFilter(Class<? extends IndexTemplateFilter> indexTemplateFilter) {
|
||||||
indexTemplateFilters.registerExtension(indexTemplateFilter);
|
indexTemplateFilters.registerExtension(indexTemplateFilter);
|
||||||
}
|
}
|
||||||
|
@ -148,14 +144,29 @@ public class ClusterModule extends AbstractModule {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static ShardsAllocator createShardsAllocator(Settings settings, ClusterSettings clusterSettings,
|
||||||
|
List<ClusterPlugin> clusterPlugins) {
|
||||||
|
Map<String, Supplier<ShardsAllocator>> allocators = new HashMap<>();
|
||||||
|
allocators.put(BALANCED_ALLOCATOR, () -> new BalancedShardsAllocator(settings, clusterSettings));
|
||||||
|
|
||||||
|
for (ClusterPlugin plugin : clusterPlugins) {
|
||||||
|
plugin.getShardsAllocators(settings, clusterSettings).forEach((k, v) -> {
|
||||||
|
if (allocators.put(k, v) != null) {
|
||||||
|
throw new IllegalArgumentException("ShardsAllocator [" + k + "] already defined");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
String allocatorName = SHARDS_ALLOCATOR_TYPE_SETTING.get(settings);
|
||||||
|
Supplier<ShardsAllocator> allocatorSupplier = allocators.get(allocatorName);
|
||||||
|
if (allocatorSupplier == null) {
|
||||||
|
throw new IllegalArgumentException("Unknown ShardsAllocator [" + allocatorName + "]");
|
||||||
|
}
|
||||||
|
return Objects.requireNonNull(allocatorSupplier.get(),
|
||||||
|
"ShardsAllocator factory for [" + allocatorName + "] returned null");
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void configure() {
|
protected void configure() {
|
||||||
// bind ShardsAllocator
|
|
||||||
String shardsAllocatorType = shardsAllocators.bindType(binder(), settings, ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING.getKey(), ClusterModule.BALANCED_ALLOCATOR);
|
|
||||||
if (shardsAllocatorType.equals(ClusterModule.EVEN_SHARD_COUNT_ALLOCATOR)) {
|
|
||||||
final ESLogger logger = Loggers.getLogger(getClass(), settings);
|
|
||||||
logger.warn("{} allocator has been removed in 2.0 using {} instead", ClusterModule.EVEN_SHARD_COUNT_ALLOCATOR, ClusterModule.BALANCED_ALLOCATOR);
|
|
||||||
}
|
|
||||||
indexTemplateFilters.bind(binder());
|
indexTemplateFilters.bind(binder());
|
||||||
|
|
||||||
bind(ClusterInfoService.class).to(clusterInfoServiceImpl).asEagerSingleton();
|
bind(ClusterInfoService.class).to(clusterInfoServiceImpl).asEagerSingleton();
|
||||||
|
@ -178,5 +189,6 @@ public class ClusterModule extends AbstractModule {
|
||||||
bind(MappingUpdatedAction.class).asEagerSingleton();
|
bind(MappingUpdatedAction.class).asEagerSingleton();
|
||||||
bind(TaskResultsService.class).asEagerSingleton();
|
bind(TaskResultsService.class).asEagerSingleton();
|
||||||
bind(AllocationDeciders.class).toInstance(new AllocationDeciders(settings, allocationDeciders));
|
bind(AllocationDeciders.class).toInstance(new AllocationDeciders(settings, allocationDeciders));
|
||||||
|
bind(ShardsAllocator.class).toInstance(shardsAllocator);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,7 +21,10 @@ package org.elasticsearch.plugins;
|
||||||
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
|
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
|
||||||
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
|
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
|
||||||
import org.elasticsearch.common.settings.ClusterSettings;
|
import org.elasticsearch.common.settings.ClusterSettings;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
@ -41,4 +44,18 @@ public interface ClusterPlugin {
|
||||||
default Collection<AllocationDecider> createAllocationDeciders(Settings settings, ClusterSettings clusterSettings) {
|
default Collection<AllocationDecider> createAllocationDeciders(Settings settings, ClusterSettings clusterSettings) {
|
||||||
return Collections.emptyList();
|
return Collections.emptyList();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return {@link ShardsAllocator} implementations added by this plugin.
|
||||||
|
*
|
||||||
|
* The key of the returned {@link Map} is the name of the allocator, and the value
|
||||||
|
* is a function to construct the allocator.
|
||||||
|
*
|
||||||
|
* @param settings Settings for the node
|
||||||
|
* @param clusterSettings Settings for the cluster
|
||||||
|
* @return A map of allocator implementations
|
||||||
|
*/
|
||||||
|
default Map<String, Supplier<ShardsAllocator>> getShardsAllocators(Settings settings, ClusterSettings clusterSettings) {
|
||||||
|
return Collections.emptyMap();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -44,6 +44,8 @@ import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
public class ClusterModuleTests extends ModuleTestCase {
|
public class ClusterModuleTests extends ModuleTestCase {
|
||||||
private ClusterService clusterService = new ClusterService(Settings.EMPTY,
|
private ClusterService clusterService = new ClusterService(Settings.EMPTY,
|
||||||
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null);
|
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null);
|
||||||
|
@ -126,33 +128,40 @@ public class ClusterModuleTests extends ModuleTestCase {
|
||||||
assertTrue(module.allocationDeciders.stream().anyMatch(d -> d.getClass().equals(FakeAllocationDecider.class)));
|
assertTrue(module.allocationDeciders.stream().anyMatch(d -> d.getClass().equals(FakeAllocationDecider.class)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private ClusterModule newClusterModuleWithShardsAllocator(Settings settings, String name, Supplier<ShardsAllocator> supplier) {
|
||||||
|
return new ClusterModule(settings, clusterService, Collections.singletonList(
|
||||||
|
new ClusterPlugin() {
|
||||||
|
@Override
|
||||||
|
public Map<String, Supplier<ShardsAllocator>> getShardsAllocators(Settings settings, ClusterSettings clusterSettings) {
|
||||||
|
return Collections.singletonMap(name, supplier);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
public void testRegisterShardsAllocator() {
|
public void testRegisterShardsAllocator() {
|
||||||
Settings settings = Settings.builder().put(ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING.getKey(), "custom").build();
|
Settings settings = Settings.builder().put(ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING.getKey(), "custom").build();
|
||||||
ClusterModule module = new ClusterModule(settings, clusterService, Collections.emptyList());
|
ClusterModule module = newClusterModuleWithShardsAllocator(settings, "custom", FakeShardsAllocator::new);
|
||||||
module.registerShardsAllocator("custom", FakeShardsAllocator.class);
|
assertEquals(FakeShardsAllocator.class, module.shardsAllocator.getClass());
|
||||||
assertBinding(module, ShardsAllocator.class, FakeShardsAllocator.class);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testRegisterShardsAllocatorAlreadyRegistered() {
|
public void testRegisterShardsAllocatorAlreadyRegistered() {
|
||||||
ClusterModule module = new ClusterModule(Settings.EMPTY, clusterService, Collections.emptyList());
|
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () ->
|
||||||
try {
|
newClusterModuleWithShardsAllocator(Settings.EMPTY, ClusterModule.BALANCED_ALLOCATOR, FakeShardsAllocator::new));
|
||||||
module.registerShardsAllocator(ClusterModule.BALANCED_ALLOCATOR, FakeShardsAllocator.class);
|
assertEquals("ShardsAllocator [" + ClusterModule.BALANCED_ALLOCATOR + "] already defined", e.getMessage());
|
||||||
} catch (IllegalArgumentException e) {
|
|
||||||
assertEquals(e.getMessage(), "Can't register the same [shards_allocator] more than once for [balanced]");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testUnknownShardsAllocator() {
|
public void testUnknownShardsAllocator() {
|
||||||
Settings settings = Settings.builder().put(ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING.getKey(), "dne").build();
|
Settings settings = Settings.builder().put(ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING.getKey(), "dne").build();
|
||||||
ClusterModule module = new ClusterModule(settings, clusterService, Collections.emptyList());
|
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () ->
|
||||||
assertBindingFailure(module, "Unknown [shards_allocator]");
|
new ClusterModule(settings, clusterService, Collections.emptyList()));
|
||||||
|
assertEquals("Unknown ShardsAllocator [dne]", e.getMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testEvenShardsAllocatorBackcompat() {
|
public void testShardsAllocatorFactoryNull() {
|
||||||
Settings settings = Settings.builder()
|
Settings settings = Settings.builder().put(ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING.getKey(), "bad").build();
|
||||||
.put(ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING.getKey(), ClusterModule.EVEN_SHARD_COUNT_ALLOCATOR).build();
|
NullPointerException e = expectThrows(NullPointerException.class, () ->
|
||||||
ClusterModule module = new ClusterModule(settings, clusterService, Collections.emptyList());
|
newClusterModuleWithShardsAllocator(settings, "bad", () -> null));
|
||||||
assertBinding(module, ShardsAllocator.class, BalancedShardsAllocator.class);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testRegisterIndexTemplateFilterDuplicate() {
|
public void testRegisterIndexTemplateFilterDuplicate() {
|
||||||
|
|
Loading…
Reference in New Issue