diff --git a/docs/reference/modules/cluster/misc.asciidoc b/docs/reference/modules/cluster/misc.asciidoc index 3963312c0f4..4edcd34e00f 100644 --- a/docs/reference/modules/cluster/misc.asciidoc +++ b/docs/reference/modules/cluster/misc.asciidoc @@ -56,3 +56,29 @@ PUT /_cluster/settings } ------------------------------- // CONSOLE + + +[[persistent-tasks-allocation]] +==== Persistent Tasks Allocations + +Plugins can create a kind of tasks called persistent tasks. Those tasks are +usually long-live tasks and are stored in the cluster state, allowing the +tasks to be revived after a full cluster restart. + +Every time a persistent task is created, the master nodes takes care of +assigning the task to a node of the cluster, and the assigned node will then +pick up the task and execute it locally. The process of assigning persistent +tasks to nodes is controlled by the following property, which can be updated +dynamically: + +`cluster.persistent_tasks.allocation.enable`:: ++ +-- +Enable or disable allocation for persistent tasks: + +* `all` - (default) Allows persistent tasks to be assigned to nodes +* `none` - No allocations are allowed for any type of persistent task + +This setting does not affect the persistent tasks that are already being executed. +Only newly created persistent tasks, or tasks that must be reassigned (after a node +left the cluster, for example), are impacted by this setting. diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 804340d63ed..bcfed3388e9 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -79,6 +79,7 @@ import org.elasticsearch.monitor.jvm.JvmService; import org.elasticsearch.monitor.os.OsService; import org.elasticsearch.monitor.process.ProcessService; import org.elasticsearch.node.Node; +import org.elasticsearch.persistent.decider.EnableAssignmentDecider; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.repositories.fs.FsRepository; import org.elasticsearch.rest.BaseRestHandler; @@ -420,6 +421,7 @@ public final class ClusterSettings extends AbstractScopedSettings { FastVectorHighlighter.SETTING_TV_HIGHLIGHT_MULTI_VALUE, Node.BREAKER_TYPE_KEY, OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING, - IndexGraveyard.SETTING_MAX_TOMBSTONES + IndexGraveyard.SETTING_MAX_TOMBSTONES, + EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING ))); } diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java index 9e064c3d209..cf44556ee5d 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java @@ -34,6 +34,8 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; +import org.elasticsearch.persistent.decider.AssignmentDecision; +import org.elasticsearch.persistent.decider.EnableAssignmentDecider; import org.elasticsearch.tasks.Task; import java.util.Objects; @@ -45,12 +47,14 @@ public class PersistentTasksClusterService extends AbstractComponent implements private final ClusterService clusterService; private final PersistentTasksExecutorRegistry registry; + private final EnableAssignmentDecider decider; public PersistentTasksClusterService(Settings settings, PersistentTasksExecutorRegistry registry, ClusterService clusterService) { super(settings); this.clusterService = clusterService; clusterService.addListener(this); this.registry = registry; + this.decider = new EnableAssignmentDecider(settings, clusterService.getClusterSettings()); } /** @@ -224,6 +228,12 @@ public class PersistentTasksClusterService extends AbstractComponent implements final @Nullable Params taskParams, final ClusterState currentState) { PersistentTasksExecutor persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName); + + AssignmentDecision decision = decider.canAssign(); + if (decision.getType() == AssignmentDecision.Type.NO) { + return new Assignment(null, "persistent task [" + taskName + "] cannot be assigned [" + decision.getReason() + "]"); + } + return persistentTasksExecutor.getAssignment(taskParams, currentState); } @@ -249,7 +259,8 @@ public class PersistentTasksClusterService extends AbstractComponent implements /** * Returns true if the cluster state change(s) require to reassign some persistent tasks. It can happen in the following - * situations: a node left or is added, the routing table changed, the master node changed or the persistent tasks changed. + * situations: a node left or is added, the routing table changed, the master node changed, the metadata changed or the + * persistent tasks changed. */ boolean shouldReassignPersistentTasks(final ClusterChangedEvent event) { final PersistentTasksCustomMetaData tasks = event.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); @@ -259,7 +270,12 @@ public class PersistentTasksClusterService extends AbstractComponent implements boolean masterChanged = event.previousState().nodes().isLocalNodeElectedMaster() == false; - if (persistentTasksChanged(event) || event.nodesChanged() || event.routingTableChanged() || masterChanged) { + if (persistentTasksChanged(event) + || event.nodesChanged() + || event.routingTableChanged() + || event.metaDataChanged() + || masterChanged) { + for (PersistentTask task : tasks.tasks()) { if (needsReassignment(task.getAssignment(), event.state().nodes())) { Assignment assignment = createAssignment(task.getTaskName(), task.getParams(), event.state()); diff --git a/server/src/main/java/org/elasticsearch/persistent/decider/AssignmentDecision.java b/server/src/main/java/org/elasticsearch/persistent/decider/AssignmentDecision.java new file mode 100644 index 00000000000..eb8f851a68d --- /dev/null +++ b/server/src/main/java/org/elasticsearch/persistent/decider/AssignmentDecision.java @@ -0,0 +1,72 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.persistent.decider; + +import java.util.Locale; +import java.util.Objects; + +/** + * {@link AssignmentDecision} represents the decision made during the process of + * assigning a persistent task to a node of the cluster. + * + * @see EnableAssignmentDecider + */ +public final class AssignmentDecision { + + public static final AssignmentDecision YES = new AssignmentDecision(Type.YES, ""); + + private final Type type; + private final String reason; + + public AssignmentDecision(final Type type, final String reason) { + this.type = Objects.requireNonNull(type); + this.reason = Objects.requireNonNull(reason); + } + + public Type getType() { + return type; + } + + public String getReason() { + return reason; + } + + @Override + public String toString() { + return "assignment decision [type=" + type + ", reason=" + reason + "]"; + } + + public enum Type { + NO(0), YES(1); + + private final int id; + + Type(int id) { + this.id = id; + } + + public int getId() { + return id; + } + + public static Type resolve(final String s) { + return Type.valueOf(s.toUpperCase(Locale.ROOT)); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/persistent/decider/EnableAssignmentDecider.java b/server/src/main/java/org/elasticsearch/persistent/decider/EnableAssignmentDecider.java new file mode 100644 index 00000000000..525e1379a40 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/persistent/decider/EnableAssignmentDecider.java @@ -0,0 +1,101 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.persistent.decider; + +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; + +import java.util.Locale; + +import static org.elasticsearch.common.settings.Setting.Property.Dynamic; +import static org.elasticsearch.common.settings.Setting.Property.NodeScope; + +/** + * {@link EnableAssignmentDecider} is used to allow/disallow the persistent tasks + * to be assigned to cluster nodes. + *

