Adds Allocate lifecycle action (#3484)

* Adds Allocate lifcycle action

* Addresses review comments

Still need to make a change in core for the FilterAllocationDecider to make the execute logic simpler

* Addresses more review comments

* Adds randomMap method to AllocateActionTests

* Addresses further review comments
This commit is contained in:
Colin Goodheart-Smithe 2018-01-17 13:27:40 +00:00 committed by GitHub
parent db3a88f487
commit ff883d574b
3 changed files with 588 additions and 14 deletions

View File

@ -5,38 +5,99 @@
*/
package org.elasticsearch.xpack.indexlifecycle;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiConsumer;
/**
* A {@link LifecycleAction} which reroutes shards from one allocation to another.
*/
public class AllocateAction implements LifecycleAction {
public static final String NAME = "allocate";
private static final ObjectParser<AllocateAction, Void> PARSER = new ObjectParser<>(NAME, AllocateAction::new);
public static final String NAME = "allocate";
public static final ParseField INCLUDE_FIELD = new ParseField("include");
public static final ParseField EXCLUDE_FIELD = new ParseField("exclude");
public static final ParseField REQUIRE_FIELD = new ParseField("require");
private static final Logger logger = ESLoggerFactory.getLogger(AllocateAction.class);
@SuppressWarnings("unchecked")
private static final ConstructingObjectParser<AllocateAction, Void> PARSER = new ConstructingObjectParser<>(NAME,
a -> new AllocateAction((Map<String, String>) a[0], (Map<String, String>) a[1], (Map<String, String>) a[2]));
static {
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> p.mapStrings(), INCLUDE_FIELD);
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> p.mapStrings(), EXCLUDE_FIELD);
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> p.mapStrings(), REQUIRE_FIELD);
}
private final Map<String, String> include;
private final Map<String, String> exclude;
private final Map<String, String> require;
private AllocationDeciders allocationDeciders;
public static AllocateAction parse(XContentParser parser) {
return PARSER.apply(parser, null);
}
public AllocateAction() {
public AllocateAction(Map<String, String> include, Map<String, String> exclude, Map<String, String> require) {
this.include = include;
this.exclude = exclude;
this.require = require;
FilterAllocationDecider decider = new FilterAllocationDecider(Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
this.allocationDeciders = new AllocationDeciders(Settings.EMPTY, Collections.singletonList(decider));
}
@SuppressWarnings("unchecked")
public AllocateAction(StreamInput in) throws IOException {
this((Map<String, String>) in.readGenericValue(), (Map<String, String>) in.readGenericValue(),
(Map<String, String>) in.readGenericValue());
}
public Map<String, String> getInclude() {
return include;
}
public Map<String, String> getExclude() {
return exclude;
}
public Map<String, String> getRequire() {
return require;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeGenericValue(include);
out.writeGenericValue(exclude);
out.writeGenericValue(require);
}
@Override
@ -47,19 +108,103 @@ public class AllocateAction implements LifecycleAction {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(INCLUDE_FIELD.getPreferredName(), include);
builder.field(EXCLUDE_FIELD.getPreferredName(), exclude);
builder.field(REQUIRE_FIELD.getPreferredName(), require);
builder.endObject();
return builder;
}
/**
* Inspects the <code>existingSettings</code> and adds any attributes that
* are missing for the given <code>settingsPrefix</code> to the
* <code>newSettingsBuilder</code>.
*/
private void addMissingAttrs(Map<String, String> newAttrs, String settingPrefix, Settings existingSettings,
Settings.Builder newSettingsBuilder) {
newAttrs.entrySet().stream().filter(e -> {
String existingValue = existingSettings.get(settingPrefix + e.getKey());
return existingValue == null || (existingValue.equals(e.getValue()) == false);
}).forEach(e -> newSettingsBuilder.put(settingPrefix + e.getKey(), e.getValue()));
}
void execute(Index index, BiConsumer<Settings, Listener> settingsUpdater, ClusterState clusterState, ClusterSettings clusterSettings,
Listener listener) {
// We only want to make progress if all shards are active so check that
// first
if (ActiveShardCount.ALL.enoughShardsActive(clusterState, index.getName()) == false) {
logger.debug("[{}] lifecycle action for index [{}] cannot make progress because not all shards are active", NAME,
index.getName());
listener.onSuccess(false);
return;
}
IndexMetaData idxMeta = clusterState.metaData().index(index);
if (idxMeta == null) {
listener.onFailure(
new IndexNotFoundException("Index not found when executing " + NAME + " lifecycle action.", index.getName()));
return;
}
Settings existingSettings = idxMeta.getSettings();
Settings.Builder newSettings = Settings.builder();
addMissingAttrs(include, IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey(), existingSettings, newSettings);
addMissingAttrs(exclude, IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey(), existingSettings, newSettings);
addMissingAttrs(require, IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey(), existingSettings, newSettings);
Settings newAllocationIncludes = newSettings.build();
if (newAllocationIncludes.isEmpty()) {
// All the allocation attributes are already set so just need to
// check if the allocation has happened
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, clusterState.getRoutingNodes(), clusterState, null,
System.nanoTime());
int allocationPendingShards = 0;
List<ShardRouting> allShards = clusterState.getRoutingTable().allShards(index.getName());
for (ShardRouting shardRouting : allShards) {
assert shardRouting.active() : "Shard not active, found " + shardRouting.state() + "for shard with id: "
+ shardRouting.shardId();
String currentNodeId = shardRouting.currentNodeId();
boolean canRemainOnCurrentNode = allocationDeciders.canRemain(shardRouting,
clusterState.getRoutingNodes().node(currentNodeId), allocation).type() == Decision.Type.YES;
if (canRemainOnCurrentNode == false) {
allocationPendingShards++;
}
}
if (allocationPendingShards > 0) {
logger.debug("[{}] lifecycle action for index [{}] waiting for [{}] shards "
+ "to be allocated to nodes matching the given filters", NAME, index.getName(), allocationPendingShards);
listener.onSuccess(false);
} else {
logger.debug("[{}] lifecycle action for index [{}] complete", NAME, index.getName());
listener.onSuccess(true);
}
} else {
// We have some allocation attributes to set
settingsUpdater.accept(newAllocationIncludes, listener);
}
}
@Override
public void execute(Index index, Client client, ClusterService clusterService, Listener listener) {
// NORELEASE: stub
listener.onSuccess(true);
ClusterState clusterState = clusterService.state();
BiConsumer<Settings, Listener> settingsUpdater = (s, l) -> {
client.admin().indices().updateSettings(new UpdateSettingsRequest(s, index.getName()),
new ActionListener<UpdateSettingsResponse>() {
@Override
public void onResponse(UpdateSettingsResponse updateSettingsResponse) {
l.onSuccess(false);
}
@Override
public void onFailure(Exception e) {
l.onFailure(e);
}
});
};
execute(index, settingsUpdater, clusterState, clusterService.getClusterSettings(), listener);
}
@Override
public int hashCode() {
return 1;
return Objects.hash(include, exclude, require);
}
@Override
@ -70,12 +215,12 @@ public class AllocateAction implements LifecycleAction {
if (obj.getClass() != getClass()) {
return false;
}
return true;
AllocateAction other = (AllocateAction) obj;
return Objects.equals(include, other.include) && Objects.equals(exclude, other.exclude) && Objects.equals(require, other.require);
}
@Override
public String toString() {
return Strings.toString(this);
}
}

