Merge pull request #16238 from s1monw/convert_some_settings
Convert `cluster.routing.allocation.type` and `processors` to the new settings infra.
This commit is contained in:
commit
f8cb1912af
|
@ -57,6 +57,7 @@ import org.elasticsearch.cluster.service.InternalClusterService;
|
|||
import org.elasticsearch.common.inject.AbstractModule;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.ExtensionPoint;
|
||||
import org.elasticsearch.gateway.GatewayAllocator;
|
||||
|
@ -64,6 +65,7 @@ import org.elasticsearch.gateway.GatewayAllocator;
|
|||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* Configures classes and services that affect the entire cluster.
|
||||
|
@ -72,7 +74,7 @@ public class ClusterModule extends AbstractModule {
|
|||
|
||||
public static final String EVEN_SHARD_COUNT_ALLOCATOR = "even_shard";
|
||||
public static final String BALANCED_ALLOCATOR = "balanced"; // default
|
||||
public static final String SHARDS_ALLOCATOR_TYPE_KEY = "cluster.routing.allocation.type";
|
||||
public static final Setting<String> SHARDS_ALLOCATOR_TYPE_SETTING = new Setting<>("cluster.routing.allocation.type", BALANCED_ALLOCATOR, Function.identity(), false, Setting.Scope.CLUSTER);
|
||||
public static final List<Class<? extends AllocationDecider>> DEFAULT_ALLOCATION_DECIDERS =
|
||||
Collections.unmodifiableList(Arrays.asList(
|
||||
SameShardAllocationDecider.class,
|
||||
|
@ -121,7 +123,7 @@ public class ClusterModule extends AbstractModule {
|
|||
@Override
|
||||
protected void configure() {
|
||||
// bind ShardsAllocator
|
||||
String shardsAllocatorType = shardsAllocators.bindType(binder(), settings, ClusterModule.SHARDS_ALLOCATOR_TYPE_KEY, ClusterModule.BALANCED_ALLOCATOR);
|
||||
String shardsAllocatorType = shardsAllocators.bindType(binder(), settings, ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING.getKey(), ClusterModule.BALANCED_ALLOCATOR);
|
||||
if (shardsAllocatorType.equals(ClusterModule.EVEN_SHARD_COUNT_ALLOCATOR)) {
|
||||
final ESLogger logger = Loggers.getLogger(getClass(), settings);
|
||||
logger.warn("{} allocator has been removed in 2.0 using {} instead", ClusterModule.EVEN_SHARD_COUNT_ALLOCATOR, ClusterModule.BALANCED_ALLOCATOR);
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.common.settings;
|
|||
import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction;
|
||||
import org.elasticsearch.action.support.DestructiveOperations;
|
||||
import org.elasticsearch.client.transport.TransportClientNodesService;
|
||||
import org.elasticsearch.cluster.ClusterModule;
|
||||
import org.elasticsearch.cluster.InternalClusterInfoService;
|
||||
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
|
@ -36,6 +37,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.SnapshotInProgressAl
|
|||
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
|
||||
import org.elasticsearch.cluster.service.InternalClusterService;
|
||||
import org.elasticsearch.common.logging.ESLoggerFactory;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.discovery.DiscoveryModule;
|
||||
import org.elasticsearch.discovery.DiscoveryService;
|
||||
import org.elasticsearch.common.network.NetworkModule;
|
||||
|
@ -258,5 +260,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
|||
URLRepository.ALLOWED_URLS_SETTING,
|
||||
URLRepository.REPOSITORIES_LIST_DIRECTORIES_SETTING,
|
||||
URLRepository.REPOSITORIES_URL_SETTING,
|
||||
URLRepository.SUPPORTED_PROTOCOLS_SETTING)));
|
||||
URLRepository.SUPPORTED_PROTOCOLS_SETTING,
|
||||
ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING,
|
||||
EsExecutors.PROCESSORS_SETTING)));
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.elasticsearch.common.util.concurrent;
|
||||
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
@ -40,10 +41,7 @@ public class EsExecutors {
|
|||
* Settings key to manually set the number of available processors.
|
||||
* This is used to adjust thread pools sizes etc. per node.
|
||||
*/
|
||||
public static final String PROCESSORS = "processors";
|
||||
|
||||
/** Useful for testing */
|
||||
public static final String DEFAULT_SYSPROP = "es.processors.override";
|
||||
public static final Setting<Integer> PROCESSORS_SETTING = Setting.intSetting("processors", Math.min(32, Runtime.getRuntime().availableProcessors()), 1, false, Setting.Scope.CLUSTER) ;
|
||||
|
||||
/**
|
||||
* Returns the number of processors available but at most <tt>32</tt>.
|
||||
|
@ -53,11 +51,7 @@ public class EsExecutors {
|
|||
* ie. >= 48 create too many threads and run into OOM see #3478
|
||||
* We just use an 32 core upper-bound here to not stress the system
|
||||
* too much with too many created threads */
|
||||
int defaultValue = Math.min(32, Runtime.getRuntime().availableProcessors());
|
||||
try {
|
||||
defaultValue = Integer.parseInt(System.getProperty(DEFAULT_SYSPROP));
|
||||
} catch (Throwable ignored) {}
|
||||
return settings.getAsInt(PROCESSORS, defaultValue);
|
||||
return PROCESSORS_SETTING.get(settings);
|
||||
}
|
||||
|
||||
public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(String name, ThreadFactory threadFactory) {
|
||||
|
|
|
@ -165,7 +165,7 @@ public class ClusterStatsIT extends ESIntegTestCase {
|
|||
internalCluster().ensureAtMostNumDataNodes(0);
|
||||
|
||||
// start one node with 7 processors.
|
||||
internalCluster().startNodesAsync(Settings.builder().put(EsExecutors.PROCESSORS, 7).build()).get();
|
||||
internalCluster().startNodesAsync(Settings.builder().put(EsExecutors.PROCESSORS_SETTING.getKey(), 7).build()).get();
|
||||
waitForNodes(1);
|
||||
|
||||
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
|
||||
|
|
|
@ -122,7 +122,7 @@ public class ClusterModuleTests extends ModuleTestCase {
|
|||
}
|
||||
|
||||
public void testRegisterShardsAllocator() {
|
||||
Settings settings = Settings.builder().put(ClusterModule.SHARDS_ALLOCATOR_TYPE_KEY, "custom").build();
|
||||
Settings settings = Settings.builder().put(ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING.getKey(), "custom").build();
|
||||
ClusterModule module = new ClusterModule(settings);
|
||||
module.registerShardsAllocator("custom", FakeShardsAllocator.class);
|
||||
assertBinding(module, ShardsAllocator.class, FakeShardsAllocator.class);
|
||||
|
@ -138,14 +138,14 @@ public class ClusterModuleTests extends ModuleTestCase {
|
|||
}
|
||||
|
||||
public void testUnknownShardsAllocator() {
|
||||
Settings settings = Settings.builder().put(ClusterModule.SHARDS_ALLOCATOR_TYPE_KEY, "dne").build();
|
||||
Settings settings = Settings.builder().put(ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING.getKey(), "dne").build();
|
||||
ClusterModule module = new ClusterModule(settings);
|
||||
assertBindingFailure(module, "Unknown [shards_allocator]");
|
||||
}
|
||||
|
||||
public void testEvenShardsAllocatorBackcompat() {
|
||||
Settings settings = Settings.builder()
|
||||
.put(ClusterModule.SHARDS_ALLOCATOR_TYPE_KEY, ClusterModule.EVEN_SHARD_COUNT_ALLOCATOR).build();
|
||||
.put(ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING.getKey(), ClusterModule.EVEN_SHARD_COUNT_ALLOCATOR).build();
|
||||
ClusterModule module = new ClusterModule(settings);
|
||||
assertBinding(module, ShardsAllocator.class, BalancedShardsAllocator.class);
|
||||
}
|
||||
|
|
|
@ -40,10 +40,10 @@ public class ShardsAllocatorModuleIT extends ESIntegTestCase {
|
|||
}
|
||||
|
||||
public void testLoadByShortKeyShardsAllocator() throws IOException {
|
||||
Settings build = settingsBuilder().put(ClusterModule.SHARDS_ALLOCATOR_TYPE_KEY, "even_shard") // legacy just to make sure we don't barf
|
||||
Settings build = settingsBuilder().put(ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING.getKey(), "even_shard") // legacy just to make sure we don't barf
|
||||
.build();
|
||||
assertAllocatorInstance(build, BalancedShardsAllocator.class);
|
||||
build = settingsBuilder().put(ClusterModule.SHARDS_ALLOCATOR_TYPE_KEY, ClusterModule.BALANCED_ALLOCATOR).build();
|
||||
build = settingsBuilder().put(ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING.getKey(), ClusterModule.BALANCED_ALLOCATOR).build();
|
||||
assertAllocatorInstance(build, BalancedShardsAllocator.class);
|
||||
}
|
||||
|
||||
|
|
|
@ -335,7 +335,7 @@ public class ClusterSettingsIT extends ESIntegTestCase {
|
|||
.put("node.name", "ClusterSettingsIT")
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
|
||||
.put(EsExecutors.PROCESSORS, 1) // limit the number of threads created
|
||||
.put(EsExecutors.PROCESSORS_SETTING.getKey(), 1) // limit the number of threads created
|
||||
.put("http.enabled", false)
|
||||
.put("config.ignore_system_properties", true) // make sure we get what we set :)
|
||||
.put(settings)
|
||||
|
|
|
@ -88,8 +88,8 @@ public class SimpleNodesInfoIT extends ESIntegTestCase {
|
|||
public void testAllocatedProcessors() throws Exception {
|
||||
List<String> nodesIds = internalCluster().
|
||||
startNodesAsync(
|
||||
Settings.builder().put(EsExecutors.PROCESSORS, 3).build(),
|
||||
Settings.builder().put(EsExecutors.PROCESSORS, 6).build()
|
||||
Settings.builder().put(EsExecutors.PROCESSORS_SETTING.getKey(), 3).build(),
|
||||
Settings.builder().put(EsExecutors.PROCESSORS_SETTING.getKey(), 6).build()
|
||||
).get();
|
||||
|
||||
final String node_1 = nodesIds.get(0);
|
||||
|
|
|
@ -65,8 +65,6 @@ public class SimpleTTLIT extends ESIntegTestCase {
|
|||
return settingsBuilder()
|
||||
.put(super.nodeSettings(nodeOrdinal))
|
||||
.put("indices.ttl.interval", PURGE_INTERVAL, TimeUnit.MILLISECONDS)
|
||||
.put("cluster.routing.operation.use_type", false) // make sure we control the shard computation
|
||||
.put("cluster.routing.operation.hash.type", "djb")
|
||||
.build();
|
||||
}
|
||||
|
||||
|
|
|
@ -168,7 +168,7 @@ public abstract class ESSingleNodeTestCase extends ESTestCase {
|
|||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
|
||||
.put("script.inline", "on")
|
||||
.put("script.indexed", "on")
|
||||
.put(EsExecutors.PROCESSORS, 1) // limit the number of threads created
|
||||
.put(EsExecutors.PROCESSORS_SETTING.getKey(), 1) // limit the number of threads created
|
||||
.put("http.enabled", false)
|
||||
.put(Node.NODE_LOCAL_SETTING.getKey(), true)
|
||||
.put(Node.NODE_DATA_SETTING.getKey(), true)
|
||||
|
|
|
@ -40,14 +40,12 @@ import org.elasticsearch.bootstrap.BootstrapForTesting;
|
|||
import org.elasticsearch.cache.recycler.MockPageCacheRecycler;
|
||||
import org.elasticsearch.client.Requests;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.common.SuppressForbidden;
|
||||
import org.elasticsearch.common.io.PathUtils;
|
||||
import org.elasticsearch.common.io.PathUtilsForTesting;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.MockBigArrays;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.env.NodeEnvironment;
|
||||
|
@ -162,22 +160,6 @@ public abstract class ESTestCase extends LuceneTestCase {
|
|||
Requests.INDEX_CONTENT_TYPE = XContentType.JSON;
|
||||
}
|
||||
|
||||
// randomize and override the number of cpus so tests reproduce regardless of real number of cpus
|
||||
|
||||
@BeforeClass
|
||||
@SuppressForbidden(reason = "sets the number of cpus during tests")
|
||||
public static void setProcessors() {
|
||||
int numCpu = TestUtil.nextInt(random(), 1, 4);
|
||||
System.setProperty(EsExecutors.DEFAULT_SYSPROP, Integer.toString(numCpu));
|
||||
assertEquals(numCpu, EsExecutors.boundedNumberOfProcessors(Settings.EMPTY));
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
@SuppressForbidden(reason = "clears the number of cpus during tests")
|
||||
public static void restoreProcessors() {
|
||||
System.clearProperty(EsExecutors.DEFAULT_SYSPROP);
|
||||
}
|
||||
|
||||
@After
|
||||
public final void ensureCleanedUp() throws Exception {
|
||||
MockPageCacheRecycler.ensureAllPagesAreReleased();
|
||||
|
|
|
@ -391,11 +391,7 @@ public final class InternalTestCluster extends TestCluster {
|
|||
builder.put(SearchService.DEFAULT_KEEPALIVE_SETTING.getKey(), TimeValue.timeValueSeconds(100 + random.nextInt(5 * 60)));
|
||||
}
|
||||
|
||||
if (random.nextInt(10) == 0) {
|
||||
// node gets an extra cpu this time
|
||||
builder.put(EsExecutors.PROCESSORS, 1 + EsExecutors.boundedNumberOfProcessors(Settings.EMPTY));
|
||||
}
|
||||
|
||||
builder.put(EsExecutors.PROCESSORS_SETTING.getKey(), 1 + random.nextInt(3));
|
||||
if (random.nextBoolean()) {
|
||||
if (random.nextBoolean()) {
|
||||
builder.put("indices.fielddata.cache.size", 1 + random.nextInt(1000), ByteSizeUnit.MB);
|
||||
|
|
Loading…
Reference in New Issue