[7.x] Add data tiers (hot, warm, cold, frozen) as custom node roles (#60994) (#61045)

This commit adds the `data_hot`, `data_warm`, `data_cold`, and `data_frozen` node roles to the
x-pack plugin. These roles are intended to be the base for the formalization of data tiers in
Elasticsearch.

These roles all act as data nodes (meaning shards can be allocated to them). Nodes with the existing
`data` role acts as though they have all of the roles configured (it is a hot, warm, cold, and
frozen node).

This also includes a custom `AllocationDecider` that allows the user to configure the following
settings on a cluster level:
- `cluster.routing.allocation.require._tier`
- `cluster.routing.allocation.include._tier`
- `cluster.routing.allocation.exclude._tier`

And in index settings:
- `index.routing.allocation.require._tier`
- `index.routing.allocation.include._tier`
- `index.routing.allocation.exclude._tier`

Relates to #60848
This commit is contained in:
Lee Hinman 2020-08-12 11:06:23 -06:00 committed by GitHub
parent 5b3c10c379
commit e3df64a429
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 893 additions and 9 deletions

View File

@ -24,6 +24,7 @@ import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.xcontent.ToXContentFragment;
@ -83,8 +84,13 @@ public class DiscoveryNode implements Writeable, ToXContentFragment {
return hasRole(settings, DiscoveryNodeRole.MASTER_ROLE);
}
/**
* Due to the way that plugins may not be available when settings are being initialized,
* not all roles may be available from a static/initializing context such as a {@link Setting}
* default value function. In that case, be warned that this may not include all plugin roles.
*/
public static boolean isDataNode(final Settings settings) {
return hasRole(settings, DiscoveryNodeRole.DATA_ROLE);
return getRolesFromSettings(settings).stream().anyMatch(DiscoveryNodeRole::canContainData);
}
public static boolean isIngestNode(Settings settings) {
@ -383,7 +389,7 @@ public class DiscoveryNode implements Writeable, ToXContentFragment {
* Should this node hold data (shards) or not.
*/
public boolean isDataNode() {
return roles.contains(DiscoveryNodeRole.DATA_ROLE);
return roles.stream().anyMatch(DiscoveryNodeRole::canContainData);
}
/**

View File

@ -84,6 +84,13 @@ public abstract class DiscoveryNodeRole implements Comparable<DiscoveryNodeRole>
public abstract Setting<Boolean> legacySetting();
/**
* Indicates whether a node with the given role can contain data. Defaults to false and can be overridden
*/
public boolean canContainData() {
return false;
}
@Override
public final boolean equals(Object o) {
if (this == o) return true;
@ -124,6 +131,10 @@ public abstract class DiscoveryNodeRole implements Comparable<DiscoveryNodeRole>
return Setting.boolSetting("node.data", true, Property.Deprecated, Property.NodeScope);
}
@Override
public boolean canContainData() {
return true;
}
};
/**

View File

@ -190,6 +190,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -345,8 +346,11 @@ public class Node implements Closeable {
this.environment = new Environment(settings, initialEnvironment.configFile(), Node.NODE_LOCAL_STORAGE_SETTING.get(settings));
Environment.assertEquivalent(initialEnvironment, this.environment);
nodeEnvironment = new NodeEnvironment(tmpSettings, environment);
logger.info("node name [{}], node ID [{}], cluster name [{}]",
NODE_NAME_SETTING.get(tmpSettings), nodeEnvironment.nodeId(), ClusterName.CLUSTER_NAME_SETTING.get(tmpSettings).value());
logger.info("node name [{}], node ID [{}], cluster name [{}], roles {}",
NODE_NAME_SETTING.get(tmpSettings), nodeEnvironment.nodeId(), ClusterName.CLUSTER_NAME_SETTING.get(tmpSettings).value(),
DiscoveryNode.getRolesFromSettings(settings).stream()
.map(DiscoveryNodeRole::roleName)
.collect(Collectors.toCollection(LinkedHashSet::new)));
resourcesToClose.add(nodeEnvironment);
localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId());

View File

@ -766,11 +766,11 @@ public final class InternalTestCluster extends TestCluster {
if (DiscoveryNode.hasRole(settings, DiscoveryNodeRole.MASTER_ROLE)) {
suffix = suffix + DiscoveryNodeRole.MASTER_ROLE.roleNameAbbreviation();
}
if (DiscoveryNode.hasRole(settings, DiscoveryNodeRole.DATA_ROLE)) {
if (DiscoveryNode.isDataNode(settings)) {
suffix = suffix + DiscoveryNodeRole.DATA_ROLE.roleNameAbbreviation();
}
if (DiscoveryNode.hasRole(settings, DiscoveryNodeRole.MASTER_ROLE) == false
&& DiscoveryNode.hasRole(settings, DiscoveryNodeRole.DATA_ROLE) == false) {
&& DiscoveryNode.isDataNode(settings) == false) {
suffix = suffix + "c";
}
}

View File

@ -0,0 +1,210 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.cluster.routing.allocation;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.core.DataTier;
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;
/**
* The {@code DataTierAllocationDecider} is a custom allocation decider that behaves similar to the
* {@link org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider}, however it
* is specific to the {@code _tier} setting for both the cluster and index level.
*/
public class DataTierAllocationDecider extends AllocationDecider {
public static final String NAME = "data_tier";
public static final String CLUSTER_ROUTING_REQUIRE = "cluster.routing.allocation.require._tier";
public static final String CLUSTER_ROUTING_INCLUDE = "cluster.routing.allocation.include._tier";
public static final String CLUSTER_ROUTING_EXCLUDE = "cluster.routing.allocation.exclude._tier";
public static final String INDEX_ROUTING_REQUIRE = "index.routing.allocation.require._tier";
public static final String INDEX_ROUTING_INCLUDE = "index.routing.allocation.include._tier";
public static final String INDEX_ROUTING_EXCLUDE = "index.routing.allocation.exclude._tier";
public static final Setting<String> CLUSTER_ROUTING_REQUIRE_SETTING = Setting.simpleString(CLUSTER_ROUTING_REQUIRE,
DataTierAllocationDecider::validateTierSetting, Setting.Property.Dynamic, Setting.Property.NodeScope);
public static final Setting<String> CLUSTER_ROUTING_INCLUDE_SETTING = Setting.simpleString(CLUSTER_ROUTING_INCLUDE,
DataTierAllocationDecider::validateTierSetting, Setting.Property.Dynamic, Setting.Property.NodeScope);
public static final Setting<String> CLUSTER_ROUTING_EXCLUDE_SETTING = Setting.simpleString(CLUSTER_ROUTING_EXCLUDE,
DataTierAllocationDecider::validateTierSetting, Setting.Property.Dynamic, Setting.Property.NodeScope);
public static final Setting<String> INDEX_ROUTING_REQUIRE_SETTING = Setting.simpleString(INDEX_ROUTING_REQUIRE,
DataTierAllocationDecider::validateTierSetting, Setting.Property.Dynamic, Setting.Property.IndexScope);
public static final Setting<String> INDEX_ROUTING_INCLUDE_SETTING = Setting.simpleString(INDEX_ROUTING_INCLUDE,
DataTierAllocationDecider::validateTierSetting, Setting.Property.Dynamic, Setting.Property.IndexScope);
public static final Setting<String> INDEX_ROUTING_EXCLUDE_SETTING = Setting.simpleString(INDEX_ROUTING_EXCLUDE,
DataTierAllocationDecider::validateTierSetting, Setting.Property.Dynamic, Setting.Property.IndexScope);
private static void validateTierSetting(String setting) {
if (Strings.hasText(setting)) {
Set<String> invalidTiers = Arrays.stream(setting.split(","))
.filter(tier -> DataTier.validTierName(tier) == false)
.collect(Collectors.toSet());
if (invalidTiers.size() > 0) {
throw new IllegalArgumentException("invalid tier names: " + invalidTiers);
}
}
}
private volatile String clusterRequire = null;
private volatile String clusterInclude = null;
private volatile String clusterExclude = null;
public DataTierAllocationDecider(ClusterSettings clusterSettings) {
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_REQUIRE_SETTING, s -> this.clusterRequire = s);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_INCLUDE_SETTING, s -> this.clusterInclude = s);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_EXCLUDE_SETTING, s -> this.clusterExclude = s);
}
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return shouldFilter(shardRouting, node.node(), allocation);
}
@Override
public Decision canAllocate(IndexMetadata indexMetadata, RoutingNode node, RoutingAllocation allocation) {
return shouldFilter(indexMetadata, node.node(), allocation);
}
@Override
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return shouldFilter(shardRouting, node.node(), allocation);
}
@Override
public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNode node, RoutingAllocation allocation) {
Decision decision = shouldClusterFilter(node, allocation);
if (decision != null) {
return decision;
}
decision = shouldIndexFilter(indexMetadata, node, allocation);
if (decision != null) {
return decision;
}
return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require tier filters");
}
private Decision shouldFilter(ShardRouting shardRouting, DiscoveryNode node, RoutingAllocation allocation) {
Decision decision = shouldClusterFilter(node, allocation);
if (decision != null) {
return decision;
}
decision = shouldIndexFilter(allocation.metadata().getIndexSafe(shardRouting.index()), node, allocation);
if (decision != null) {
return decision;
}
return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require tier filters");
}
private Decision shouldFilter(IndexMetadata indexMd, DiscoveryNode node, RoutingAllocation allocation) {
Decision decision = shouldClusterFilter(node, allocation);
if (decision != null) {
return decision;
}
decision = shouldIndexFilter(indexMd, node, allocation);
if (decision != null) {
return decision;
}
return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require tier filters");
}
private Decision shouldIndexFilter(IndexMetadata indexMd, DiscoveryNode node, RoutingAllocation allocation) {
Settings indexSettings = indexMd.getSettings();
String indexRequire = INDEX_ROUTING_REQUIRE_SETTING.get(indexSettings);
String indexInclude = INDEX_ROUTING_INCLUDE_SETTING.get(indexSettings);
String indexExclude = INDEX_ROUTING_EXCLUDE_SETTING.get(indexSettings);
if (Strings.hasText(indexRequire)) {
if (allocationAllowed(OpType.AND, indexRequire, node) == false) {
return allocation.decision(Decision.NO, NAME, "node does not match all index setting [%s] tier filters [%s]",
INDEX_ROUTING_REQUIRE, indexRequire);
}
}
if (Strings.hasText(indexInclude)) {
if (allocationAllowed(OpType.OR, indexInclude, node) == false) {
return allocation.decision(Decision.NO, NAME, "node does not match any index setting [%s] tier filters [%s]",
INDEX_ROUTING_INCLUDE, indexInclude);
}
}
if (Strings.hasText(indexExclude)) {
if (allocationAllowed(OpType.OR, indexExclude, node)) {
return allocation.decision(Decision.NO, NAME, "node matches any index setting [%s] tier filters [%s]",
INDEX_ROUTING_EXCLUDE, indexExclude);
}
}
return null;
}
private Decision shouldClusterFilter(DiscoveryNode node, RoutingAllocation allocation) {
if (Strings.hasText(clusterRequire)) {
if (allocationAllowed(OpType.AND, clusterRequire, node) == false) {
return allocation.decision(Decision.NO, NAME, "node does not match all cluster setting [%s] tier filters [%s]",
CLUSTER_ROUTING_REQUIRE, clusterRequire);
}
}
if (Strings.hasText(clusterInclude)) {
if (allocationAllowed(OpType.OR, clusterInclude, node) == false) {
return allocation.decision(Decision.NO, NAME, "node does not match any cluster setting [%s] tier filters [%s]",
CLUSTER_ROUTING_INCLUDE, clusterInclude);
}
}
if (Strings.hasText(clusterExclude)) {
if (allocationAllowed(OpType.OR, clusterExclude, node)) {
return allocation.decision(Decision.NO, NAME, "node matches any cluster setting [%s] tier filters [%s]",
CLUSTER_ROUTING_EXCLUDE, clusterExclude);
}
}
return null;
}
private enum OpType {
AND,
OR
}
private static boolean allocationAllowed(OpType opType, String tierSetting, DiscoveryNode node) {
String[] values = Strings.tokenizeToStringArray(tierSetting, ",");
for (String value : values) {
// generic "data" roles are considered to have all tiers
if (node.getRoles().contains(DiscoveryNodeRole.DATA_ROLE) ||
node.getRoles().stream().map(DiscoveryNodeRole::roleName).collect(Collectors.toSet()).contains(value)) {
if (opType == OpType.OR) {
return true;
}
} else {
if (opType == OpType.AND) {
return false;
}
}
}
if (opType == OpType.OR) {
return false;
} else {
return true;
}
}
}