View File

@ -5,11 +5,40 @@
*/
package org.elasticsearch.xpack.indexlifecycle;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.UnassignedInfo.Reason;
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.xpack.indexlifecycle.LifecycleAction.Listener;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import static org.hamcrest.Matchers.instanceOf;
public class AllocateActionTests extends AbstractSerializingTestCase<AllocateAction> {
@ -20,11 +49,411 @@ public class AllocateActionTests extends AbstractSerializingTestCase<AllocateAct
@Override
protected AllocateAction createTestInstance() {
return new AllocateAction();
Map<String, String> includes = randomMap(0, 100);
Map<String, String> excludes = randomMap(0, 100);
Map<String, String> requires = randomMap(0, 100);
return new AllocateAction(includes, excludes, requires);
}
@Override
protected Reader<AllocateAction> instanceReader() {
return AllocateAction::new;
}
@Override
protected AllocateAction mutateInstance(AllocateAction instance) throws IOException {
Map<String, String> include = instance.getInclude();
Map<String, String> exclude = instance.getExclude();
Map<String, String> require = instance.getRequire();
switch (randomIntBetween(0, 2)) {
case 0:
include = new HashMap<>(include);
include.put(randomAlphaOfLengthBetween(11, 15), randomAlphaOfLengthBetween(1, 20));
break;
case 1:
exclude = new HashMap<>(exclude);
exclude.put(randomAlphaOfLengthBetween(11, 15), randomAlphaOfLengthBetween(1, 20));
break;
case 2:
require = new HashMap<>(require);
require.put(randomAlphaOfLengthBetween(11, 15), randomAlphaOfLengthBetween(1, 20));
break;
default:
throw new AssertionError("Illegal randomisation branch");
}
return new AllocateAction(include, exclude, require);
}
public void testExecuteNoExistingSettings() throws Exception {
Map<String, String> includes = randomMap(1, 5);
Map<String, String> excludes = randomMap(1, 5);
Map<String, String> requires = randomMap(1, 5);
Settings.Builder existingSettings = Settings.builder().put("index.version.created", Version.CURRENT.id);
Settings.Builder expectedSettings = Settings.builder();
includes.forEach((k, v) -> expectedSettings.put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + k, v));
excludes.forEach((k, v) -> expectedSettings.put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey() + k, v));
requires.forEach((k, v) -> expectedSettings.put(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + k, v));
AllocateAction action = new AllocateAction(includes, excludes, requires);
assertSettingsUpdate(action, existingSettings, expectedSettings.build());
}
public void testExecuteSettingsUnassignedShards() throws Exception {
Map<String, String> includes = randomMap(1, 5);
Map<String, String> excludes = randomMap(1, 5);
Map<String, String> requires = randomMap(1, 5);
Settings.Builder existingSettings = Settings.builder().put("index.version.created", Version.CURRENT.id);
Settings.Builder expectedSettings = Settings.builder();
includes.forEach((k, v) -> expectedSettings.put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + k, v));
excludes.forEach((k, v) -> expectedSettings.put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey() + k, v));
requires.forEach((k, v) -> expectedSettings.put(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + k, v));
AllocateAction action = new AllocateAction(includes, excludes, requires);
assertSettingsUpdate(action, existingSettings, expectedSettings.build());
}
public void testExecuteSomeExistingSettings() throws Exception {
Map<String, String> includes = randomMap(1, 5);
Map<String, String> excludes = randomMap(1, 5);
Map<String, String> requires = randomMap(1, 5);
Settings.Builder existingSettings = Settings.builder().put("index.version.created", Version.CURRENT.id);
Settings.Builder expectedSettings = Settings.builder();
includes.forEach((k, v) -> {
if (randomBoolean()) {
existingSettings.put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + k, v);
} else {
expectedSettings.put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + k, v);
}
});
excludes.forEach((k, v) -> {
if (randomBoolean()) {
existingSettings.put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey() + k, v);
} else {
expectedSettings.put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey() + k, v);
}
});
requires.forEach((k, v) -> {
if (randomBoolean()) {
existingSettings.put(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + k, v);
} else {
expectedSettings.put(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + k, v);
}
});
// make sure there is at least one setting that is missing
if (expectedSettings.keys().isEmpty()) {
String key = randomAlphaOfLengthBetween(1, 20);
String value = randomAlphaOfLengthBetween(1, 20);
includes.put(key, value);
expectedSettings.put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + key, value);
}
AllocateAction action = new AllocateAction(includes, excludes, requires);
assertSettingsUpdate(action, existingSettings, expectedSettings.build());
}
public void testExecuteSomeExistingSettingsDifferentValue() throws Exception {
Map<String, String> includes = randomMap(1, 5);
Map<String, String> excludes = randomMap(1, 5);
Map<String, String> requires = randomMap(1, 5);
Settings.Builder existingSettings = Settings.builder().put("index.version.created", Version.CURRENT.id);
Settings.Builder expectedSettings = Settings.builder();
includes.forEach((k, v) -> {
if (randomBoolean()) {
existingSettings.put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + k, v);
} else {
expectedSettings.put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + k, v);
existingSettings.put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + k, v + randomAlphaOfLength(4));
}
});
excludes.forEach((k, v) -> {
if (randomBoolean()) {
existingSettings.put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey() + k, v);
} else {
expectedSettings.put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey() + k, v);
existingSettings.put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey() + k, v + randomAlphaOfLength(4));
}
});
requires.forEach((k, v) -> {
if (randomBoolean()) {
existingSettings.put(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + k, v);
} else {
expectedSettings.put(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + k, v);
existingSettings.put(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + k, v + randomAlphaOfLength(4));
}
});
// make sure there is at least one setting that is different
if (expectedSettings.keys().isEmpty()) {
String key = randomAlphaOfLengthBetween(1, 20);
String value = randomAlphaOfLengthBetween(1, 20);
includes.put(key, value);
expectedSettings.put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + key, value);
existingSettings.put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + key, value + randomAlphaOfLength(4));
}
AllocateAction action = new AllocateAction(includes, excludes, requires);
assertSettingsUpdate(action, existingSettings, expectedSettings.build());
}
public void testExecuteUpdateSettingsFail() throws Exception {
Settings expectedSettings = Settings.builder().put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "box_type", "foo")
.put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey() + "box_type", "bar")
.put(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + "box_type", "baz").build();
IndexMetaData indexMetadata = IndexMetaData.builder(randomAlphaOfLengthBetween(1, 20))
.settings(Settings.builder().put("index.version.created", Version.CURRENT.id)).numberOfShards(randomIntBetween(1, 5))
.numberOfReplicas(randomIntBetween(0, 5)).build();
Index index = indexMetadata.getIndex();
ImmutableOpenMap.Builder<String, IndexMetaData> indices = ImmutableOpenMap.<String, IndexMetaData> builder().fPut(index.getName(),
indexMetadata);
ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).metaData(MetaData.builder().indices(indices.build()))
.routingTable(RoutingTable.builder()
.add(IndexRoutingTable.builder(index).addShard(
TestShardRouting.newShardRouting(new ShardId(index, 0), "node1", true, ShardRoutingState.STARTED)))
.build())
.build();
Exception exception = new RuntimeException();
BiConsumer<Settings, Listener> settingsUpdater = (s, l) -> {
assertEquals(expectedSettings, s);
l.onFailure(exception);
};
Map<String, String> includes = new HashMap<>();
includes.put("box_type", "foo");
Map<String, String> excludes = new HashMap<>();
excludes.put("box_type", "bar");
Map<String, String> requires = new HashMap<>();
requires.put("box_type", "baz");
AllocateAction action = new AllocateAction(includes, excludes, requires);
RuntimeException thrownException = expectActionFailure(index, clusterState, null, action, settingsUpdater, RuntimeException.class);
assertSame(exception, thrownException);
}
public void testExecuteAllocateComplete() throws Exception {
Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
Map<String, String> includes = randomMap(1, 5);
Map<String, String> excludes = randomMap(1, 5);
Map<String, String> requires = randomMap(1, 5);
Settings.Builder existingSettings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT.id)
.put(IndexMetaData.SETTING_INDEX_UUID, index.getUUID());
Settings.Builder expectedSettings = Settings.builder();
Settings.Builder node1Settings = Settings.builder();
Settings.Builder node2Settings = Settings.builder();
includes.forEach((k, v) -> {
existingSettings.put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + k, v);
expectedSettings.put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + k, v);
node1Settings.put(Node.NODE_ATTRIBUTES.getKey() + k, v);
});
excludes.forEach((k, v) -> {
existingSettings.put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey() + k, v);
expectedSettings.put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey() + k, v);
});
requires.forEach((k, v) -> {
existingSettings.put(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + k, v);
expectedSettings.put(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + k, v);
node1Settings.put(Node.NODE_ATTRIBUTES.getKey() + k, v);
});
IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(index)
.addShard(TestShardRouting.newShardRouting(new ShardId(index, 0), "node1", true, ShardRoutingState.STARTED));
AllocateAction action = new AllocateAction(includes, excludes, requires);
assertAllocateStatus(index, 1, 0, action, existingSettings, node1Settings, node2Settings, indexRoutingTable,
true);
}
public void testExecuteAllocateNotComplete() throws Exception {
Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
Map<String, String> includes = randomMap(1, 5);
Map<String, String> excludes = randomMap(1, 5);
Map<String, String> requires = randomMap(1, 5);
Settings.Builder existingSettings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT.id)
.put(IndexMetaData.SETTING_INDEX_UUID, index.getUUID());
Settings.Builder expectedSettings = Settings.builder();
Settings.Builder node1Settings = Settings.builder();
Settings.Builder node2Settings = Settings.builder();
includes.forEach((k, v) -> {
existingSettings.put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + k, v);
expectedSettings.put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + k, v);
node1Settings.put(Node.NODE_ATTRIBUTES.getKey() + k, v);
});
excludes.forEach((k, v) -> {
existingSettings.put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey() + k, v);
expectedSettings.put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey() + k, v);
});
requires.forEach((k, v) -> {
existingSettings.put(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + k, v);
expectedSettings.put(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + k, v);
node1Settings.put(Node.NODE_ATTRIBUTES.getKey() + k, v);
});
IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(index)
.addShard(TestShardRouting.newShardRouting(new ShardId(index, 0), "node1", true, ShardRoutingState.STARTED))
.addShard(TestShardRouting.newShardRouting(new ShardId(index, 1), "node2", true, ShardRoutingState.STARTED));
AllocateAction action = new AllocateAction(includes, excludes, requires);
assertAllocateStatus(index, 2, 0, action, existingSettings, node1Settings, node2Settings, indexRoutingTable,
false);
}
public void testExecuteAllocateUnassigned() throws Exception {
Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
Map<String, String> includes = randomMap(1, 5);
Map<String, String> excludes = randomMap(1, 5);
Map<String, String> requires = randomMap(1, 5);
Settings.Builder existingSettings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT.id)
.put(IndexMetaData.SETTING_INDEX_UUID, index.getUUID());
Settings.Builder expectedSettings = Settings.builder();
Settings.Builder node1Settings = Settings.builder();
Settings.Builder node2Settings = Settings.builder();
includes.forEach((k, v) -> {
existingSettings.put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + k, v);
expectedSettings.put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + k, v);
node1Settings.put(Node.NODE_ATTRIBUTES.getKey() + k, v);
});
excludes.forEach((k, v) -> {
existingSettings.put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey() + k, v);
expectedSettings.put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey() + k, v);
});
requires.forEach((k, v) -> {
existingSettings.put(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + k, v);
expectedSettings.put(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + k, v);
node1Settings.put(Node.NODE_ATTRIBUTES.getKey() + k, v);
});
IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(index)
.addShard(TestShardRouting.newShardRouting(new ShardId(index, 0), "node1", true, ShardRoutingState.STARTED))
.addShard(TestShardRouting.newShardRouting(new ShardId(index, 1), null, null, true, ShardRoutingState.UNASSIGNED,
new UnassignedInfo(randomFrom(Reason.values()), "the shard is intentionally unassigned")));
AllocateAction action = new AllocateAction(includes, excludes, requires);
assertAllocateStatus(index, 2, 0, action, existingSettings, node1Settings, node2Settings, indexRoutingTable,
false);
}
public void testExecuteIndexMissing() throws Exception {
Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).build();
BiConsumer<Settings, Listener> settingsUpdater = (s, l) -> {
throw new AssertionError("Unexpected settings update");
};
Map<String, String> includes = new HashMap<>();
includes.put("box_type", "foo");
Map<String, String> excludes = new HashMap<>();
excludes.put("box_type", "bar");
Map<String, String> requires = new HashMap<>();
requires.put("box_type", "baz");
AllocateAction action = new AllocateAction(includes, excludes, requires);
IndexNotFoundException thrownException = expectActionFailure(index, clusterState, null, action, settingsUpdater,
IndexNotFoundException.class);
assertEquals("Index not found when executing " + AllocateAction.NAME + " lifecycle action.", thrownException.getMessage());
assertEquals(index.getName(), thrownException.getIndex().getName());
}
private void assertSettingsUpdate(AllocateAction action, Settings.Builder existingSettings, Settings expectedSettings) {
IndexMetaData indexMetadata = IndexMetaData.builder(randomAlphaOfLengthBetween(1, 20)).settings(existingSettings)
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
Index index = indexMetadata.getIndex();
ImmutableOpenMap.Builder<String, IndexMetaData> indices = ImmutableOpenMap.<String, IndexMetaData> builder().fPut(index.getName(),
indexMetadata);
ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).metaData(MetaData.builder().indices(indices.build()))
.routingTable(RoutingTable.builder()
.add(IndexRoutingTable.builder(index).addShard(
TestShardRouting.newShardRouting(new ShardId(index, 0), "node1", true, ShardRoutingState.STARTED)))
.build())
.build();
BiConsumer<Settings, Listener> settingsUpdater = (s, l) -> {
assertEquals(expectedSettings, s);
l.onSuccess(false);
};
assertActionStatus(index, clusterState, null, action, settingsUpdater, false);
}
private void assertAllocateStatus(Index index, int shards, int replicas, AllocateAction action, Settings.Builder existingSettings,
Settings.Builder node1Settings, Settings.Builder node2Settings, IndexRoutingTable.Builder indexRoutingTable,
boolean expectComplete) {
IndexMetaData indexMetadata = IndexMetaData.builder(index.getName()).settings(existingSettings).numberOfShards(shards)
.numberOfReplicas(replicas).build();
ImmutableOpenMap.Builder<String, IndexMetaData> indices = ImmutableOpenMap.<String, IndexMetaData> builder().fPut(index.getName(),
indexMetadata);
ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).metaData(MetaData.builder().indices(indices.build()))
.nodes(DiscoveryNodes.builder()
.add(DiscoveryNode.createLocal(node1Settings.build(), new TransportAddress(TransportAddress.META_ADDRESS, 9200),
"node1"))
.add(DiscoveryNode.createLocal(node2Settings.build(), new TransportAddress(TransportAddress.META_ADDRESS, 9201),
"node2")))
.routingTable(RoutingTable.builder().add(indexRoutingTable).build()).build();
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY,
Sets.newHashSet(FilterAllocationDecider.CLUSTER_ROUTING_INCLUDE_GROUP_SETTING,
FilterAllocationDecider.CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING,
FilterAllocationDecider.CLUSTER_ROUTING_REQUIRE_GROUP_SETTING));
BiConsumer<Settings, Listener> settingsUpdater = (s, l) -> {
throw new AssertionError("Unexpected settings update");
};
assertActionStatus(index, clusterState, clusterSettings, action, settingsUpdater, expectComplete);
}
private void assertActionStatus(Index index, ClusterState clusterState, ClusterSettings clusterSettings, AllocateAction action,
BiConsumer<Settings, Listener> settingsUpdater, boolean expectComplete) {
SetOnce<Boolean> actionCompleted = new SetOnce<>();
action.execute(index, settingsUpdater, clusterState, clusterSettings, new Listener() {
@Override
public void onSuccess(boolean completed) {
actionCompleted.set(completed);
}
@Override
public void onFailure(Exception e) {
throw new AssertionError("Unexpected method call");
}
});
assertEquals(expectComplete, actionCompleted.get());
}
private <E> E expectActionFailure(Index index, ClusterState clusterState, ClusterSettings clusterSettings, AllocateAction action,
BiConsumer<Settings, Listener> settingsUpdater, Class<E> expectedExceptionType) {
SetOnce<E> exceptionThrown = new SetOnce<>();
action.execute(index, settingsUpdater, clusterState, clusterSettings, new Listener() {
@Override
public void onSuccess(boolean completed) {
throw new AssertionError("Unexpected method call");
}
@SuppressWarnings("unchecked")
@Override
public void onFailure(Exception e) {
assertThat(e, instanceOf(expectedExceptionType));
exceptionThrown.set((E) e);
}
});
return exceptionThrown.get();
}
private Map<String, String> randomMap(int minEntries, int maxEntries) {
Map<String, String> map = new HashMap<>();
int numIncludes = randomIntBetween(minEntries, maxEntries);
for (int i = 0; i < numIncludes; i++) {
map.put(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
}
return map;
}
}

View File

@ -28,7 +28,7 @@ import static org.hamcrest.Matchers.not;
public class TimeseriesLifecycleTypeTests extends ESTestCase {
private static final AllocateAction TEST_ALLOCATE_ACTION = new AllocateAction();
private static final AllocateAction TEST_ALLOCATE_ACTION = new AllocateAction(null, null, null);
private static final DeleteAction TEST_DELETE_ACTION = new DeleteAction();
private static final ForceMergeAction TEST_FORCE_MERGE_ACTION = new ForceMergeAction(1);
private static final ReplicasAction TEST_REPLICAS_ACTION = new ReplicasAction(1);