From cd412893965e244cc708dae25f8d5bf44baac723 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 18 Jan 2019 14:48:20 -0700 Subject: [PATCH] Add local session timeouts to leader node (#37438) This is related to #35975. This commit adds timeout functionality to the local session on a leader node. When a session is started, a timeout is scheduled using a repeatable runnable. If the session is not accessed in between two runs the session is closed. When the sssion is closed, the repeating task is cancelled. Additionally, this commit moves session uuid generation to the leader cluster. And renames the PutCcrRestoreSessionRequest to StartCcrRestoreSessionRequest to reflect that change. --- .../java/org/elasticsearch/xpack/ccr/Ccr.java | 4 +- .../elasticsearch/xpack/ccr/CcrSettings.java | 19 +++ .../repository/CcrRestoreSourceService.java | 102 +++++++++----- .../CcrRestoreSourceServiceTests.java | 133 ++++++++++++------ 4 files changed, 177 insertions(+), 81 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index 6ff0460d51b..4dd167a6568 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -161,10 +161,10 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E return emptyList(); } - CcrRestoreSourceService restoreSourceService = new CcrRestoreSourceService(); - this.restoreSourceService.set(restoreSourceService); CcrSettings ccrSettings = new CcrSettings(settings, clusterService.getClusterSettings()); this.ccrSettings.set(ccrSettings); + CcrRestoreSourceService restoreSourceService = new CcrRestoreSourceService(threadPool, ccrSettings); + this.restoreSourceService.set(restoreSourceService); return Arrays.asList( ccrLicenseChecker, restoreSourceService, diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java index fe0eb7853e3..26089ab4695 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java @@ -43,6 +43,14 @@ public final class CcrSettings { Setting.byteSizeSetting("ccr.indices.recovery.max_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB), Setting.Property.Dynamic, Setting.Property.NodeScope); + /** + * The leader must open resources for a ccr recovery. If there is no activity for this interval of time, + * the leader will close the restore session. + */ + public static final Setting INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING = + Setting.timeSetting("ccr.indices.recovery.recovery_activity_timeout", TimeValue.timeValueSeconds(60), + Setting.Property.Dynamic, Setting.Property.NodeScope); + /** * The settings defined by CCR. * @@ -53,22 +61,33 @@ public final class CcrSettings { XPackSettings.CCR_ENABLED_SETTING, CCR_FOLLOWING_INDEX_SETTING, RECOVERY_MAX_BYTES_PER_SECOND, + INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT); } private final CombinedRateLimiter ccrRateLimiter; + private volatile TimeValue recoveryActivityTimeout; public CcrSettings(Settings settings, ClusterSettings clusterSettings) { + this.recoveryActivityTimeout = INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING.get(settings); this.ccrRateLimiter = new CombinedRateLimiter(RECOVERY_MAX_BYTES_PER_SECOND.get(settings)); clusterSettings.addSettingsUpdateConsumer(RECOVERY_MAX_BYTES_PER_SECOND, this::setMaxBytesPerSec); + clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, this::setRecoveryActivityTimeout); } private void setMaxBytesPerSec(ByteSizeValue maxBytesPerSec) { ccrRateLimiter.setMBPerSec(maxBytesPerSec); } + private void setRecoveryActivityTimeout(TimeValue recoveryActivityTimeout) { + this.recoveryActivityTimeout = recoveryActivityTimeout; + } + public CombinedRateLimiter getRateLimiter() { return ccrRateLimiter; } + public TimeValue getRecoveryActivityTimeout() { + return recoveryActivityTimeout; + } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java index 785600dd5f8..1c7f9f95adb 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRefCounted; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.KeyedLock; @@ -28,6 +29,9 @@ import org.elasticsearch.index.shard.IndexShardClosedException; import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; +import org.elasticsearch.threadpool.Scheduler; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.ccr.CcrSettings; import java.io.Closeable; import java.io.IOException; @@ -45,8 +49,14 @@ public class CcrRestoreSourceService extends AbstractLifecycleComponent implemen private final Map onGoingRestores = ConcurrentCollections.newConcurrentMap(); private final Map> sessionsForShard = new HashMap<>(); - private final CopyOnWriteArrayList> openSessionListeners = new CopyOnWriteArrayList<>(); private final CopyOnWriteArrayList> closeSessionListeners = new CopyOnWriteArrayList<>(); + private final ThreadPool threadPool; + private final CcrSettings ccrSettings; + + public CcrRestoreSourceService(ThreadPool threadPool, CcrSettings ccrSettings) { + this.threadPool = threadPool; + this.ccrSettings = ccrSettings; + } @Override public synchronized void afterIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) { @@ -81,26 +91,10 @@ public class CcrRestoreSourceService extends AbstractLifecycleComponent implemen // TODO: The listeners are for testing. Once end-to-end file restore is implemented and can be tested, // these should be removed. - public void addOpenSessionListener(Consumer listener) { - openSessionListeners.add(listener); - } - public void addCloseSessionListener(Consumer listener) { closeSessionListeners.add(listener); } - // default visibility for testing - synchronized HashSet getSessionsForShard(IndexShard indexShard) { - return sessionsForShard.get(indexShard); - } - - // default visibility for testing - synchronized RestoreSession getOngoingRestore(String sessionUUID) { - return onGoingRestores.get(sessionUUID); - } - - // TODO: Add a local timeout for the session. This timeout might might be for the entire session to be - // complete. Or it could be for session to have been touched. public synchronized Store.MetadataSnapshot openSession(String sessionUUID, IndexShard indexShard) throws IOException { boolean success = false; RestoreSession restore = null; @@ -113,9 +107,8 @@ public class CcrRestoreSourceService extends AbstractLifecycleComponent implemen if (indexShard.state() == IndexShardState.CLOSED) { throw new IndexShardClosedException(indexShard.shardId(), "cannot open ccr restore session if shard closed"); } - restore = new RestoreSession(sessionUUID, indexShard, indexShard.acquireSafeIndexCommit()); + restore = new RestoreSession(sessionUUID, indexShard, indexShard.acquireSafeIndexCommit(), scheduleTimeout(sessionUUID)); onGoingRestores.put(sessionUUID, restore); - openSessionListeners.forEach(c -> c.accept(sessionUUID)); HashSet sessions = sessionsForShard.computeIfAbsent(indexShard, (s) -> new HashSet<>()); sessions.add(sessionUUID); } @@ -133,25 +126,7 @@ public class CcrRestoreSourceService extends AbstractLifecycleComponent implemen } public void closeSession(String sessionUUID) { - final RestoreSession restore; - synchronized (this) { - closeSessionListeners.forEach(c -> c.accept(sessionUUID)); - restore = onGoingRestores.remove(sessionUUID); - if (restore == null) { - logger.debug("could not close session [{}] because session not found", sessionUUID); - throw new IllegalArgumentException("session [" + sessionUUID + "] not found"); - } - HashSet sessions = sessionsForShard.get(restore.indexShard); - assert sessions != null : "No session UUIDs for shard even though one [" + sessionUUID + "] is active in ongoing restores"; - if (sessions != null) { - boolean removed = sessions.remove(sessionUUID); - assert removed : "No session found for UUID [" + sessionUUID +"]"; - if (sessions.isEmpty()) { - sessionsForShard.remove(restore.indexShard); - } - } - } - restore.decRef(); + internalCloseSession(sessionUUID, true); } public synchronized SessionReader getSessionReader(String sessionUUID) { @@ -160,22 +135,70 @@ public class CcrRestoreSourceService extends AbstractLifecycleComponent implemen logger.debug("could not get session [{}] because session not found", sessionUUID); throw new IllegalArgumentException("session [" + sessionUUID + "] not found"); } + restore.idle = false; return new SessionReader(restore); } + private void internalCloseSession(String sessionUUID, boolean throwIfSessionMissing) { + final RestoreSession restore; + synchronized (this) { + restore = onGoingRestores.remove(sessionUUID); + if (restore == null) { + if (throwIfSessionMissing) { + logger.debug("could not close session [{}] because session not found", sessionUUID); + throw new IllegalArgumentException("session [" + sessionUUID + "] not found"); + } else { + return; + } + } + HashSet sessions = sessionsForShard.get(restore.indexShard); + assert sessions != null : "No session UUIDs for shard even though one [" + sessionUUID + "] is active in ongoing restores"; + if (sessions != null) { + boolean removed = sessions.remove(sessionUUID); + assert removed : "No session found for UUID [" + sessionUUID + "]"; + if (sessions.isEmpty()) { + sessionsForShard.remove(restore.indexShard); + } + } + } + closeSessionListeners.forEach(c -> c.accept(sessionUUID)); + restore.decRef(); + + } + + private Scheduler.Cancellable scheduleTimeout(String sessionUUID) { + TimeValue idleTimeout = ccrSettings.getRecoveryActivityTimeout(); + return threadPool.scheduleWithFixedDelay(() -> maybeTimeout(sessionUUID), idleTimeout, ThreadPool.Names.GENERIC); + } + + private void maybeTimeout(String sessionUUID) { + RestoreSession restoreSession = onGoingRestores.get(sessionUUID); + if (restoreSession != null) { + if (restoreSession.idle) { + internalCloseSession(sessionUUID, false); + } else { + restoreSession.idle = true; + } + } + } + private static class RestoreSession extends AbstractRefCounted { private final String sessionUUID; private final IndexShard indexShard; private final Engine.IndexCommitRef commitRef; + private final Scheduler.Cancellable timeoutTask; private final KeyedLock keyedLock = new KeyedLock<>(); private final Map cachedInputs = new ConcurrentHashMap<>(); + private volatile boolean idle = false; - private RestoreSession(String sessionUUID, IndexShard indexShard, Engine.IndexCommitRef commitRef) { + private RestoreSession(String sessionUUID, IndexShard indexShard, Engine.IndexCommitRef commitRef, + Scheduler.Cancellable timeoutTask) { super("restore-session"); this.sessionUUID = sessionUUID; this.indexShard = indexShard; this.commitRef = commitRef; + this.timeoutTask = timeoutTask; } private Store.MetadataSnapshot getMetaData() throws IOException { @@ -223,6 +246,7 @@ public class CcrRestoreSourceService extends AbstractLifecycleComponent implemen protected void closeInternal() { logger.debug("closing session [{}] for shard [{}]", sessionUUID, indexShard.shardId()); assert keyedLock.hasLockedKeys() == false : "Should not hold any file locks when closing"; + timeoutTask.cancel(); IOUtils.closeWhileHandlingException(cachedInputs.values()); } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java index c0b7863edf2..5f352788d95 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java @@ -8,28 +8,41 @@ package org.elasticsearch.xpack.ccr.repository; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; +import org.elasticsearch.cluster.coordination.DeterministicTaskQueue; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IllegalIndexShardStateException; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.store.StoreFileMetaData; +import org.elasticsearch.xpack.ccr.CcrSettings; import org.junit.Before; import java.io.IOException; import java.util.ArrayList; -import java.util.HashSet; +import java.util.Set; + +import static org.elasticsearch.node.Node.NODE_NAME_SETTING; public class CcrRestoreSourceServiceTests extends IndexShardTestCase { private CcrRestoreSourceService restoreSourceService; + private DeterministicTaskQueue taskQueue; @Before public void setUp() throws Exception { super.setUp(); - restoreSourceService = new CcrRestoreSourceService(); + Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build(); + taskQueue = new DeterministicTaskQueue(settings, random()); + Set> registeredSettings = Sets.newHashSet(CcrSettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, + CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND); + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, registeredSettings); + restoreSourceService = new CcrRestoreSourceService(taskQueue.getThreadPool(), new CcrSettings(Settings.EMPTY, clusterSettings)); } public void testOpenSession() throws IOException { @@ -39,22 +52,21 @@ public class CcrRestoreSourceServiceTests extends IndexShardTestCase { final String sessionUUID2 = UUIDs.randomBase64UUID(); final String sessionUUID3 = UUIDs.randomBase64UUID(); - assertNull(restoreSourceService.getSessionsForShard(indexShard1)); + restoreSourceService.openSession(sessionUUID1, indexShard1); + restoreSourceService.openSession(sessionUUID2, indexShard1); - assertNotNull(restoreSourceService.openSession(sessionUUID1, indexShard1)); - HashSet sessionsForShard = restoreSourceService.getSessionsForShard(indexShard1); - assertEquals(1, sessionsForShard.size()); - assertTrue(sessionsForShard.contains(sessionUUID1)); - assertNotNull(restoreSourceService.openSession(sessionUUID2, indexShard1)); - sessionsForShard = restoreSourceService.getSessionsForShard(indexShard1); - assertEquals(2, sessionsForShard.size()); - assertTrue(sessionsForShard.contains(sessionUUID2)); + try (CcrRestoreSourceService.SessionReader reader1 = restoreSourceService.getSessionReader(sessionUUID1); + CcrRestoreSourceService.SessionReader reader2 = restoreSourceService.getSessionReader(sessionUUID2)) { + // Would throw exception if missing + } - assertNull(restoreSourceService.getSessionsForShard(indexShard2)); - assertNotNull(restoreSourceService.openSession(sessionUUID3, indexShard2)); - sessionsForShard = restoreSourceService.getSessionsForShard(indexShard2); - assertEquals(1, sessionsForShard.size()); - assertTrue(sessionsForShard.contains(sessionUUID3)); + restoreSourceService.openSession(sessionUUID3, indexShard2); + + try (CcrRestoreSourceService.SessionReader reader1 = restoreSourceService.getSessionReader(sessionUUID1); + CcrRestoreSourceService.SessionReader reader2 = restoreSourceService.getSessionReader(sessionUUID2); + CcrRestoreSourceService.SessionReader reader3 = restoreSourceService.getSessionReader(sessionUUID3)) { + // Would throw exception if missing + } restoreSourceService.closeSession(sessionUUID1); restoreSourceService.closeSession(sessionUUID2); @@ -68,7 +80,6 @@ public class CcrRestoreSourceServiceTests extends IndexShardTestCase { closeShards(indexShard); String sessionUUID = UUIDs.randomBase64UUID(); expectThrows(IllegalIndexShardStateException.class, () -> restoreSourceService.openSession(sessionUUID, indexShard)); - assertNull(restoreSourceService.getOngoingRestore(sessionUUID)); } public void testCloseSession() throws IOException { @@ -82,25 +93,26 @@ public class CcrRestoreSourceServiceTests extends IndexShardTestCase { restoreSourceService.openSession(sessionUUID2, indexShard1); restoreSourceService.openSession(sessionUUID3, indexShard2); - assertEquals(2, restoreSourceService.getSessionsForShard(indexShard1).size()); - assertEquals(1, restoreSourceService.getSessionsForShard(indexShard2).size()); - assertNotNull(restoreSourceService.getOngoingRestore(sessionUUID1)); - assertNotNull(restoreSourceService.getOngoingRestore(sessionUUID2)); - assertNotNull(restoreSourceService.getOngoingRestore(sessionUUID3)); + try (CcrRestoreSourceService.SessionReader reader1 = restoreSourceService.getSessionReader(sessionUUID1); + CcrRestoreSourceService.SessionReader reader2 = restoreSourceService.getSessionReader(sessionUUID2); + CcrRestoreSourceService.SessionReader reader3 = restoreSourceService.getSessionReader(sessionUUID3)) { + // Would throw exception if missing + } + + assertTrue(taskQueue.hasDeferredTasks()); restoreSourceService.closeSession(sessionUUID1); - assertEquals(1, restoreSourceService.getSessionsForShard(indexShard1).size()); - assertNull(restoreSourceService.getOngoingRestore(sessionUUID1)); - assertFalse(restoreSourceService.getSessionsForShard(indexShard1).contains(sessionUUID1)); - assertTrue(restoreSourceService.getSessionsForShard(indexShard1).contains(sessionUUID2)); + expectThrows(IllegalArgumentException.class, () -> restoreSourceService.getSessionReader(sessionUUID1)); restoreSourceService.closeSession(sessionUUID2); - assertNull(restoreSourceService.getSessionsForShard(indexShard1)); - assertNull(restoreSourceService.getOngoingRestore(sessionUUID2)); + expectThrows(IllegalArgumentException.class, () -> restoreSourceService.getSessionReader(sessionUUID2)); restoreSourceService.closeSession(sessionUUID3); - assertNull(restoreSourceService.getSessionsForShard(indexShard2)); - assertNull(restoreSourceService.getOngoingRestore(sessionUUID3)); + expectThrows(IllegalArgumentException.class, () -> restoreSourceService.getSessionReader(sessionUUID3)); + + taskQueue.runAllTasks(); + // The tasks will not be rescheduled as the sessions are closed. + assertFalse(taskQueue.hasDeferredTasks()); closeShards(indexShard1, indexShard2); } @@ -116,14 +128,20 @@ public class CcrRestoreSourceServiceTests extends IndexShardTestCase { restoreSourceService.openSession(sessionUUID2, indexShard1); restoreSourceService.openSession(sessionUUID3, indexShard2); - assertEquals(2, restoreSourceService.getSessionsForShard(indexShard1).size()); - assertEquals(1, restoreSourceService.getSessionsForShard(indexShard2).size()); + try (CcrRestoreSourceService.SessionReader reader1 = restoreSourceService.getSessionReader(sessionUUID1); + CcrRestoreSourceService.SessionReader reader2 = restoreSourceService.getSessionReader(sessionUUID2); + CcrRestoreSourceService.SessionReader reader3 = restoreSourceService.getSessionReader(sessionUUID3)) { + // Would throw exception if missing + } restoreSourceService.afterIndexShardClosed(indexShard1.shardId(), indexShard1, Settings.EMPTY); - assertNull(restoreSourceService.getSessionsForShard(indexShard1)); - assertNull(restoreSourceService.getOngoingRestore(sessionUUID1)); - assertNull(restoreSourceService.getOngoingRestore(sessionUUID2)); + expectThrows(IllegalArgumentException.class, () -> restoreSourceService.getSessionReader(sessionUUID1)); + expectThrows(IllegalArgumentException.class, () -> restoreSourceService.getSessionReader(sessionUUID2)); + + try (CcrRestoreSourceService.SessionReader reader3 = restoreSourceService.getSessionReader(sessionUUID3)) { + // Would throw exception if missing + } restoreSourceService.closeSession(sessionUUID3); closeShards(indexShard1, indexShard2); @@ -167,24 +185,59 @@ public class CcrRestoreSourceServiceTests extends IndexShardTestCase { indexDoc(indexShard, "_doc", Integer.toString(i)); flushShard(indexShard, true); } - final String sessionUUID1 = UUIDs.randomBase64UUID(); + final String sessionUUID = UUIDs.randomBase64UUID(); - restoreSourceService.openSession(sessionUUID1, indexShard); + restoreSourceService.openSession(sessionUUID, indexShard); ArrayList files = new ArrayList<>(); indexShard.snapshotStoreMetadata().forEach(files::add); - try (CcrRestoreSourceService.SessionReader sessionReader = restoreSourceService.getSessionReader(sessionUUID1)) { + try (CcrRestoreSourceService.SessionReader sessionReader = restoreSourceService.getSessionReader(sessionUUID)) { sessionReader.readFileBytes(files.get(0).name(), new BytesArray(new byte[10])); } // Request a second file to ensure that original file is not leaked - try (CcrRestoreSourceService.SessionReader sessionReader = restoreSourceService.getSessionReader(sessionUUID1)) { + try (CcrRestoreSourceService.SessionReader sessionReader = restoreSourceService.getSessionReader(sessionUUID)) { sessionReader.readFileBytes(files.get(1).name(), new BytesArray(new byte[10])); } - restoreSourceService.closeSession(sessionUUID1); + restoreSourceService.closeSession(sessionUUID); closeShards(indexShard); // Exception will be thrown if file is not closed. } + + public void testSessionCanTimeout() throws Exception { + IndexShard indexShard = newStartedShard(true); + + final String sessionUUID = UUIDs.randomBase64UUID(); + + restoreSourceService.openSession(sessionUUID, indexShard); + + // Session starts as not idle. First task will mark it as idle + assertTrue(taskQueue.hasDeferredTasks()); + taskQueue.advanceTime(); + taskQueue.runAllRunnableTasks(); + // Task is still scheduled + assertTrue(taskQueue.hasDeferredTasks()); + + // Accessing session marks it as not-idle + try (CcrRestoreSourceService.SessionReader reader = restoreSourceService.getSessionReader(sessionUUID)) { + // Check session exists + } + + assertTrue(taskQueue.hasDeferredTasks()); + taskQueue.advanceTime(); + taskQueue.runAllRunnableTasks(); + // Task is still scheduled + assertTrue(taskQueue.hasDeferredTasks()); + + taskQueue.advanceTime(); + taskQueue.runAllRunnableTasks(); + // Task is cancelled when the session times out + assertFalse(taskQueue.hasDeferredTasks()); + + expectThrows(IllegalArgumentException.class, () -> restoreSourceService.getSessionReader(sessionUUID)); + + closeShards(indexShard); + } }