View File

@ -0,0 +1,140 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
/**
* The {@code DataTier} class encapsulates the formalization of the "hot",
* "warm", "cold", and "frozen" tiers as node roles. In contains the roles
* themselves as well as helpers for validation and determining if a node has
* a tier configured.
*
* Related:
* {@link org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider}
*/
public class DataTier {
public static final String DATA_HOT = "data_hot";
public static final String DATA_WARM = "data_warm";
public static final String DATA_COLD = "data_cold";
public static final String DATA_FROZEN = "data_frozen";
/**
* Returns true if the given tier name is a valid tier
*/
public static boolean validTierName(String tierName) {
return DATA_HOT.equals(tierName) ||
DATA_WARM.equals(tierName) ||
DATA_COLD.equals(tierName) ||
DATA_FROZEN.equals(tierName);
}
/**
* Returns true iff the given settings have a data tier setting configured
*/
public static boolean isExplicitDataTier(Settings settings) {
/*
* This method can be called before the o.e.n.NodeRoleSettings.NODE_ROLES_SETTING is
* initialized. We do not want to trigger initialization prematurely because that will bake
* the default roles before plugins have had a chance to register them. Therefore,
* to avoid initializing this setting prematurely, we avoid using the actual node roles
* setting instance here in favor of the string.
*/
if (settings.hasValue("node.roles")) {
return settings.getAsList("node.roles").stream().anyMatch(DataTier::validTierName);
}
return false;
}
public static DiscoveryNodeRole DATA_HOT_NODE_ROLE = new DiscoveryNodeRole("data_hot", "h") {
@Override
public boolean isEnabledByDefault(final Settings settings) {
return false;
}
@Override
public Setting<Boolean> legacySetting() {
return null;
}
@Override
public boolean canContainData() {
return true;
}
};
public static DiscoveryNodeRole DATA_WARM_NODE_ROLE = new DiscoveryNodeRole("data_warm", "w") {
@Override
public boolean isEnabledByDefault(final Settings settings) {
return false;
}
@Override
public Setting<Boolean> legacySetting() {
return null;
}
@Override
public boolean canContainData() {
return true;
}
};
public static DiscoveryNodeRole DATA_COLD_NODE_ROLE = new DiscoveryNodeRole("data_cold", "c") {
@Override
public boolean isEnabledByDefault(final Settings settings) {
return false;
}
@Override
public Setting<Boolean> legacySetting() {
return null;
}
@Override
public boolean canContainData() {
return true;
}
};
public static DiscoveryNodeRole DATA_FROZEN_NODE_ROLE = new DiscoveryNodeRole("data_frozen", "f") {
@Override
public boolean isEnabledByDefault(final Settings settings) {
return false;
}
@Override
public Setting<Boolean> legacySetting() {
return null;
}
@Override
public boolean canContainData() {
return true;
}
};
public static boolean isHotNode(DiscoveryNode discoveryNode) {
return discoveryNode.getRoles().contains(DATA_HOT_NODE_ROLE) || discoveryNode.getRoles().contains(DiscoveryNodeRole.DATA_ROLE);
}
public static boolean isWarmNode(DiscoveryNode discoveryNode) {
return discoveryNode.getRoles().contains(DATA_WARM_NODE_ROLE) || discoveryNode.getRoles().contains(DiscoveryNodeRole.DATA_ROLE);
}
public static boolean isColdNode(DiscoveryNode discoveryNode) {
return discoveryNode.getRoles().contains(DATA_COLD_NODE_ROLE) || discoveryNode.getRoles().contains(DiscoveryNodeRole.DATA_ROLE);
}
public static boolean isFrozenNode(DiscoveryNode discoveryNode) {
return discoveryNode.getRoles().contains(DATA_FROZEN_NODE_ROLE) || discoveryNode.getRoles().contains(DiscoveryNodeRole.DATA_ROLE);
}
}

