Add local session timeouts to leader node (#37438)

This is related to #35975. This commit adds timeout functionality to
the local session on a leader node. When a session is started, a timeout
is scheduled using a repeatable runnable. If the session is not accessed
in between two runs the session is closed. When the sssion is closed,
the repeating task is cancelled.

Additionally, this commit moves session uuid generation to the leader
cluster. And renames the PutCcrRestoreSessionRequest to
StartCcrRestoreSessionRequest to reflect that change.
This commit is contained in:
Tim Brooks 2019-01-18 14:48:20 -07:00 committed by GitHub
parent adae233f77
commit cd41289396
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 177 additions and 81 deletions

View File

@ -161,10 +161,10 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
return emptyList();
}
CcrRestoreSourceService restoreSourceService = new CcrRestoreSourceService();
this.restoreSourceService.set(restoreSourceService);
CcrSettings ccrSettings = new CcrSettings(settings, clusterService.getClusterSettings());
this.ccrSettings.set(ccrSettings);
CcrRestoreSourceService restoreSourceService = new CcrRestoreSourceService(threadPool, ccrSettings);
this.restoreSourceService.set(restoreSourceService);
return Arrays.asList(
ccrLicenseChecker,
restoreSourceService,

View File

@ -43,6 +43,14 @@ public final class CcrSettings {
Setting.byteSizeSetting("ccr.indices.recovery.max_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB),
Setting.Property.Dynamic, Setting.Property.NodeScope);
/**
* The leader must open resources for a ccr recovery. If there is no activity for this interval of time,
* the leader will close the restore session.
*/
public static final Setting<TimeValue> INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING =
Setting.timeSetting("ccr.indices.recovery.recovery_activity_timeout", TimeValue.timeValueSeconds(60),
Setting.Property.Dynamic, Setting.Property.NodeScope);
/**
* The settings defined by CCR.
*
@ -53,22 +61,33 @@ public final class CcrSettings {
XPackSettings.CCR_ENABLED_SETTING,
CCR_FOLLOWING_INDEX_SETTING,
RECOVERY_MAX_BYTES_PER_SECOND,
INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING,
CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT);
}
private final CombinedRateLimiter ccrRateLimiter;
private volatile TimeValue recoveryActivityTimeout;
public CcrSettings(Settings settings, ClusterSettings clusterSettings) {
this.recoveryActivityTimeout = INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING.get(settings);
this.ccrRateLimiter = new CombinedRateLimiter(RECOVERY_MAX_BYTES_PER_SECOND.get(settings));
clusterSettings.addSettingsUpdateConsumer(RECOVERY_MAX_BYTES_PER_SECOND, this::setMaxBytesPerSec);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, this::setRecoveryActivityTimeout);
}
private void setMaxBytesPerSec(ByteSizeValue maxBytesPerSec) {
ccrRateLimiter.setMBPerSec(maxBytesPerSec);
}
private void setRecoveryActivityTimeout(TimeValue recoveryActivityTimeout) {
this.recoveryActivityTimeout = recoveryActivityTimeout;
}
public CombinedRateLimiter getRateLimiter() {
return ccrRateLimiter;
}
public TimeValue getRecoveryActivityTimeout() {
return recoveryActivityTimeout;
}
}

View File

@ -17,6 +17,7 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.KeyedLock;
@ -28,6 +29,9 @@ import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ccr.CcrSettings;
import java.io.Closeable;
import java.io.IOException;
@ -45,8 +49,14 @@ public class CcrRestoreSourceService extends AbstractLifecycleComponent implemen
private final Map<String, RestoreSession> onGoingRestores = ConcurrentCollections.newConcurrentMap();
private final Map<IndexShard, HashSet<String>> sessionsForShard = new HashMap<>();
private final CopyOnWriteArrayList<Consumer<String>> openSessionListeners = new CopyOnWriteArrayList<>();
private final CopyOnWriteArrayList<Consumer<String>> closeSessionListeners = new CopyOnWriteArrayList<>();
private final ThreadPool threadPool;
private final CcrSettings ccrSettings;
public CcrRestoreSourceService(ThreadPool threadPool, CcrSettings ccrSettings) {
this.threadPool = threadPool;
this.ccrSettings = ccrSettings;
}
@Override
public synchronized void afterIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) {
@ -81,26 +91,10 @@ public class CcrRestoreSourceService extends AbstractLifecycleComponent implemen
// TODO: The listeners are for testing. Once end-to-end file restore is implemented and can be tested,
// these should be removed.
public void addOpenSessionListener(Consumer<String> listener) {
openSessionListeners.add(listener);
}
public void addCloseSessionListener(Consumer<String> listener) {
closeSessionListeners.add(listener);
}
// default visibility for testing
synchronized HashSet<String> getSessionsForShard(IndexShard indexShard) {
return sessionsForShard.get(indexShard);
}
// default visibility for testing
synchronized RestoreSession getOngoingRestore(String sessionUUID) {
return onGoingRestores.get(sessionUUID);
}
// TODO: Add a local timeout for the session. This timeout might might be for the entire session to be
// complete. Or it could be for session to have been touched.
public synchronized Store.MetadataSnapshot openSession(String sessionUUID, IndexShard indexShard) throws IOException {
boolean success = false;
RestoreSession restore = null;
@ -113,9 +107,8 @@ public class CcrRestoreSourceService extends AbstractLifecycleComponent implemen
if (indexShard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(indexShard.shardId(), "cannot open ccr restore session if shard closed");
}
restore = new RestoreSession(sessionUUID, indexShard, indexShard.acquireSafeIndexCommit());
restore = new RestoreSession(sessionUUID, indexShard, indexShard.acquireSafeIndexCommit(), scheduleTimeout(sessionUUID));
onGoingRestores.put(sessionUUID, restore);
openSessionListeners.forEach(c -> c.accept(sessionUUID));
HashSet<String> sessions = sessionsForShard.computeIfAbsent(indexShard, (s) -> new HashSet<>());
sessions.add(sessionUUID);
}
@ -133,25 +126,7 @@ public class CcrRestoreSourceService extends AbstractLifecycleComponent implemen
}
public void closeSession(String sessionUUID) {
final RestoreSession restore;
synchronized (this) {
closeSessionListeners.forEach(c -> c.accept(sessionUUID));
restore = onGoingRestores.remove(sessionUUID);
if (restore == null) {
logger.debug("could not close session [{}] because session not found", sessionUUID);
throw new IllegalArgumentException("session [" + sessionUUID + "] not found");
}
HashSet<String> sessions = sessionsForShard.get(restore.indexShard);
assert sessions != null : "No session UUIDs for shard even though one [" + sessionUUID + "] is active in ongoing restores";
if (sessions != null) {
boolean removed = sessions.remove(sessionUUID);
assert removed : "No session found for UUID [" + sessionUUID +"]";
if (sessions.isEmpty()) {
sessionsForShard.remove(restore.indexShard);
}
}
}
restore.decRef();
internalCloseSession(sessionUUID, true);
}
public synchronized SessionReader getSessionReader(String sessionUUID) {
@ -160,22 +135,70 @@ public class CcrRestoreSourceService extends AbstractLifecycleComponent implemen
logger.debug("could not get session [{}] because session not found", sessionUUID);
throw new IllegalArgumentException("session [" + sessionUUID + "] not found");
}
restore.idle = false;
return new SessionReader(restore);
}
private void internalCloseSession(String sessionUUID, boolean throwIfSessionMissing) {
final RestoreSession restore;
synchronized (this) {
restore = onGoingRestores.remove(sessionUUID);
if (restore == null) {
if (throwIfSessionMissing) {
logger.debug("could not close session [{}] because session not found", sessionUUID);
throw new IllegalArgumentException("session [" + sessionUUID + "] not found");
} else {
return;
}
}
HashSet<String> sessions = sessionsForShard.get(restore.indexShard);
assert sessions != null : "No session UUIDs for shard even though one [" + sessionUUID + "] is active in ongoing restores";
if (sessions != null) {
boolean removed = sessions.remove(sessionUUID);
assert removed : "No session found for UUID [" + sessionUUID + "]";
if (sessions.isEmpty()) {
sessionsForShard.remove(restore.indexShard);
}
}
}
closeSessionListeners.forEach(c -> c.accept(sessionUUID));
restore.decRef();
}
private Scheduler.Cancellable scheduleTimeout(String sessionUUID) {
TimeValue idleTimeout = ccrSettings.getRecoveryActivityTimeout();
return threadPool.scheduleWithFixedDelay(() -> maybeTimeout(sessionUUID), idleTimeout, ThreadPool.Names.GENERIC);
}
private void maybeTimeout(String sessionUUID) {
RestoreSession restoreSession = onGoingRestores.get(sessionUUID);
if (restoreSession != null) {
if (restoreSession.idle) {
internalCloseSession(sessionUUID, false);
} else {
restoreSession.idle = true;
}
}
}
private static class RestoreSession extends AbstractRefCounted {
private final String sessionUUID;
private final IndexShard indexShard;
private final Engine.IndexCommitRef commitRef;
private final Scheduler.Cancellable timeoutTask;
private final KeyedLock<String> keyedLock = new KeyedLock<>();
private final Map<String, IndexInput> cachedInputs = new ConcurrentHashMap<>();
private volatile boolean idle = false;
private RestoreSession(String sessionUUID, IndexShard indexShard, Engine.IndexCommitRef commitRef) {
private RestoreSession(String sessionUUID, IndexShard indexShard, Engine.IndexCommitRef commitRef,
Scheduler.Cancellable timeoutTask) {
super("restore-session");
this.sessionUUID = sessionUUID;
this.indexShard = indexShard;
this.commitRef = commitRef;
this.timeoutTask = timeoutTask;
}
private Store.MetadataSnapshot getMetaData() throws IOException {
@ -223,6 +246,7 @@ public class CcrRestoreSourceService extends AbstractLifecycleComponent implemen
protected void closeInternal() {
logger.debug("closing session [{}] for shard [{}]", sessionUUID, indexShard.shardId());
assert keyedLock.hasLockedKeys() == false : "Should not hold any file locks when closing";
timeoutTask.cancel();
IOUtils.closeWhileHandlingException(cachedInputs.values());
}
}

View File

@ -8,28 +8,41 @@ package org.elasticsearch.xpack.ccr.repository;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.elasticsearch.cluster.coordination.DeterministicTaskQueue;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.junit.Before;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
public class CcrRestoreSourceServiceTests extends IndexShardTestCase {
private CcrRestoreSourceService restoreSourceService;
private DeterministicTaskQueue taskQueue;
@Before
public void setUp() throws Exception {
super.setUp();
restoreSourceService = new CcrRestoreSourceService();
Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build();
taskQueue = new DeterministicTaskQueue(settings, random());
Set<Setting<?>> registeredSettings = Sets.newHashSet(CcrSettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING,
CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND);
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, registeredSettings);
restoreSourceService = new CcrRestoreSourceService(taskQueue.getThreadPool(), new CcrSettings(Settings.EMPTY, clusterSettings));
}
public void testOpenSession() throws IOException {
@ -39,22 +52,21 @@ public class CcrRestoreSourceServiceTests extends IndexShardTestCase {
final String sessionUUID2 = UUIDs.randomBase64UUID();
final String sessionUUID3 = UUIDs.randomBase64UUID();
assertNull(restoreSourceService.getSessionsForShard(indexShard1));
restoreSourceService.openSession(sessionUUID1, indexShard1);
restoreSourceService.openSession(sessionUUID2, indexShard1);
assertNotNull(restoreSourceService.openSession(sessionUUID1, indexShard1));
HashSet<String> sessionsForShard = restoreSourceService.getSessionsForShard(indexShard1);
assertEquals(1, sessionsForShard.size());
assertTrue(sessionsForShard.contains(sessionUUID1));
assertNotNull(restoreSourceService.openSession(sessionUUID2, indexShard1));
sessionsForShard = restoreSourceService.getSessionsForShard(indexShard1);
assertEquals(2, sessionsForShard.size());
assertTrue(sessionsForShard.contains(sessionUUID2));
try (CcrRestoreSourceService.SessionReader reader1 = restoreSourceService.getSessionReader(sessionUUID1);
CcrRestoreSourceService.SessionReader reader2 = restoreSourceService.getSessionReader(sessionUUID2)) {
// Would throw exception if missing
}
assertNull(restoreSourceService.getSessionsForShard(indexShard2));
assertNotNull(restoreSourceService.openSession(sessionUUID3, indexShard2));
sessionsForShard = restoreSourceService.getSessionsForShard(indexShard2);
assertEquals(1, sessionsForShard.size());
assertTrue(sessionsForShard.contains(sessionUUID3));
restoreSourceService.openSession(sessionUUID3, indexShard2);
try (CcrRestoreSourceService.SessionReader reader1 = restoreSourceService.getSessionReader(sessionUUID1);
CcrRestoreSourceService.SessionReader reader2 = restoreSourceService.getSessionReader(sessionUUID2);
CcrRestoreSourceService.SessionReader reader3 = restoreSourceService.getSessionReader(sessionUUID3)) {
// Would throw exception if missing
}
restoreSourceService.closeSession(sessionUUID1);
restoreSourceService.closeSession(sessionUUID2);
@ -68,7 +80,6 @@ public class CcrRestoreSourceServiceTests extends IndexShardTestCase {
closeShards(indexShard);
String sessionUUID = UUIDs.randomBase64UUID();
expectThrows(IllegalIndexShardStateException.class, () -> restoreSourceService.openSession(sessionUUID, indexShard));
assertNull(restoreSourceService.getOngoingRestore(sessionUUID));
}
public void testCloseSession() throws IOException {
@ -82,25 +93,26 @@ public class CcrRestoreSourceServiceTests extends IndexShardTestCase {
restoreSourceService.openSession(sessionUUID2, indexShard1);
restoreSourceService.openSession(sessionUUID3, indexShard2);
assertEquals(2, restoreSourceService.getSessionsForShard(indexShard1).size());
assertEquals(1, restoreSourceService.getSessionsForShard(indexShard2).size());
assertNotNull(restoreSourceService.getOngoingRestore(sessionUUID1));
assertNotNull(restoreSourceService.getOngoingRestore(sessionUUID2));
assertNotNull(restoreSourceService.getOngoingRestore(sessionUUID3));
try (CcrRestoreSourceService.SessionReader reader1 = restoreSourceService.getSessionReader(sessionUUID1);
CcrRestoreSourceService.SessionReader reader2 = restoreSourceService.getSessionReader(sessionUUID2);
CcrRestoreSourceService.SessionReader reader3 = restoreSourceService.getSessionReader(sessionUUID3)) {
// Would throw exception if missing
}
assertTrue(taskQueue.hasDeferredTasks());
restoreSourceService.closeSession(sessionUUID1);
assertEquals(1, restoreSourceService.getSessionsForShard(indexShard1).size());
assertNull(restoreSourceService.getOngoingRestore(sessionUUID1));
assertFalse(restoreSourceService.getSessionsForShard(indexShard1).contains(sessionUUID1));
assertTrue(restoreSourceService.getSessionsForShard(indexShard1).contains(sessionUUID2));
expectThrows(IllegalArgumentException.class, () -> restoreSourceService.getSessionReader(sessionUUID1));
restoreSourceService.closeSession(sessionUUID2);
assertNull(restoreSourceService.getSessionsForShard(indexShard1));
assertNull(restoreSourceService.getOngoingRestore(sessionUUID2));
expectThrows(IllegalArgumentException.class, () -> restoreSourceService.getSessionReader(sessionUUID2));
restoreSourceService.closeSession(sessionUUID3);
assertNull(restoreSourceService.getSessionsForShard(indexShard2));
assertNull(restoreSourceService.getOngoingRestore(sessionUUID3));
expectThrows(IllegalArgumentException.class, () -> restoreSourceService.getSessionReader(sessionUUID3));
taskQueue.runAllTasks();
// The tasks will not be rescheduled as the sessions are closed.
assertFalse(taskQueue.hasDeferredTasks());
closeShards(indexShard1, indexShard2);
}
@ -116,14 +128,20 @@ public class CcrRestoreSourceServiceTests extends IndexShardTestCase {
restoreSourceService.openSession(sessionUUID2, indexShard1);
restoreSourceService.openSession(sessionUUID3, indexShard2);
assertEquals(2, restoreSourceService.getSessionsForShard(indexShard1).size());
assertEquals(1, restoreSourceService.getSessionsForShard(indexShard2).size());
try (CcrRestoreSourceService.SessionReader reader1 = restoreSourceService.getSessionReader(sessionUUID1);
CcrRestoreSourceService.SessionReader reader2 = restoreSourceService.getSessionReader(sessionUUID2);
CcrRestoreSourceService.SessionReader reader3 = restoreSourceService.getSessionReader(sessionUUID3)) {
// Would throw exception if missing
}
restoreSourceService.afterIndexShardClosed(indexShard1.shardId(), indexShard1, Settings.EMPTY);
assertNull(restoreSourceService.getSessionsForShard(indexShard1));
assertNull(restoreSourceService.getOngoingRestore(sessionUUID1));
assertNull(restoreSourceService.getOngoingRestore(sessionUUID2));
expectThrows(IllegalArgumentException.class, () -> restoreSourceService.getSessionReader(sessionUUID1));
expectThrows(IllegalArgumentException.class, () -> restoreSourceService.getSessionReader(sessionUUID2));
try (CcrRestoreSourceService.SessionReader reader3 = restoreSourceService.getSessionReader(sessionUUID3)) {
// Would throw exception if missing
}
restoreSourceService.closeSession(sessionUUID3);
closeShards(indexShard1, indexShard2);
@ -167,24 +185,59 @@ public class CcrRestoreSourceServiceTests extends IndexShardTestCase {
indexDoc(indexShard, "_doc", Integer.toString(i));
flushShard(indexShard, true);
}
final String sessionUUID1 = UUIDs.randomBase64UUID();
final String sessionUUID = UUIDs.randomBase64UUID();
restoreSourceService.openSession(sessionUUID1, indexShard);
restoreSourceService.openSession(sessionUUID, indexShard);
ArrayList<StoreFileMetaData> files = new ArrayList<>();
indexShard.snapshotStoreMetadata().forEach(files::add);
try (CcrRestoreSourceService.SessionReader sessionReader = restoreSourceService.getSessionReader(sessionUUID1)) {
try (CcrRestoreSourceService.SessionReader sessionReader = restoreSourceService.getSessionReader(sessionUUID)) {
sessionReader.readFileBytes(files.get(0).name(), new BytesArray(new byte[10]));
}
// Request a second file to ensure that original file is not leaked
try (CcrRestoreSourceService.SessionReader sessionReader = restoreSourceService.getSessionReader(sessionUUID1)) {
try (CcrRestoreSourceService.SessionReader sessionReader = restoreSourceService.getSessionReader(sessionUUID)) {
sessionReader.readFileBytes(files.get(1).name(), new BytesArray(new byte[10]));
}
restoreSourceService.closeSession(sessionUUID1);
restoreSourceService.closeSession(sessionUUID);
closeShards(indexShard);
// Exception will be thrown if file is not closed.
}
public void testSessionCanTimeout() throws Exception {
IndexShard indexShard = newStartedShard(true);
final String sessionUUID = UUIDs.randomBase64UUID();
restoreSourceService.openSession(sessionUUID, indexShard);
// Session starts as not idle. First task will mark it as idle
assertTrue(taskQueue.hasDeferredTasks());
taskQueue.advanceTime();
taskQueue.runAllRunnableTasks();
// Task is still scheduled
assertTrue(taskQueue.hasDeferredTasks());
// Accessing session marks it as not-idle
try (CcrRestoreSourceService.SessionReader reader = restoreSourceService.getSessionReader(sessionUUID)) {
// Check session exists
}
assertTrue(taskQueue.hasDeferredTasks());
taskQueue.advanceTime();
taskQueue.runAllRunnableTasks();
// Task is still scheduled
assertTrue(taskQueue.hasDeferredTasks());
taskQueue.advanceTime();
taskQueue.runAllRunnableTasks();
// Task is cancelled when the session times out
assertFalse(taskQueue.hasDeferredTasks());
expectThrows(IllegalArgumentException.class, () -> restoreSourceService.getSessionReader(sessionUUID));
closeShards(indexShard);
}
}