diff --git a/docs/reference/modules/cluster/misc.asciidoc b/docs/reference/modules/cluster/misc.asciidoc index 9e28da7391e..08568088baa 100644 --- a/docs/reference/modules/cluster/misc.asciidoc +++ b/docs/reference/modules/cluster/misc.asciidoc @@ -135,10 +135,10 @@ Plugins can create a kind of tasks called persistent tasks. Those tasks are usually long-live tasks and are stored in the cluster state, allowing the tasks to be revived after a full cluster restart. -Every time a persistent task is created, the master nodes takes care of +Every time a persistent task is created, the master node takes care of assigning the task to a node of the cluster, and the assigned node will then pick up the task and execute it locally. The process of assigning persistent -tasks to nodes is controlled by the following property, which can be updated +tasks to nodes is controlled by the following properties, which can be updated dynamically: `cluster.persistent_tasks.allocation.enable`:: @@ -153,3 +153,13 @@ This setting does not affect the persistent tasks that are already being execute Only newly created persistent tasks, or tasks that must be reassigned (after a node left the cluster, for example), are impacted by this setting. -- + +`cluster.persistent_tasks.allocation.recheck_interval`:: + + The master node will automatically check whether persistent tasks need to + be assigned when the cluster state changes significantly. However, there + may be other factors, such as memory usage, that affect whether persistent + tasks can be assigned to nodes but do not cause the cluster state to change. + This setting controls how often assignment checks are performed to react to + these factors. The default is 30 seconds. The minimum permitted value is 10 + seconds. diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index f150650948d..53fc07f53bb 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -92,6 +92,7 @@ import org.elasticsearch.monitor.jvm.JvmService; import org.elasticsearch.monitor.os.OsService; import org.elasticsearch.monitor.process.ProcessService; import org.elasticsearch.node.Node; +import org.elasticsearch.persistent.PersistentTasksClusterService; import org.elasticsearch.persistent.decider.EnableAssignmentDecider; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.repositories.fs.FsRepository; @@ -456,6 +457,7 @@ public final class ClusterSettings extends AbstractScopedSettings { Node.BREAKER_TYPE_KEY, OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING, IndexGraveyard.SETTING_MAX_TOMBSTONES, + PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING, EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING, PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING, PeerFinder.DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING, diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java new file mode 100644 index 00000000000..e06e1a41907 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java @@ -0,0 +1,184 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.common.util.concurrent; + +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.Closeable; +import java.util.Objects; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * A base class for tasks that need to repeat. + */ +public abstract class AbstractAsyncTask implements Runnable, Closeable { + + private final Logger logger; + private final ThreadPool threadPool; + private final AtomicBoolean closed = new AtomicBoolean(false); + private final boolean autoReschedule; + private volatile ScheduledFuture scheduledFuture; + private volatile boolean isScheduledOrRunning; + private volatile Exception lastThrownException; + private volatile TimeValue interval; + + protected AbstractAsyncTask(Logger logger, ThreadPool threadPool, TimeValue interval, boolean autoReschedule) { + this.logger = logger; + this.threadPool = threadPool; + this.interval = interval; + this.autoReschedule = autoReschedule; + } + + /** + * Change the interval between runs. + * If a future run is scheduled then this will reschedule it. + * @param interval The new interval between runs. + */ + public synchronized void setInterval(TimeValue interval) { + this.interval = interval; + if (scheduledFuture != null) { + rescheduleIfNecessary(); + } + } + + public TimeValue getInterval() { + return interval; + } + + /** + * Test any external conditions that determine whether the task + * should be scheduled. This method does *not* need to test if + * the task is closed, as being closed automatically prevents + * scheduling. + * @return Should the task be scheduled to run? + */ + protected abstract boolean mustReschedule(); + + /** + * Schedule the task to run after the configured interval if it + * is not closed and any further conditions imposed by derived + * classes are met. Any previously scheduled invocation is + * cancelled. + */ + public synchronized void rescheduleIfNecessary() { + if (isClosed()) { + return; + } + if (scheduledFuture != null) { + FutureUtils.cancel(scheduledFuture); + } + if (interval.millis() > 0 && mustReschedule()) { + if (logger.isTraceEnabled()) { + logger.trace("scheduling {} every {}", toString(), interval); + } + scheduledFuture = threadPool.schedule(interval, getThreadPool(), this); + isScheduledOrRunning = true; + } else { + logger.trace("scheduled {} disabled", toString()); + scheduledFuture = null; + isScheduledOrRunning = false; + } + } + + public boolean isScheduled() { + // Currently running counts as scheduled to avoid an oscillating return value + // from this method when a task is repeatedly running and rescheduling itself. + return isScheduledOrRunning; + } + + /** + * Cancel any scheduled run, but do not prevent subsequent restarts. + */ + public synchronized void cancel() { + FutureUtils.cancel(scheduledFuture); + scheduledFuture = null; + isScheduledOrRunning = false; + } + + /** + * Cancel any scheduled run + */ + @Override + public synchronized void close() { + if (closed.compareAndSet(false, true)) { + cancel(); + } + } + + public boolean isClosed() { + return this.closed.get(); + } + + @Override + public final void run() { + synchronized (this) { + scheduledFuture = null; + isScheduledOrRunning = autoReschedule; + } + try { + runInternal(); + } catch (Exception ex) { + if (lastThrownException == null || sameException(lastThrownException, ex) == false) { + // prevent the annoying fact of logging the same stuff all the time with an interval of 1 sec will spam all your logs + logger.warn( + () -> new ParameterizedMessage( + "failed to run task {} - suppressing re-occurring exceptions unless the exception changes", + toString()), + ex); + lastThrownException = ex; + } + } finally { + if (autoReschedule) { + rescheduleIfNecessary(); + } + } + } + + private static boolean sameException(Exception left, Exception right) { + if (left.getClass() == right.getClass()) { + if (Objects.equals(left.getMessage(), right.getMessage())) { + StackTraceElement[] stackTraceLeft = left.getStackTrace(); + StackTraceElement[] stackTraceRight = right.getStackTrace(); + if (stackTraceLeft.length == stackTraceRight.length) { + for (int i = 0; i < stackTraceLeft.length; i++) { + if (stackTraceLeft[i].equals(stackTraceRight[i]) == false) { + return false; + } + } + return true; + } + } + } + return false; + } + + protected abstract void runInternal(); + + /** + * Use the same threadpool by default. + * Derived classes can change this if required. + */ + protected String getThreadPool() { + return ThreadPool.Names.SAME; + } +} diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java index c34a5228b7f..54bf5fa1aa1 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexService.java @@ -38,8 +38,8 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.concurrent.AbstractAsyncTask; import org.elasticsearch.common.util.concurrent.AbstractRunnable; -import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.NodeEnvironment; @@ -87,7 +87,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; @@ -805,100 +804,18 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust } } - abstract static class BaseAsyncTask implements Runnable, Closeable { + abstract static class BaseAsyncTask extends AbstractAsyncTask { protected final IndexService indexService; - protected final ThreadPool threadPool; - private final TimeValue interval; - private ScheduledFuture scheduledFuture; - private final AtomicBoolean closed = new AtomicBoolean(false); - private volatile Exception lastThrownException; BaseAsyncTask(IndexService indexService, TimeValue interval) { + super(indexService.logger, indexService.threadPool, interval, true); this.indexService = indexService; - this.threadPool = indexService.getThreadPool(); - this.interval = interval; - onTaskCompletion(); + rescheduleIfNecessary(); } - boolean mustReschedule() { + protected boolean mustReschedule() { // don't re-schedule if its closed or if we don't have a single shard here..., we are done - return indexService.closed.get() == false - && closed.get() == false && interval.millis() > 0; - } - - private synchronized void onTaskCompletion() { - if (mustReschedule()) { - if (indexService.logger.isTraceEnabled()) { - indexService.logger.trace("scheduling {} every {}", toString(), interval); - } - this.scheduledFuture = threadPool.schedule(interval, getThreadPool(), BaseAsyncTask.this); - } else { - indexService.logger.trace("scheduled {} disabled", toString()); - this.scheduledFuture = null; - } - } - - boolean isScheduled() { - return scheduledFuture != null; - } - - @Override - public final void run() { - try { - runInternal(); - } catch (Exception ex) { - if (lastThrownException == null || sameException(lastThrownException, ex) == false) { - // prevent the annoying fact of logging the same stuff all the time with an interval of 1 sec will spam all your logs - indexService.logger.warn( - () -> new ParameterizedMessage( - "failed to run task {} - suppressing re-occurring exceptions unless the exception changes", - toString()), - ex); - lastThrownException = ex; - } - } finally { - onTaskCompletion(); - } - } - - private static boolean sameException(Exception left, Exception right) { - if (left.getClass() == right.getClass()) { - if (Objects.equals(left.getMessage(), right.getMessage())) { - StackTraceElement[] stackTraceLeft = left.getStackTrace(); - StackTraceElement[] stackTraceRight = right.getStackTrace(); - if (stackTraceLeft.length == stackTraceRight.length) { - for (int i = 0; i < stackTraceLeft.length; i++) { - if (stackTraceLeft[i].equals(stackTraceRight[i]) == false) { - return false; - } - } - return true; - } - } - } - return false; - } - - protected abstract void runInternal(); - - protected String getThreadPool() { - return ThreadPool.Names.SAME; - } - - @Override - public synchronized void close() { - if (closed.compareAndSet(false, true)) { - FutureUtils.cancel(scheduledFuture); - scheduledFuture = null; - } - } - - TimeValue getInterval() { - return interval; - } - - boolean isClosed() { - return this.closed.get(); + return indexService.closed.get() == false; } } diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 6d91a348d3a..068b713cb8c 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -501,7 +501,8 @@ public class Node implements Closeable { final PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(tasksExecutors); final PersistentTasksClusterService persistentTasksClusterService = - new PersistentTasksClusterService(settings, registry, clusterService); + new PersistentTasksClusterService(settings, registry, clusterService, threadPool); + resourcesToClose.add(persistentTasksClusterService); final PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, threadPool, client); modules.add(b -> { diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java index 2adeb04e4ee..27aac8661ba 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java @@ -31,30 +31,55 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.AbstractAsyncTask; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.persistent.decider.AssignmentDecision; import org.elasticsearch.persistent.decider.EnableAssignmentDecider; +import org.elasticsearch.threadpool.ThreadPool; +import java.io.Closeable; import java.util.Objects; /** * Component that runs only on the master node and is responsible for assigning running tasks to nodes */ -public class PersistentTasksClusterService implements ClusterStateListener { +public class PersistentTasksClusterService implements ClusterStateListener, Closeable { + + public static final Setting CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING = + Setting.timeSetting("cluster.persistent_tasks.allocation.recheck_interval", TimeValue.timeValueSeconds(30), + TimeValue.timeValueSeconds(10), Setting.Property.Dynamic, Setting.Property.NodeScope); private static final Logger logger = LogManager.getLogger(PersistentTasksClusterService.class); private final ClusterService clusterService; private final PersistentTasksExecutorRegistry registry; private final EnableAssignmentDecider decider; + private final ThreadPool threadPool; + private final PeriodicRechecker periodicRechecker; - public PersistentTasksClusterService(Settings settings, PersistentTasksExecutorRegistry registry, ClusterService clusterService) { + public PersistentTasksClusterService(Settings settings, PersistentTasksExecutorRegistry registry, ClusterService clusterService, + ThreadPool threadPool) { this.clusterService = clusterService; - clusterService.addListener(this); this.registry = registry; this.decider = new EnableAssignmentDecider(settings, clusterService.getClusterSettings()); + this.threadPool = threadPool; + this.periodicRechecker = new PeriodicRechecker(CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING.get(settings)); + clusterService.addListener(this); + clusterService.getClusterSettings().addSettingsUpdateConsumer(CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING, + this::setRecheckInterval); + } + + void setRecheckInterval(TimeValue recheckInterval) { + periodicRechecker.setInterval(recheckInterval); + } + + @Override + public void close() { + periodicRechecker.close(); } /** @@ -91,7 +116,11 @@ public class PersistentTasksClusterService implements ClusterStateListener { public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { PersistentTasksCustomMetaData tasks = newState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); if (tasks != null) { - listener.onResponse(tasks.getTask(taskId)); + PersistentTask task = tasks.getTask(taskId); + listener.onResponse(task); + if (task != null && task.isAssigned() == false && periodicRechecker.isScheduled() == false) { + periodicRechecker.rescheduleIfNecessary(); + } } else { listener.onResponse(null); } @@ -155,7 +184,7 @@ public class PersistentTasksClusterService implements ClusterStateListener { public void removePersistentTask(String id, ActionListener> listener) { clusterService.submitStateUpdateTask("remove persistent task", new ClusterStateUpdateTask() { @Override - public ClusterState execute(ClusterState currentState) throws Exception { + public ClusterState execute(ClusterState currentState) { PersistentTasksCustomMetaData.Builder tasksInProgress = builder(currentState); if (tasksInProgress.hasTask(id)) { return update(currentState, tasksInProgress.removeTask(id)); @@ -243,22 +272,41 @@ public class PersistentTasksClusterService implements ClusterStateListener { public void clusterChanged(ClusterChangedEvent event) { if (event.localNodeMaster()) { if (shouldReassignPersistentTasks(event)) { + // We want to avoid a periodic check duplicating this work + periodicRechecker.cancel(); logger.trace("checking task reassignment for cluster state {}", event.state().getVersion()); - clusterService.submitStateUpdateTask("reassign persistent tasks", new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) { - return reassignTasks(currentState); - } - - @Override - public void onFailure(String source, Exception e) { - logger.warn("failed to reassign persistent tasks", e); - } - }); + reassignPersistentTasks(); } } } + /** + * Submit a cluster state update to reassign any persistent tasks that need reassigning + */ + private void reassignPersistentTasks() { + clusterService.submitStateUpdateTask("reassign persistent tasks", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + return reassignTasks(currentState); + } + + @Override + public void onFailure(String source, Exception e) { + logger.warn("failed to reassign persistent tasks", e); + // There must be a task that's worth rechecking because there was one + // that caused this method to be called and the method failed to assign it + periodicRechecker.rescheduleIfNecessary(); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + if (isAnyTaskUnassigned(newState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE))) { + periodicRechecker.rescheduleIfNecessary(); + } + } + }); + } + /** * Returns true if the cluster state change(s) require to reassign some persistent tasks. It can happen in the following * situations: a node left or is added, the routing table changed, the master node changed, the metadata changed or the @@ -290,6 +338,13 @@ public class PersistentTasksClusterService implements ClusterStateListener { return false; } + /** + * Returns true if any persistent task is unassigned. + */ + private boolean isAnyTaskUnassigned(final PersistentTasksCustomMetaData tasks) { + return tasks != null && tasks.tasks().stream().anyMatch(task -> task.getAssignment().isAssigned() == false); + } + /** * Evaluates the cluster state and tries to assign tasks to nodes. * @@ -347,4 +402,35 @@ public class PersistentTasksClusterService implements ClusterStateListener { return currentState; } } + + /** + * Class to periodically try to reassign unassigned persistent tasks. + */ + private class PeriodicRechecker extends AbstractAsyncTask { + + PeriodicRechecker(TimeValue recheckInterval) { + super(logger, threadPool, recheckInterval, false); + } + + @Override + protected boolean mustReschedule() { + return true; + } + + @Override + public void runInternal() { + if (clusterService.localNode().isMasterNode()) { + final ClusterState state = clusterService.state(); + logger.trace("periodic persistent task assignment check running for cluster state {}", state.getVersion()); + if (isAnyTaskUnassigned(state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE))) { + reassignPersistentTasks(); + } + } + } + + @Override + public String toString() { + return "persistent_task_recheck"; + } + } } diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTaskTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTaskTests.java new file mode 100644 index 00000000000..3a1cab90f0d --- /dev/null +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTaskTests.java @@ -0,0 +1,206 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.common.util.concurrent; + +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +public class AbstractAsyncTaskTests extends ESTestCase { + + private static ThreadPool threadPool; + + @BeforeClass + public static void setUpThreadPool() { + threadPool = new TestThreadPool(AbstractAsyncTaskTests.class.getSimpleName()); + } + + @AfterClass + public static void tearDownThreadPool() { + terminate(threadPool); + } + + public void testAutoRepeat() throws Exception { + + boolean shouldRunThrowException = randomBoolean(); + final CyclicBarrier barrier1 = new CyclicBarrier(2); // 1 for runInternal plus 1 for the test sequence + final CyclicBarrier barrier2 = new CyclicBarrier(2); // 1 for runInternal plus 1 for the test sequence + final AtomicInteger count = new AtomicInteger(); + AbstractAsyncTask task = new AbstractAsyncTask(logger, threadPool, TimeValue.timeValueMillis(1), true) { + + @Override + protected boolean mustReschedule() { + return true; + } + + @Override + protected void runInternal() { + assertTrue("generic threadpool is configured", Thread.currentThread().getName().contains("[generic]")); + try { + barrier1.await(); + } catch (Exception e) { + fail("interrupted"); + } + count.incrementAndGet(); + try { + barrier2.await(); + } catch (Exception e) { + fail("interrupted"); + } + if (shouldRunThrowException) { + throw new RuntimeException("foo"); + } + } + + @Override + protected String getThreadPool() { + return ThreadPool.Names.GENERIC; + } + }; + + assertFalse(task.isScheduled()); + task.rescheduleIfNecessary(); + assertTrue(task.isScheduled()); + barrier1.await(); + assertTrue(task.isScheduled()); + barrier2.await(); + assertEquals(1, count.get()); + barrier1.reset(); + barrier2.reset(); + barrier1.await(); + assertTrue(task.isScheduled()); + task.close(); + barrier2.await(); + assertEquals(2, count.get()); + assertTrue(task.isClosed()); + assertFalse(task.isScheduled()); + assertEquals(2, count.get()); + } + + public void testManualRepeat() throws Exception { + + boolean shouldRunThrowException = randomBoolean(); + final CyclicBarrier barrier = new CyclicBarrier(2); // 1 for runInternal plus 1 for the test sequence + final AtomicInteger count = new AtomicInteger(); + AbstractAsyncTask task = new AbstractAsyncTask(logger, threadPool, TimeValue.timeValueMillis(1), false) { + + @Override + protected boolean mustReschedule() { + return true; + } + + @Override + protected void runInternal() { + assertTrue("generic threadpool is configured", Thread.currentThread().getName().contains("[generic]")); + count.incrementAndGet(); + try { + barrier.await(); + } catch (Exception e) { + fail("interrupted"); + } + if (shouldRunThrowException) { + throw new RuntimeException("foo"); + } + } + + @Override + protected String getThreadPool() { + return ThreadPool.Names.GENERIC; + } + }; + + assertFalse(task.isScheduled()); + task.rescheduleIfNecessary(); + barrier.await(); + assertEquals(1, count.get()); + assertFalse(task.isScheduled()); + barrier.reset(); + expectThrows(TimeoutException.class, () -> barrier.await(10, TimeUnit.MILLISECONDS)); + assertEquals(1, count.get()); + barrier.reset(); + task.rescheduleIfNecessary(); + barrier.await(); + assertEquals(2, count.get()); + assertFalse(task.isScheduled()); + assertFalse(task.isClosed()); + task.close(); + assertTrue(task.isClosed()); + } + + public void testCloseWithNoRun() { + + AbstractAsyncTask task = new AbstractAsyncTask(logger, threadPool, TimeValue.timeValueMinutes(10), true) { + + @Override + protected boolean mustReschedule() { + return true; + } + + @Override + protected void runInternal() { + } + }; + + assertFalse(task.isScheduled()); + task.rescheduleIfNecessary(); + assertTrue(task.isScheduled()); + task.close(); + assertTrue(task.isClosed()); + assertFalse(task.isScheduled()); + } + + public void testChangeInterval() throws Exception { + + final CountDownLatch latch = new CountDownLatch(2); + + AbstractAsyncTask task = new AbstractAsyncTask(logger, threadPool, TimeValue.timeValueHours(1), true) { + + @Override + protected boolean mustReschedule() { + return latch.getCount() > 0; + } + + @Override + protected void runInternal() { + latch.countDown(); + } + }; + + assertFalse(task.isScheduled()); + task.rescheduleIfNecessary(); + assertTrue(task.isScheduled()); + task.setInterval(TimeValue.timeValueMillis(1)); + assertTrue(task.isScheduled()); + // This should only take 2 milliseconds in ideal conditions, but allow 10 seconds in case of VM stalls + assertTrue(latch.await(10, TimeUnit.SECONDS)); + assertBusy(() -> assertFalse(task.isScheduled())); + task.close(); + assertFalse(task.isScheduled()); + assertTrue(task.isClosed()); + } +} diff --git a/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java b/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java index 52513ce7a8b..a47d4db2a25 100644 --- a/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java @@ -136,7 +136,6 @@ public class IndexServiceTests extends ESSingleNodeTestCase { assertNotSame(refreshTask, indexService.getRefreshTask()); assertTrue(refreshTask.isClosed()); assertFalse(refreshTask.isScheduled()); - assertFalse(indexService.getRefreshTask().mustReschedule()); // set it to 100ms client().admin().indices().prepareUpdateSettings("test") diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java index 3eec748808e..ebf77d1e803 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -32,6 +33,7 @@ import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams; @@ -51,6 +53,8 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import static java.util.Collections.emptyMap; @@ -63,6 +67,11 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class PersistentTasksClusterServiceTests extends ESTestCase { @@ -71,6 +80,8 @@ public class PersistentTasksClusterServiceTests extends ESTestCase { /** Needed by {@link PersistentTasksClusterService} **/ private ClusterService clusterService; + private volatile boolean nonClusterStateCondition; + @BeforeClass public static void setUpThreadPool() { threadPool = new TestThreadPool(PersistentTasksClusterServiceTests.class.getSimpleName()); @@ -83,7 +94,7 @@ public class PersistentTasksClusterServiceTests extends ESTestCase { } @AfterClass - public static void tearDownThreadPool() throws Exception { + public static void tearDownThreadPool() { terminate(threadPool); } @@ -177,7 +188,7 @@ public class PersistentTasksClusterServiceTests extends ESTestCase { addTestNodes(nodes, randomIntBetween(1, 10)); int numberOfTasks = randomIntBetween(2, 40); for (int i = 0; i < numberOfTasks; i++) { - addTask(tasks, "assign_one", randomBoolean() ? null : "no_longer_exits"); + addTask(tasks, "assign_one", randomBoolean() ? null : "no_longer_exists"); } MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks.build()); @@ -186,7 +197,42 @@ public class PersistentTasksClusterServiceTests extends ESTestCase { PersistentTasksCustomMetaData tasksInProgress = newClusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); assertThat(tasksInProgress, notNullValue()); + } + public void testNonClusterStateConditionAssignment() { + ClusterState clusterState = initialState(); + ClusterState.Builder builder = ClusterState.builder(clusterState); + PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder( + clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE)); + DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(clusterState.nodes()); + addTestNodes(nodes, randomIntBetween(1, 3)); + addTask(tasks, "assign_based_on_non_cluster_state_condition", null); + MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks.build()); + clusterState = builder.metaData(metaData).nodes(nodes).build(); + + nonClusterStateCondition = false; + ClusterState newClusterState = reassign(clusterState); + + PersistentTasksCustomMetaData tasksInProgress = newClusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + assertThat(tasksInProgress, notNullValue()); + for (PersistentTask task : tasksInProgress.tasks()) { + assertThat(task.getExecutorNode(), nullValue()); + assertThat(task.isAssigned(), equalTo(false)); + assertThat(task.getAssignment().getExplanation(), equalTo("non-cluster state condition prevents assignment")); + } + assertThat(tasksInProgress.tasks().size(), equalTo(1)); + + nonClusterStateCondition = true; + ClusterState finalClusterState = reassign(newClusterState); + + tasksInProgress = finalClusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + assertThat(tasksInProgress, notNullValue()); + for (PersistentTask task : tasksInProgress.tasks()) { + assertThat(task.getExecutorNode(), notNullValue()); + assertThat(task.isAssigned(), equalTo(true)); + assertThat(task.getAssignment().getExplanation(), equalTo("test assignment")); + } + assertThat(tasksInProgress.tasks().size(), equalTo(1)); } public void testReassignTasks() { @@ -201,14 +247,14 @@ public class PersistentTasksClusterServiceTests extends ESTestCase { switch (randomInt(2)) { case 0: // add an unassigned task that should get assigned because it's assigned to a non-existing node or unassigned - addTask(tasks, "assign_me", randomBoolean() ? null : "no_longer_exits"); + addTask(tasks, "assign_me", randomBoolean() ? null : "no_longer_exists"); break; case 1: // add a task assigned to non-existing node that should not get assigned - addTask(tasks, "dont_assign_me", randomBoolean() ? null : "no_longer_exits"); + addTask(tasks, "dont_assign_me", randomBoolean() ? null : "no_longer_exists"); break; case 2: - addTask(tasks, "assign_one", randomBoolean() ? null : "no_longer_exits"); + addTask(tasks, "assign_one", randomBoolean() ? null : "no_longer_exists"); break; } @@ -368,6 +414,80 @@ public class PersistentTasksClusterServiceTests extends ESTestCase { assertFalse(needsReassignment(new Assignment("_node_1", "assigned"), nodes)); } + public void testPeriodicRecheck() throws Exception { + ClusterState initialState = initialState(); + ClusterState.Builder builder = ClusterState.builder(initialState); + PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder( + initialState.metaData().custom(PersistentTasksCustomMetaData.TYPE)); + DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(initialState.nodes()); + addTestNodes(nodes, randomIntBetween(1, 3)); + addTask(tasks, "assign_based_on_non_cluster_state_condition", null); + MetaData.Builder metaData = MetaData.builder(initialState.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks.build()); + ClusterState clusterState = builder.metaData(metaData).nodes(nodes).build(); + + nonClusterStateCondition = false; + + boolean shouldSimulateFailure = randomBoolean(); + ClusterService recheckTestClusterService = createRecheckTestClusterService(clusterState, shouldSimulateFailure); + PersistentTasksClusterService service = createService(recheckTestClusterService, + (params, currentState) -> assignBasedOnNonClusterStateCondition(currentState.nodes())); + + ClusterChangedEvent event = new ClusterChangedEvent("test", clusterState, initialState); + service.clusterChanged(event); + ClusterState newClusterState = recheckTestClusterService.state(); + + { + PersistentTasksCustomMetaData tasksInProgress = newClusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + assertThat(tasksInProgress, notNullValue()); + for (PersistentTask task : tasksInProgress.tasks()) { + assertThat(task.getExecutorNode(), nullValue()); + assertThat(task.isAssigned(), equalTo(false)); + assertThat(task.getAssignment().getExplanation(), equalTo(shouldSimulateFailure ? + "explanation: assign_based_on_non_cluster_state_condition" : "non-cluster state condition prevents assignment")); + } + assertThat(tasksInProgress.tasks().size(), equalTo(1)); + } + + nonClusterStateCondition = true; + service.setRecheckInterval(TimeValue.timeValueMillis(1)); + + assertBusy(() -> { + PersistentTasksCustomMetaData tasksInProgress = + recheckTestClusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + assertThat(tasksInProgress, notNullValue()); + for (PersistentTask task : tasksInProgress.tasks()) { + assertThat(task.getExecutorNode(), notNullValue()); + assertThat(task.isAssigned(), equalTo(true)); + assertThat(task.getAssignment().getExplanation(), equalTo("test assignment")); + } + assertThat(tasksInProgress.tasks().size(), equalTo(1)); + }); + } + + private ClusterService createRecheckTestClusterService(ClusterState initialState, boolean shouldSimulateFailure) { + AtomicBoolean testFailureNextTime = new AtomicBoolean(shouldSimulateFailure); + AtomicReference state = new AtomicReference<>(initialState); + ClusterService recheckTestClusterService = mock(ClusterService.class); + when(recheckTestClusterService.getClusterSettings()).thenReturn(clusterService.getClusterSettings()); + doAnswer(invocationOnMock -> state.get().getNodes().getLocalNode()).when(recheckTestClusterService).localNode(); + doAnswer(invocationOnMock -> state.get()).when(recheckTestClusterService).state(); + doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + ClusterStateUpdateTask task = (ClusterStateUpdateTask) invocationOnMock.getArguments()[1]; + ClusterState before = state.get(); + ClusterState after = task.execute(before); + if (testFailureNextTime.compareAndSet(true, false)) { + task.onFailure("testing failure", new RuntimeException("foo")); + } else { + state.set(after); + task.clusterStateProcessed("test", before, after); + } + return null; + }).when(recheckTestClusterService).submitStateUpdateTask(anyString(), any(ClusterStateUpdateTask.class)); + + return recheckTestClusterService; + } + private void addTestNodes(DiscoveryNodes.Builder nodes, int nonLocalNodesCount) { for (int i = 0; i < nonLocalNodesCount; i++) { nodes.add(new DiscoveryNode("other_node_" + i, buildNewFakeTransportAddress(), Version.CURRENT)); @@ -387,6 +507,8 @@ public class PersistentTasksClusterServiceTests extends ESTestCase { return null; case "assign_one": return assignOnlyOneTaskAtATime(currentState); + case "assign_based_on_non_cluster_state_condition": + return assignBasedOnNonClusterStateCondition(currentState.nodes()); default: fail("unknown param " + testParams.getTestParam()); } @@ -408,6 +530,14 @@ public class PersistentTasksClusterServiceTests extends ESTestCase { } } + private Assignment assignBasedOnNonClusterStateCondition(DiscoveryNodes nodes) { + if (nonClusterStateCondition) { + return randomNodeAssignment(nodes); + } else { + return new Assignment(null, "non-cluster state condition prevents assignment"); + } + } + private Assignment randomNodeAssignment(DiscoveryNodes nodes) { if (nodes.getNodes().isEmpty()) { return NO_NODE_FOUND; @@ -623,6 +753,7 @@ public class PersistentTasksClusterServiceTests extends ESTestCase { nodes.masterNodeId("this_node"); return ClusterState.builder(ClusterName.DEFAULT) + .nodes(nodes) .metaData(metaData) .routingTable(routingTable.build()) .build(); @@ -640,6 +771,11 @@ public class PersistentTasksClusterServiceTests extends ESTestCase { /** Creates a PersistentTasksClusterService with a single PersistentTasksExecutor implemented by a BiFunction **/ private

PersistentTasksClusterService createService(final BiFunction fn) { + return createService(clusterService, fn); + } + + private

PersistentTasksClusterService createService(ClusterService clusterService, + final BiFunction fn) { PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry( singleton(new PersistentTasksExecutor

(TestPersistentTasksExecutor.NAME, null) { @Override @@ -652,6 +788,6 @@ public class PersistentTasksClusterServiceTests extends ESTestCase { throw new UnsupportedOperationException(); } })); - return new PersistentTasksClusterService(Settings.EMPTY, registry, clusterService); + return new PersistentTasksClusterService(Settings.EMPTY, registry, clusterService, threadPool); } } diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksDecidersTestCase.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksDecidersTestCase.java index 7f2dada7c4c..2007c350b55 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksDecidersTestCase.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksDecidersTestCase.java @@ -69,11 +69,12 @@ public abstract class PersistentTasksDecidersTestCase extends ESTestCase { }; } }; - persistentTasksClusterService = new PersistentTasksClusterService(clusterService.getSettings(), registry, clusterService); + persistentTasksClusterService = new PersistentTasksClusterService(clusterService.getSettings(), registry, clusterService, + threadPool); } @AfterClass - public static void tearDownThreadPool() throws Exception { + public static void tearDownThreadPool() { terminate(threadPool); } diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java index e746ff71627..8f6393986da 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java @@ -36,6 +36,7 @@ import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTask import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestTasksRequestBuilder; import org.junit.After; +import org.junit.Before; import java.util.Collection; import java.util.Collections; @@ -45,6 +46,7 @@ import java.util.Objects; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, minNumDataNodes = 2) @@ -64,6 +66,11 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase { return true; } + @Before + public void resetNonClusterStateCondition() { + TestPersistentTasksExecutor.setNonClusterStateCondition(true); + } + @After public void cleanup() throws Exception { assertNoRunningTasks(); @@ -173,6 +180,42 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase { assertEquals(removeFuture.get().getId(), taskId); } + public void testPersistentActionWithNonClusterStateCondition() throws Exception { + PersistentTasksClusterService persistentTasksClusterService = + internalCluster().getInstance(PersistentTasksClusterService.class, internalCluster().getMasterName()); + // Speed up rechecks to a rate that is quicker than what settings would allow + persistentTasksClusterService.setRecheckInterval(TimeValue.timeValueMillis(1)); + + TestPersistentTasksExecutor.setNonClusterStateCondition(false); + + PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); + PlainActionFuture> future = new PlainActionFuture<>(); + TestParams testParams = new TestParams("Blah"); + persistentTasksService.sendStartRequest(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, testParams, future); + String taskId = future.get().getId(); + + assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks(), + empty()); + + TestPersistentTasksExecutor.setNonClusterStateCondition(true); + + assertBusy(() -> { + // Wait for the task to start + assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks() + .size(), equalTo(1)); + }); + TaskInfo taskInfo = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]") + .get().getTasks().get(0); + + // Verifying the the task can now be assigned + assertThat(taskInfo.getTaskId().getNodeId(), notNullValue()); + + // Remove the persistent task + PlainActionFuture> removeFuture = new PlainActionFuture<>(); + persistentTasksService.sendRemoveRequest(taskId, removeFuture); + assertEquals(removeFuture.get().getId(), taskId); + } + public void testPersistentActionStatusUpdate() throws Exception { PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); PlainActionFuture> future = new PlainActionFuture<>(); @@ -277,8 +320,6 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase { assertThat(client().admin().cluster().prepareCancelTasks().setTaskId(taskId) .get().getTasks().size(), equalTo(1)); } - - } private void assertNoRunningTasks() throws Exception { diff --git a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java index 352e8108c38..151129c0cc1 100644 --- a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java +++ b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java @@ -298,13 +298,22 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin, P public static final String NAME = "cluster:admin/persistent/test"; private final ClusterService clusterService; + private static volatile boolean nonClusterStateCondition = true; + public TestPersistentTasksExecutor(ClusterService clusterService) { super(NAME, ThreadPool.Names.GENERIC); this.clusterService = clusterService; } + public static void setNonClusterStateCondition(boolean nonClusterStateCondition) { + TestPersistentTasksExecutor.nonClusterStateCondition = nonClusterStateCondition; + } + @Override public Assignment getAssignment(TestParams params, ClusterState clusterState) { + if (nonClusterStateCondition == false) { + return new Assignment(null, "non cluster state condition prevents assignment"); + } if (params == null || params.getExecutorNodeAttr() == null) { return super.getAssignment(params, clusterState); } else { @@ -315,7 +324,6 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin, P } else { return NO_NODE_FOUND; } - } }