View File

@ -20,7 +20,9 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.inject.Binder;
@ -45,6 +47,7 @@ import org.elasticsearch.license.LicensesMetadata;
import org.elasticsearch.license.Licensing;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.persistent.PersistentTaskParams;
import org.elasticsearch.plugins.ClusterPlugin;
import org.elasticsearch.plugins.EnginePlugin;
import org.elasticsearch.plugins.ExtensiblePlugin;
import org.elasticsearch.plugins.RepositoryPlugin;
@ -56,6 +59,7 @@ import org.elasticsearch.script.ScriptService;
import org.elasticsearch.snapshots.SourceOnlySnapshotRepository;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider;
import org.elasticsearch.xpack.core.action.ReloadAnalyzerAction;
import org.elasticsearch.xpack.core.action.TransportReloadAnalyzersAction;
import org.elasticsearch.xpack.core.action.TransportXPackInfoAction;
@ -80,17 +84,20 @@ import java.security.AccessController;
import java.security.PrivilegedAction;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
public class XPackPlugin extends XPackClientPlugin implements ExtensiblePlugin, RepositoryPlugin, EnginePlugin {
public class XPackPlugin extends XPackClientPlugin implements ExtensiblePlugin, RepositoryPlugin, EnginePlugin, ClusterPlugin {
private static final Logger logger = LogManager.getLogger(XPackPlugin.class);
private static final DeprecationLogger deprecationLogger = new DeprecationLogger(logger);
@ -397,9 +404,29 @@ public class XPackPlugin extends XPackClientPlugin implements ExtensiblePlugin,
public List<Setting<?>> getSettings() {
List<Setting<?>> settings = super.getSettings();
settings.add(SourceOnlySnapshotRepository.SOURCE_ONLY);
settings.add(DataTierAllocationDecider.CLUSTER_ROUTING_REQUIRE_SETTING);
settings.add(DataTierAllocationDecider.CLUSTER_ROUTING_INCLUDE_SETTING);
settings.add(DataTierAllocationDecider.CLUSTER_ROUTING_EXCLUDE_SETTING);
settings.add(DataTierAllocationDecider.INDEX_ROUTING_REQUIRE_SETTING);
settings.add(DataTierAllocationDecider.INDEX_ROUTING_INCLUDE_SETTING);
settings.add(DataTierAllocationDecider.INDEX_ROUTING_EXCLUDE_SETTING);
return settings;
}
@Override
public Set<DiscoveryNodeRole> getRoles() {
return new HashSet<>(Arrays.asList(
DataTier.DATA_HOT_NODE_ROLE,
DataTier.DATA_WARM_NODE_ROLE,
DataTier.DATA_COLD_NODE_ROLE,
DataTier.DATA_FROZEN_NODE_ROLE));
}
@Override
public Collection<AllocationDecider> createAllocationDeciders(Settings settings, ClusterSettings clusterSettings) {
return Collections.singleton(new DataTierAllocationDecider(clusterSettings));
}
/**
* Handles the creation of the SSLService along with the necessary actions to enable reloading
* of SSLContexts when configuration files change on disk.

View File

@ -0,0 +1,352 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.cluster.routing.allocation;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.EmptyClusterInfoService;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.gateway.TestGatewayAllocator;
import org.elasticsearch.xpack.core.DataTier;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
public class DataTierAllocationDeciderTests extends ESAllocationTestCase {
private static final Set<Setting<?>> ALL_SETTINGS;
private static final DiscoveryNode HOT_NODE = newNode("node-hot", Collections.singleton(DataTier.DATA_HOT_NODE_ROLE));
private static final DiscoveryNode WARM_NODE = newNode("node-warm", Collections.singleton(DataTier.DATA_WARM_NODE_ROLE));
private static final DiscoveryNode COLD_NODE = newNode("node-cold", Collections.singleton(DataTier.DATA_COLD_NODE_ROLE));
private static final DiscoveryNode FROZEN_NODE = newNode("node-frozen", Collections.singleton(DataTier.DATA_FROZEN_NODE_ROLE));
private static final DiscoveryNode DATA_NODE = newNode("node-data", Collections.singleton(DiscoveryNodeRole.DATA_ROLE));
private final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ALL_SETTINGS);
private final DataTierAllocationDecider decider = new DataTierAllocationDecider(clusterSettings);
private final AllocationDeciders allocationDeciders = new AllocationDeciders(
Arrays.asList(decider,
new SameShardAllocationDecider(Settings.EMPTY, clusterSettings),
new ReplicaAfterPrimaryActiveAllocationDecider()));
private final AllocationService service = new AllocationService(allocationDeciders,
new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE);
private final ShardRouting shard = ShardRouting.newUnassigned(new ShardId("myindex", "myindex", 0), true,
RecoverySource.EmptyStoreRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "index created"));
static {
Set<Setting<?>> allSettings = new HashSet<>(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
allSettings.add(DataTierAllocationDecider.CLUSTER_ROUTING_REQUIRE_SETTING);
allSettings.add(DataTierAllocationDecider.CLUSTER_ROUTING_INCLUDE_SETTING);
allSettings.add(DataTierAllocationDecider.CLUSTER_ROUTING_EXCLUDE_SETTING);
ALL_SETTINGS = allSettings;
}
public void testClusterRequires() {
ClusterState state = prepareState(service.reroute(ClusterState.EMPTY_STATE, "initial state"));
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state,
null, 0);
allocation.debugDecision(true);
clusterSettings.applySettings(Settings.builder()
.put(DataTierAllocationDecider.CLUSTER_ROUTING_REQUIRE, "data_hot")
.build());
Decision d;
RoutingNode node;
for (DiscoveryNode n : Arrays.asList(HOT_NODE, DATA_NODE)) {
node = new RoutingNode(n.getId(), n, shard);
d = decider.canAllocate(shard, node, allocation);
assertThat(d.type(), equalTo(Decision.Type.YES));
d = decider.canRemain(shard, node, allocation);
assertThat(d.type(), equalTo(Decision.Type.YES));
}
for (DiscoveryNode n : Arrays.asList(WARM_NODE, COLD_NODE, FROZEN_NODE)) {
node = new RoutingNode(n.getId(), n, shard);
d = decider.canAllocate(shard, node, allocation);
assertThat(d.type(), equalTo(Decision.Type.NO));
assertThat(d.getExplanation(),
containsString("node does not match all cluster setting [cluster.routing.allocation.require._tier] " +
"tier filters [data_hot]"));
d = decider.canRemain(shard, node, allocation);
assertThat(d.type(), equalTo(Decision.Type.NO));
assertThat(d.getExplanation(),
containsString("node does not match all cluster setting [cluster.routing.allocation.require._tier] " +
"tier filters [data_hot]"));
}
}
public void testClusterIncludes() {
ClusterState state = prepareState(service.reroute(ClusterState.EMPTY_STATE, "initial state"));
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state,
null, 0);
allocation.debugDecision(true);
clusterSettings.applySettings(Settings.builder()
.put(DataTierAllocationDecider.CLUSTER_ROUTING_INCLUDE, "data_warm,data_frozen")
.build());
Decision d;
RoutingNode node;
for (DiscoveryNode n : Arrays.asList(WARM_NODE, FROZEN_NODE, DATA_NODE)) {
node = new RoutingNode(n.getId(), n, shard);
d = decider.canAllocate(shard, node, allocation);
assertThat(d.type(), equalTo(Decision.Type.YES));
d = decider.canRemain(shard, node, allocation);
assertThat(d.type(), equalTo(Decision.Type.YES));
}
for (DiscoveryNode n : Arrays.asList(HOT_NODE, COLD_NODE)) {
node = new RoutingNode(n.getId(), n, shard);
d = decider.canAllocate(shard, node, allocation);
assertThat(d.type(), equalTo(Decision.Type.NO));
assertThat(d.getExplanation(),
containsString("node does not match any cluster setting [cluster.routing.allocation.include._tier] " +
"tier filters [data_warm,data_frozen]"));
d = decider.canRemain(shard, node, allocation);
assertThat(d.type(), equalTo(Decision.Type.NO));
assertThat(d.getExplanation(),
containsString("node does not match any cluster setting [cluster.routing.allocation.include._tier] " +
"tier filters [data_warm,data_frozen]"));
}
}
public void testClusterExcludes() {
ClusterState state = prepareState(service.reroute(ClusterState.EMPTY_STATE, "initial state"));
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state,
null, 0);
allocation.debugDecision(true);
clusterSettings.applySettings(Settings.builder()
.put(DataTierAllocationDecider.CLUSTER_ROUTING_EXCLUDE, "data_warm,data_frozen")
.build());
Decision d;
RoutingNode node;
for (DiscoveryNode n : Arrays.asList(WARM_NODE, FROZEN_NODE, DATA_NODE)) {
node = new RoutingNode(n.getId(), n, shard);
d = decider.canAllocate(shard, node, allocation);
assertThat(d.type(), equalTo(Decision.Type.NO));
assertThat(d.getExplanation(),
containsString("node matches any cluster setting [cluster.routing.allocation.exclude._tier] " +
"tier filters [data_warm,data_frozen]"));
d = decider.canRemain(shard, node, allocation);
assertThat(d.type(), equalTo(Decision.Type.NO));
assertThat(d.getExplanation(),
containsString("node matches any cluster setting [cluster.routing.allocation.exclude._tier] " +
"tier filters [data_warm,data_frozen]"));
}
for (DiscoveryNode n : Arrays.asList(HOT_NODE, COLD_NODE)) {
node = new RoutingNode(n.getId(), n, shard);
d = decider.canAllocate(shard, node, allocation);
assertThat(n.toString(), d.type(), equalTo(Decision.Type.YES));
d = decider.canRemain(shard, node, allocation);
assertThat(n.toString(), d.type(), equalTo(Decision.Type.YES));
}
}
public void testIndexRequires() {
ClusterState state = prepareState(service.reroute(ClusterState.EMPTY_STATE, "initial state"),
Settings.builder()
.put(DataTierAllocationDecider.INDEX_ROUTING_REQUIRE, "data_hot")
.build());
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state,
null, 0);
allocation.debugDecision(true);
Decision d;
RoutingNode node;
for (DiscoveryNode n : Arrays.asList(HOT_NODE, DATA_NODE)) {
node = new RoutingNode(n.getId(), n, shard);
d = decider.canAllocate(shard, node, allocation);
assertThat(d.type(), equalTo(Decision.Type.YES));
d = decider.canRemain(shard, node, allocation);
assertThat(d.type(), equalTo(Decision.Type.YES));
}
for (DiscoveryNode n : Arrays.asList(WARM_NODE, COLD_NODE, FROZEN_NODE)) {
node = new RoutingNode(n.getId(), n, shard);
d = decider.canAllocate(shard, node, allocation);
assertThat(d.type(), equalTo(Decision.Type.NO));
assertThat(d.getExplanation(),
containsString("node does not match all index setting [index.routing.allocation.require._tier] tier filters [data_hot]"));
d = decider.canRemain(shard, node, allocation);
assertThat(d.type(), equalTo(Decision.Type.NO));
assertThat(d.getExplanation(),
containsString("node does not match all index setting [index.routing.allocation.require._tier] tier filters [data_hot]"));
}
}
public void testIndexIncludes() {
ClusterState state = prepareState(service.reroute(ClusterState.EMPTY_STATE, "initial state"),
Settings.builder()
.put(DataTierAllocationDecider.INDEX_ROUTING_INCLUDE, "data_warm,data_frozen")
.build());
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state,
null, 0);
allocation.debugDecision(true);
Decision d;
RoutingNode node;
for (DiscoveryNode n : Arrays.asList(WARM_NODE, FROZEN_NODE, DATA_NODE)) {
node = new RoutingNode(n.getId(), n, shard);
d = decider.canAllocate(shard, node, allocation);
assertThat(d.type(), equalTo(Decision.Type.YES));
d = decider.canRemain(shard, node, allocation);
assertThat(d.type(), equalTo(Decision.Type.YES));
}
for (DiscoveryNode n : Arrays.asList(HOT_NODE, COLD_NODE)) {
node = new RoutingNode(n.getId(), n, shard);
d = decider.canAllocate(shard, node, allocation);
assertThat(d.type(), equalTo(Decision.Type.NO));
assertThat(d.getExplanation(),
containsString("node does not match any index setting [index.routing.allocation.include._tier] " +
"tier filters [data_warm,data_frozen]"));
d = decider.canRemain(shard, node, allocation);
assertThat(d.type(), equalTo(Decision.Type.NO));
assertThat(d.getExplanation(),
containsString("node does not match any index setting [index.routing.allocation.include._tier] " +
"tier filters [data_warm,data_frozen]"));
}
}
public void testIndexExcludes() {
ClusterState state = prepareState(service.reroute(ClusterState.EMPTY_STATE, "initial state"),
Settings.builder()
.put(DataTierAllocationDecider.INDEX_ROUTING_EXCLUDE, "data_warm,data_frozen")
.build());
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state,
null, 0);
allocation.debugDecision(true);
Decision d;
RoutingNode node;
for (DiscoveryNode n : Arrays.asList(WARM_NODE, FROZEN_NODE, DATA_NODE)) {
node = new RoutingNode(n.getId(), n, shard);
d = decider.canAllocate(shard, node, allocation);
assertThat(d.type(), equalTo(Decision.Type.NO));
assertThat(d.getExplanation(),
containsString("node matches any index setting [index.routing.allocation.exclude._tier] " +
"tier filters [data_warm,data_frozen]"));
d = decider.canRemain(shard, node, allocation);
assertThat(d.type(), equalTo(Decision.Type.NO));
assertThat(d.getExplanation(),
containsString("node matches any index setting [index.routing.allocation.exclude._tier] " +
"tier filters [data_warm,data_frozen]"));
}
for (DiscoveryNode n : Arrays.asList(HOT_NODE, COLD_NODE)) {
node = new RoutingNode(n.getId(), n, shard);
d = decider.canAllocate(shard, node, allocation);
assertThat(n.toString(), d.type(), equalTo(Decision.Type.YES));
d = decider.canRemain(shard, node, allocation);
assertThat(n.toString(), d.type(), equalTo(Decision.Type.YES));
}
}
public void testClusterAndIndex() {
ClusterState state = prepareState(service.reroute(ClusterState.EMPTY_STATE, "initial state"),
Settings.builder()
.put(DataTierAllocationDecider.INDEX_ROUTING_INCLUDE, "data_warm,data_frozen")
.build());
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state,
null, 0);
clusterSettings.applySettings(Settings.builder()
.put(DataTierAllocationDecider.CLUSTER_ROUTING_EXCLUDE, "data_frozen")
.build());
allocation.debugDecision(true);
Decision d;
RoutingNode node;
for (DiscoveryNode n : Arrays.asList(HOT_NODE, COLD_NODE)) {
node = new RoutingNode(n.getId(), n, shard);
d = decider.canAllocate(shard, node, allocation);
assertThat(node.toString(), d.type(), equalTo(Decision.Type.NO));
assertThat(node.toString(), d.getExplanation(),
containsString("node does not match any index setting [index.routing.allocation.include._tier] " +
"tier filters [data_warm,data_frozen]"));
d = decider.canRemain(shard, node, allocation);
assertThat(node.toString(), d.type(), equalTo(Decision.Type.NO));
assertThat(node.toString(), d.getExplanation(),
containsString("node does not match any index setting [index.routing.allocation.include._tier] " +
"tier filters [data_warm,data_frozen]"));
}
for (DiscoveryNode n : Arrays.asList(FROZEN_NODE, DATA_NODE)) {
node = new RoutingNode(n.getId(), n, shard);
d = decider.canAllocate(shard, node, allocation);
assertThat(node.toString(), d.type(), equalTo(Decision.Type.NO));
assertThat(d.getExplanation(),
containsString("node matches any cluster setting [cluster.routing.allocation.exclude._tier] tier filters [data_frozen]"));
d = decider.canRemain(shard, node, allocation);
assertThat(node.toString(), d.type(), equalTo(Decision.Type.NO));
assertThat(d.getExplanation(),
containsString("node matches any cluster setting [cluster.routing.allocation.exclude._tier] tier filters [data_frozen]"));
}
for (DiscoveryNode n : Arrays.asList(WARM_NODE)) {
node = new RoutingNode(n.getId(), n, shard);
d = decider.canAllocate(shard, node, allocation);
assertThat(n.toString(), d.type(), equalTo(Decision.Type.YES));
d = decider.canRemain(shard, node, allocation);
assertThat(n.toString(), d.type(), equalTo(Decision.Type.YES));
}
}
private ClusterState prepareState(ClusterState initialState) {
return prepareState(initialState, Settings.EMPTY);
}
private ClusterState prepareState(ClusterState initialState, Settings indexSettings) {
return ClusterState.builder(initialState)
.nodes(DiscoveryNodes.builder()
.add(HOT_NODE)
.add(WARM_NODE)
.add(COLD_NODE)
.add(FROZEN_NODE)
.add(DATA_NODE)
.build())
.metadata(Metadata.builder()
.put(IndexMetadata.builder("myindex")
.settings(Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_INDEX_UUID, "myindex")
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(indexSettings)
.build()))
.build())
.build();
}
}

View File

@ -0,0 +1,129 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.test.ESTestCase;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.StreamSupport;
import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
public class DataTierTests extends ESTestCase {
private static final AtomicInteger idGenerator = new AtomicInteger();
public void testNodeSelection() {
DiscoveryNodes discoveryNodes = buildDiscoveryNodes();
final String[] dataNodes =
StreamSupport.stream(discoveryNodes.getNodes().values().spliterator(), false)
.map(n -> n.value)
.filter(DiscoveryNode::isDataNode)
.map(DiscoveryNode::getId)
.toArray(String[]::new);
final String[] hotNodes =
StreamSupport.stream(discoveryNodes.getNodes().values().spliterator(), false)
.map(n -> n.value)
.filter(DataTier::isHotNode)
.map(DiscoveryNode::getId)
.toArray(String[]::new);
final String[] warmNodes =
StreamSupport.stream(discoveryNodes.getNodes().values().spliterator(), false)
.map(n -> n.value)
.filter(DataTier::isWarmNode)
.map(DiscoveryNode::getId)
.toArray(String[]::new);
final String[] coldNodes =
StreamSupport.stream(discoveryNodes.getNodes().values().spliterator(), false)
.map(n -> n.value)
.filter(DataTier::isColdNode)
.map(DiscoveryNode::getId)
.toArray(String[]::new);
final String[] frozenNodes =
StreamSupport.stream(discoveryNodes.getNodes().values().spliterator(), false)
.map(n -> n.value)
.filter(DataTier::isFrozenNode)
.map(DiscoveryNode::getId)
.toArray(String[]::new);
assertThat(discoveryNodes.resolveNodes("data:true"), arrayContainingInAnyOrder(dataNodes));
assertThat(discoveryNodes.resolveNodes("data_hot:true"), arrayContainingInAnyOrder(hotNodes));
assertThat(discoveryNodes.resolveNodes("data_warm:true"), arrayContainingInAnyOrder(warmNodes));
assertThat(discoveryNodes.resolveNodes("data_cold:true"), arrayContainingInAnyOrder(coldNodes));
assertThat(discoveryNodes.resolveNodes("data_frozen:true"), arrayContainingInAnyOrder(frozenNodes));
Set<String> allTiers = new HashSet<>(Arrays.asList(hotNodes));
allTiers.addAll(Arrays.asList(warmNodes));
allTiers.addAll(Arrays.asList(coldNodes));
allTiers.addAll(Arrays.asList(frozenNodes));
assertThat(discoveryNodes.resolveNodes("data:true"), arrayContainingInAnyOrder(allTiers.toArray(Strings.EMPTY_ARRAY)));
}
private static DiscoveryNodes buildDiscoveryNodes() {
int numNodes = randomIntBetween(3, 15);
DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder();
List<DiscoveryNode> nodesList = randomNodes(numNodes);
for (DiscoveryNode node : nodesList) {
discoBuilder = discoBuilder.add(node);
}
discoBuilder.localNodeId(randomFrom(nodesList).getId());
discoBuilder.masterNodeId(randomFrom(nodesList).getId());
return discoBuilder.build();
}
private static DiscoveryNode newNode(int nodeId, Map<String, String> attributes, Set<DiscoveryNodeRole> roles) {
return new DiscoveryNode("name_" + nodeId, "node_" + nodeId, buildNewFakeTransportAddress(), attributes, roles,
Version.CURRENT);
}
private static List<DiscoveryNode> randomNodes(final int numNodes) {
Set<DiscoveryNodeRole> allRoles = new HashSet<>(DiscoveryNodeRole.BUILT_IN_ROLES);
allRoles.remove(DiscoveryNodeRole.DATA_ROLE);
allRoles.add(DataTier.DATA_HOT_NODE_ROLE);
allRoles.add(DataTier.DATA_WARM_NODE_ROLE);
allRoles.add(DataTier.DATA_COLD_NODE_ROLE);
allRoles.add(DataTier.DATA_FROZEN_NODE_ROLE);
List<DiscoveryNode> nodesList = new ArrayList<>();
for (int i = 0; i < numNodes; i++) {
Map<String, String> attributes = new HashMap<>();
if (frequently()) {
attributes.put("custom", randomBoolean() ? "match" : randomAlphaOfLengthBetween(3, 5));
}
final Set<DiscoveryNodeRole> roles = new HashSet<>(randomSubsetOf(allRoles));
if (frequently()) {
roles.add(new DiscoveryNodeRole("custom_role", "cr") {
@Override
public Setting<Boolean> legacySetting() {
return null;
}
});
}
final DiscoveryNode node = newNode(idGenerator.getAndIncrement(), attributes, roles);
nodesList.add(node);
}
return nodesList;
}
}

View File

@ -47,6 +47,7 @@ import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.FixedExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.core.DataTier;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.core.transform.TransformNamedXContentProvider;
@ -157,7 +158,9 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
*/
private static final Setting<Boolean> TRANSFORM_ENABLED_NODE = Setting.boolSetting(
"node.transform",
settings -> Boolean.toString(DiscoveryNode.isDataNode(settings)),
settings ->
// Don't use DiscoveryNode#isDataNode(Settings) here, as it is called before all plugins are initialized
Boolean.toString(DiscoveryNode.hasRole(settings, DiscoveryNodeRole.DATA_ROLE) || DataTier.isExplicitDataTier(settings)),
Property.Deprecated,
Property.NodeScope
);
@ -171,7 +174,9 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
@Override
public boolean isEnabledByDefault(final Settings settings) {
return super.isEnabledByDefault(settings) && DiscoveryNode.isDataNode(settings);
return super.isEnabledByDefault(settings) &&
// Don't use DiscoveryNode#isDataNode(Settings) here, as it is called before all plugins are initialized
(DiscoveryNode.hasRole(settings, DiscoveryNodeRole.DATA_ROLE) || DataTier.isExplicitDataTier(settings));
}
};