+ * Allocation settings can have the following values (non-casesensitive): + *

+ * + * @see Allocation + */ +public class EnableAssignmentDecider { + + public static final Setting CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING = + new Setting<>("cluster.persistent_tasks.allocation.enable", Allocation.ALL.toString(), Allocation::fromString, Dynamic, NodeScope); + + private volatile Allocation enableAssignment; + + public EnableAssignmentDecider(final Settings settings, final ClusterSettings clusterSettings) { + this.enableAssignment = CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING.get(settings); + clusterSettings.addSettingsUpdateConsumer(CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING, this::setEnableAssignment); + } + + public void setEnableAssignment(final Allocation enableAssignment) { + this.enableAssignment = enableAssignment; + } + + /** + * Returns a {@link AssignmentDecision} whether the given persistent task can be assigned + * to a node of the cluster. The decision depends on the current value of the setting + * {@link EnableAssignmentDecider#CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING}. + * + * @return the {@link AssignmentDecision} + */ + public AssignmentDecision canAssign() { + if (enableAssignment == Allocation.NONE) { + return new AssignmentDecision(AssignmentDecision.Type.NO, "no persistent task assignments are allowed due to cluster settings"); + } + return AssignmentDecision.YES; + } + + /** + * Allocation values or rather their string representation to be used used with + * {@link EnableAssignmentDecider#CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING} + * via cluster settings. + */ + public enum Allocation { + + NONE, + ALL; + + public static Allocation fromString(final String strValue) { + if (strValue == null) { + return null; + } else { + String value = strValue.toUpperCase(Locale.ROOT); + try { + return valueOf(value); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Illegal value [" + value + "] for [" + + CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING.getKey() + "]"); + } + } + } + + @Override + public String toString() { + return name().toLowerCase(Locale.ROOT); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/persistent/package-info.java b/server/src/main/java/org/elasticsearch/persistent/package-info.java index f948e3ace44..3e71716e606 100644 --- a/server/src/main/java/org/elasticsearch/persistent/package-info.java +++ b/server/src/main/java/org/elasticsearch/persistent/package-info.java @@ -30,7 +30,7 @@ * task. *

* 2. The master node updates the {@link org.elasticsearch.persistent.PersistentTasksCustomMetaData} in the cluster state to indicate - * that there is a new persistent task is running in the system. + * that there is a new persistent task running in the system. *

* 3. The {@link org.elasticsearch.persistent.PersistentTasksNodeService} running on every node in the cluster monitors changes in * the cluster state and starts execution of all new tasks assigned to the node it is running on. diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java index e470c5028aa..916fdee2136 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java @@ -36,9 +36,16 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; +import org.elasticsearch.persistent.decider.EnableAssignmentDecider; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; import java.util.ArrayList; import java.util.Arrays; @@ -52,14 +59,41 @@ import static java.util.Collections.singleton; import static org.elasticsearch.persistent.PersistentTasksClusterService.needsReassignment; import static org.elasticsearch.persistent.PersistentTasksClusterService.persistentTasksChanged; import static org.elasticsearch.persistent.PersistentTasksExecutor.NO_NODE_FOUND; +import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; -import static org.mockito.Mockito.mock; public class PersistentTasksClusterServiceTests extends ESTestCase { + /** Needed by {@link ClusterService} **/ + private static ThreadPool threadPool; + /** Needed by {@link PersistentTasksClusterService} **/ + private ClusterService clusterService; + + @BeforeClass + public static void setUpThreadPool() { + threadPool = new TestThreadPool(PersistentTasksClusterServiceTests.class.getSimpleName()); + } + + @Before + public void setUp() throws Exception { + super.setUp(); + clusterService = createClusterService(threadPool); + } + + @AfterClass + public static void tearDownThreadPool() throws Exception { + terminate(threadPool); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + clusterService.close(); + } + public void testReassignmentRequired() { final PersistentTasksClusterService service = createService((params, clusterState) -> "never_assign".equals(((TestParams) params).getTestParam()) ? NO_NODE_FOUND : randomNodeAssignment(clusterState.nodes()) @@ -81,6 +115,55 @@ public class PersistentTasksClusterServiceTests extends ESTestCase { } } + public void testReassignmentRequiredOnMetadataChanges() { + EnableAssignmentDecider.Allocation allocation = randomFrom(EnableAssignmentDecider.Allocation.values()); + + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("_node", buildNewFakeTransportAddress(), Version.CURRENT)) + .localNodeId("_node") + .masterNodeId("_node") + .build(); + + boolean unassigned = randomBoolean(); + PersistentTasksCustomMetaData tasks = PersistentTasksCustomMetaData.builder() + .addTask("_task_1", TestPersistentTasksExecutor.NAME, null, new Assignment(unassigned ? null : "_node", "_reason")) + .build(); + + MetaData metaData = MetaData.builder() + .putCustom(PersistentTasksCustomMetaData.TYPE, tasks) + .persistentSettings(Settings.builder() + .put(EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING.getKey(), allocation.toString()) + .build()) + .build(); + + ClusterState previous = ClusterState.builder(new ClusterName("_name")) + .nodes(nodes) + .metaData(metaData) + .build(); + + ClusterState current; + + final boolean changed = randomBoolean(); + if (changed) { + allocation = randomValueOtherThan(allocation, () -> randomFrom(EnableAssignmentDecider.Allocation.values())); + + current = ClusterState.builder(previous) + .metaData(MetaData.builder(previous.metaData()) + .persistentSettings(Settings.builder() + .put(EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING.getKey(), allocation.toString()) + .build()) + .build()) + .build(); + } else { + current = ClusterState.builder(previous).build(); + } + + final ClusterChangedEvent event = new ClusterChangedEvent("test", current, previous); + + final PersistentTasksClusterService service = createService((params, clusterState) -> randomNodeAssignment(clusterState.nodes())); + assertThat(dumpEvent(event), service.shouldReassignPersistentTasks(event), equalTo(changed && unassigned)); + } + public void testReassignTasksWithNoTasks() { ClusterState clusterState = initialState(); assertThat(reassign(clusterState).metaData().custom(PersistentTasksCustomMetaData.TYPE), nullValue()); @@ -527,7 +610,6 @@ public class PersistentTasksClusterServiceTests extends ESTestCase { Version.CURRENT); } - private ClusterState initialState() { MetaData.Builder metaData = MetaData.builder(); RoutingTable.Builder routingTable = RoutingTable.builder(); @@ -558,7 +640,7 @@ public class PersistentTasksClusterServiceTests extends ESTestCase { } /** Creates a PersistentTasksClusterService with a single PersistentTasksExecutor implemented by a BiFunction **/ - static

PersistentTasksClusterService createService(final BiFunction fn) { + private

PersistentTasksClusterService createService(final BiFunction fn) { PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(Settings.EMPTY, singleton(new PersistentTasksExecutor

(Settings.EMPTY, TestPersistentTasksExecutor.NAME, null) { @Override @@ -571,6 +653,6 @@ public class PersistentTasksClusterServiceTests extends ESTestCase { throw new UnsupportedOperationException(); } })); - return new PersistentTasksClusterService(Settings.EMPTY, registry, mock(ClusterService.class)); + return new PersistentTasksClusterService(Settings.EMPTY, registry, clusterService); } } diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksDecidersTestCase.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksDecidersTestCase.java new file mode 100644 index 00000000000..356e518198c --- /dev/null +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksDecidersTestCase.java @@ -0,0 +1,134 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.persistent; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; + +import java.util.function.Predicate; + +import static java.util.Collections.emptyList; +import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; + +public abstract class PersistentTasksDecidersTestCase extends ESTestCase { + + /** Needed by {@link ClusterService} **/ + private static ThreadPool threadPool; + /** Needed by {@link PersistentTasksClusterService} **/ + private ClusterService clusterService; + + private PersistentTasksClusterService persistentTasksClusterService; + + @BeforeClass + public static void setUpThreadPool() { + threadPool = new TestThreadPool(getTestClass().getSimpleName()); + } + + @Before + public void setUp() throws Exception { + super.setUp(); + clusterService = createClusterService(threadPool); + PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(clusterService.getSettings(), emptyList()) { + @Override + public PersistentTasksExecutor getPersistentTaskExecutorSafe(String taskName) { + return new PersistentTasksExecutor(clusterService.getSettings(), taskName, null) { + @Override + protected void nodeOperation(AllocatedPersistentTask task, Params params, Task.Status status) { + logger.debug("Executing task {}", task); + } + }; + } + }; + persistentTasksClusterService = new PersistentTasksClusterService(clusterService.getSettings(), registry, clusterService); + } + + @AfterClass + public static void tearDownThreadPool() throws Exception { + terminate(threadPool); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + clusterService.close(); + } + + protected ClusterState reassign(final ClusterState clusterState) { + return persistentTasksClusterService.reassignTasks(clusterState); + } + + protected void updateSettings(final Settings settings) { + ClusterSettings clusterSettings = clusterService.getClusterSettings(); + Settings.Builder updated = Settings.builder(); + clusterSettings.updateDynamicSettings(settings, updated, Settings.builder(), getTestClass().getName()); + clusterSettings.applySettings(updated.build()); + } + + protected static ClusterState createClusterStateWithTasks(final int nbNodes, final int nbTasks) { + DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(); + for (int i = 0; i < nbNodes; i++) { + nodes.add(new DiscoveryNode("_node_" + i, buildNewFakeTransportAddress(), Version.CURRENT)); + } + + PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder(); + for (int i = 0; i < nbTasks; i++) { + tasks.addTask("_task_" + i, "test", null, new PersistentTasksCustomMetaData.Assignment(null, "initialized")); + } + + MetaData metaData = MetaData.builder() + .putCustom(PersistentTasksCustomMetaData.TYPE, tasks.build()) + .build(); + + return ClusterState.builder(ClusterName.DEFAULT).nodes(nodes).metaData(metaData).build(); + } + + /** Asserts that the given cluster state contains nbTasks tasks that are assigned **/ + protected static void assertNbAssignedTasks(final long nbTasks, final ClusterState clusterState) { + assertPersistentTasks(nbTasks, clusterState, PersistentTasksCustomMetaData.PersistentTask::isAssigned); + } + + /** Asserts that the given cluster state contains nbTasks tasks that are NOT assigned **/ + protected static void assertNbUnassignedTasks(final long nbTasks, final ClusterState clusterState) { + assertPersistentTasks(nbTasks, clusterState, task -> task.isAssigned() == false); + } + + /** Asserts that the cluster state contains nbTasks tasks that verify the given predicate **/ + protected static void assertPersistentTasks(final long nbTasks, + final ClusterState clusterState, + final Predicate predicate) { + PersistentTasksCustomMetaData tasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE); + assertNotNull("Persistent tasks must be not null", tasks); + assertEquals(nbTasks, tasks.tasks().stream().filter(predicate).count()); + } +} diff --git a/server/src/test/java/org/elasticsearch/persistent/decider/AssignmentDecisionTests.java b/server/src/test/java/org/elasticsearch/persistent/decider/AssignmentDecisionTests.java new file mode 100644 index 00000000000..3fa580e726a --- /dev/null +++ b/server/src/test/java/org/elasticsearch/persistent/decider/AssignmentDecisionTests.java @@ -0,0 +1,33 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.persistent.decider; + +import org.elasticsearch.test.ESTestCase; + +public class AssignmentDecisionTests extends ESTestCase { + + public void testConstantsTypes() { + assertEquals(AssignmentDecision.Type.YES, AssignmentDecision.YES.getType()); + } + + public void testResolveFromType() { + final AssignmentDecision.Type expected = randomFrom(AssignmentDecision.Type.values()); + assertEquals(expected, AssignmentDecision.Type.resolve(expected.toString())); + } +} diff --git a/server/src/test/java/org/elasticsearch/persistent/decider/EnableAssignmentDeciderIT.java b/server/src/test/java/org/elasticsearch/persistent/decider/EnableAssignmentDeciderIT.java new file mode 100644 index 00000000000..15d12fb1ce9 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/persistent/decider/EnableAssignmentDeciderIT.java @@ -0,0 +1,173 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.persistent.decider; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.persistent.PersistentTaskParams; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; +import org.elasticsearch.persistent.PersistentTasksService; +import org.elasticsearch.persistent.TestPersistentTasksPlugin; +import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams; +import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESIntegTestCase; + +import java.util.Collection; +import java.util.concurrent.CountDownLatch; + +import static java.util.Collections.singletonList; +import static org.elasticsearch.persistent.decider.EnableAssignmentDecider.Allocation; +import static org.elasticsearch.persistent.decider.EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; + +@ESIntegTestCase.ClusterScope(minNumDataNodes = 1) +public class EnableAssignmentDeciderIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return singletonList(TestPersistentTasksPlugin.class); + } + + @Override + protected Collection> transportClientPlugins() { + return nodePlugins(); + } + + @Override + protected boolean ignoreExternalCluster() { + return true; + } + + /** + * Test that the {@link EnableAssignmentDecider#CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING} setting correctly + * prevents persistent tasks to be assigned after a cluster restart. + */ + public void testEnableAssignmentAfterRestart() throws Exception { + final int numberOfTasks = randomIntBetween(1, 10); + logger.trace("creating {} persistent tasks", numberOfTasks); + + final CountDownLatch latch = new CountDownLatch(numberOfTasks); + for (int i = 0; i < numberOfTasks; i++) { + PersistentTasksService service = internalCluster().getInstance(PersistentTasksService.class); + service.startPersistentTask("task_" + i, TestPersistentTasksExecutor.NAME, randomTaskParams(), + new ActionListener>() { + @Override + public void onResponse(PersistentTask task) { + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + latch.countDown(); + } + }); + } + latch.await(); + + ClusterService clusterService = internalCluster().clusterService(internalCluster().getMasterName()); + PersistentTasksCustomMetaData tasks = clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + assertEquals(numberOfTasks, tasks.tasks().stream().filter(t -> TestPersistentTasksExecutor.NAME.equals(t.getTaskName())).count()); + + logger.trace("waiting for the tasks to be running"); + assertBusy(() -> { + ListTasksResponse listTasks = client().admin().cluster().prepareListTasks() + .setActions(TestPersistentTasksExecutor.NAME + "[c]") + .get(); + assertThat(listTasks.getTasks().size(), equalTo(numberOfTasks)); + }); + + try { + logger.trace("disable persistent tasks assignment"); + disablePersistentTasksAssignment(); + + logger.trace("restart the cluster"); + internalCluster().fullRestart(); + ensureYellow(); + + logger.trace("persistent tasks assignment is still disabled"); + assertEnableAssignmentSetting(Allocation.NONE); + + logger.trace("persistent tasks are not assigned"); + tasks = internalCluster().clusterService().state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + assertEquals(numberOfTasks, tasks.tasks().stream() + .filter(t -> TestPersistentTasksExecutor.NAME.equals(t.getTaskName())) + .filter(t -> t.isAssigned() == false) + .count()); + + ListTasksResponse runningTasks = client().admin().cluster().prepareListTasks() + .setActions(TestPersistentTasksExecutor.NAME + "[c]") + .get(); + assertThat(runningTasks.getTasks().size(), equalTo(0)); + + logger.trace("enable persistent tasks assignment"); + if (randomBoolean()) { + enablePersistentTasksAssignment(); + } else { + resetPersistentTasksAssignment(); + } + + assertBusy(() -> { + ListTasksResponse listTasks = client().admin().cluster().prepareListTasks() + .setActions(TestPersistentTasksExecutor.NAME + "[c]") + .get(); + assertThat(listTasks.getTasks().size(), equalTo(numberOfTasks)); + }); + + } finally { + resetPersistentTasksAssignment(); + } + } + + private void assertEnableAssignmentSetting(final Allocation expected) { + ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().clear().setMetaData(true).get(); + Settings settings = clusterStateResponse.getState().getMetaData().settings(); + + String value = settings.get(CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING.getKey()); + assertThat(Allocation.fromString(value), equalTo(expected)); + } + + private void disablePersistentTasksAssignment() { + Settings.Builder settings = Settings.builder().put(CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING.getKey(), Allocation.NONE); + assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(settings)); + } + + private void enablePersistentTasksAssignment() { + Settings.Builder settings = Settings.builder().put(CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING.getKey(), Allocation.ALL); + assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(settings)); + } + + private void resetPersistentTasksAssignment() { + Settings.Builder settings = Settings.builder().putNull(CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING.getKey()); + assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(settings)); + } + + /** Returns a random task parameter **/ + private static PersistentTaskParams randomTaskParams() { + if (randomBoolean()) { + return null; + } + return new TestParams(randomAlphaOfLength(10)); + } +} diff --git a/server/src/test/java/org/elasticsearch/persistent/decider/EnableAssignmentDeciderTests.java b/server/src/test/java/org/elasticsearch/persistent/decider/EnableAssignmentDeciderTests.java new file mode 100644 index 00000000000..7aedde1ab9b --- /dev/null +++ b/server/src/test/java/org/elasticsearch/persistent/decider/EnableAssignmentDeciderTests.java @@ -0,0 +1,52 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.persistent.decider; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.persistent.PersistentTasksDecidersTestCase; + +public class EnableAssignmentDeciderTests extends PersistentTasksDecidersTestCase { + + public void testAllocationValues() { + final String all = randomFrom("all", "All", "ALL"); + assertEquals(EnableAssignmentDecider.Allocation.ALL, EnableAssignmentDecider.Allocation.fromString(all)); + + final String none = randomFrom("none", "None", "NONE"); + assertEquals(EnableAssignmentDecider.Allocation.NONE, EnableAssignmentDecider.Allocation.fromString(none)); + } + + public void testEnableAssignment() { + final int nbTasks = randomIntBetween(1, 10); + final int nbNodes = randomIntBetween(1, 5); + final EnableAssignmentDecider.Allocation allocation = randomFrom(EnableAssignmentDecider.Allocation.values()); + + Settings settings = Settings.builder() + .put(EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING.getKey(), allocation.toString()) + .build(); + updateSettings(settings); + + ClusterState clusterState = reassign(createClusterStateWithTasks(nbNodes, nbTasks)); + if (allocation == EnableAssignmentDecider.Allocation.ALL) { + assertNbAssignedTasks(nbTasks, clusterState); + } else { + assertNbUnassignedTasks(nbTasks, clusterState); + } + } +}