Add timeout for ccr recovery action (#37840)
This is related to #35975. It adds a action timeout setting that allows timeouts to be applied to the individual transport actions that are used during a ccr recovery.
This commit is contained in:
parent
e9332331a3
commit
f3f9cabd67
|
@ -406,6 +406,16 @@ public final class MockTransportService extends TransportService {
|
||||||
return transport().addSendBehavior(transportAddress, sendBehavior);
|
return transport().addSendBehavior(transportAddress, sendBehavior);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds a send behavior that is the default send behavior.
|
||||||
|
*
|
||||||
|
* @return {@code true} if no default send behavior was registered
|
||||||
|
*/
|
||||||
|
public boolean addSendBehavior(StubbableTransport.SendRequestBehavior behavior) {
|
||||||
|
return transport().setDefaultSendBehavior(behavior);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Adds a new connect behavior that is used for creating connections with the given delegate service.
|
* Adds a new connect behavior that is used for creating connections with the given delegate service.
|
||||||
*
|
*
|
||||||
|
|
|
@ -69,7 +69,9 @@ public class StubbableConnectionManager extends ConnectionManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void clearBehaviors() {
|
public void clearBehaviors() {
|
||||||
|
defaultGetConnectionBehavior = ConnectionManager::getConnection;
|
||||||
getConnectionBehaviors.clear();
|
getConnectionBehaviors.clear();
|
||||||
|
defaultNodeConnectedBehavior = ConnectionManager::nodeConnected;
|
||||||
nodeConnectedBehaviors.clear();
|
nodeConnectedBehaviors.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -55,6 +55,12 @@ public final class StubbableTransport implements Transport {
|
||||||
this.delegate = transport;
|
this.delegate = transport;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
boolean setDefaultSendBehavior(SendRequestBehavior sendBehavior) {
|
||||||
|
SendRequestBehavior prior = defaultSendRequest;
|
||||||
|
defaultSendRequest = sendBehavior;
|
||||||
|
return prior == null;
|
||||||
|
}
|
||||||
|
|
||||||
public boolean setDefaultConnectBehavior(OpenConnectionBehavior openConnectionBehavior) {
|
public boolean setDefaultConnectBehavior(OpenConnectionBehavior openConnectionBehavior) {
|
||||||
OpenConnectionBehavior prior = this.defaultConnectBehavior;
|
OpenConnectionBehavior prior = this.defaultConnectBehavior;
|
||||||
this.defaultConnectBehavior = openConnectionBehavior;
|
this.defaultConnectBehavior = openConnectionBehavior;
|
||||||
|
@ -70,7 +76,9 @@ public final class StubbableTransport implements Transport {
|
||||||
}
|
}
|
||||||
|
|
||||||
void clearBehaviors() {
|
void clearBehaviors() {
|
||||||
|
this.defaultSendRequest = null;
|
||||||
sendBehaviors.clear();
|
sendBehaviors.clear();
|
||||||
|
this.defaultConnectBehavior = null;
|
||||||
connectBehaviors.clear();
|
connectBehaviors.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -57,6 +57,13 @@ public final class CcrSettings {
|
||||||
Setting.timeSetting("ccr.indices.recovery.recovery_activity_timeout", TimeValue.timeValueSeconds(60),
|
Setting.timeSetting("ccr.indices.recovery.recovery_activity_timeout", TimeValue.timeValueSeconds(60),
|
||||||
Setting.Property.Dynamic, Setting.Property.NodeScope);
|
Setting.Property.Dynamic, Setting.Property.NodeScope);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The timeout value to use for requests made as part of ccr recovery process.
|
||||||
|
* */
|
||||||
|
public static final Setting<TimeValue> INDICES_RECOVERY_ACTION_TIMEOUT_SETTING =
|
||||||
|
Setting.positiveTimeSetting("ccr.indices.recovery.internal_action_timeout", TimeValue.timeValueSeconds(60),
|
||||||
|
Property.Dynamic, Property.NodeScope);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The settings defined by CCR.
|
* The settings defined by CCR.
|
||||||
*
|
*
|
||||||
|
@ -67,6 +74,7 @@ public final class CcrSettings {
|
||||||
XPackSettings.CCR_ENABLED_SETTING,
|
XPackSettings.CCR_ENABLED_SETTING,
|
||||||
CCR_FOLLOWING_INDEX_SETTING,
|
CCR_FOLLOWING_INDEX_SETTING,
|
||||||
RECOVERY_MAX_BYTES_PER_SECOND,
|
RECOVERY_MAX_BYTES_PER_SECOND,
|
||||||
|
INDICES_RECOVERY_ACTION_TIMEOUT_SETTING,
|
||||||
INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING,
|
INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING,
|
||||||
CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT,
|
CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT,
|
||||||
CCR_WAIT_FOR_METADATA_TIMEOUT);
|
CCR_WAIT_FOR_METADATA_TIMEOUT);
|
||||||
|
@ -74,12 +82,15 @@ public final class CcrSettings {
|
||||||
|
|
||||||
private final CombinedRateLimiter ccrRateLimiter;
|
private final CombinedRateLimiter ccrRateLimiter;
|
||||||
private volatile TimeValue recoveryActivityTimeout;
|
private volatile TimeValue recoveryActivityTimeout;
|
||||||
|
private volatile TimeValue recoveryActionTimeout;
|
||||||
|
|
||||||
public CcrSettings(Settings settings, ClusterSettings clusterSettings) {
|
public CcrSettings(Settings settings, ClusterSettings clusterSettings) {
|
||||||
this.recoveryActivityTimeout = INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING.get(settings);
|
this.recoveryActivityTimeout = INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING.get(settings);
|
||||||
|
this.recoveryActionTimeout = INDICES_RECOVERY_ACTION_TIMEOUT_SETTING.get(settings);
|
||||||
this.ccrRateLimiter = new CombinedRateLimiter(RECOVERY_MAX_BYTES_PER_SECOND.get(settings));
|
this.ccrRateLimiter = new CombinedRateLimiter(RECOVERY_MAX_BYTES_PER_SECOND.get(settings));
|
||||||
clusterSettings.addSettingsUpdateConsumer(RECOVERY_MAX_BYTES_PER_SECOND, this::setMaxBytesPerSec);
|
clusterSettings.addSettingsUpdateConsumer(RECOVERY_MAX_BYTES_PER_SECOND, this::setMaxBytesPerSec);
|
||||||
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, this::setRecoveryActivityTimeout);
|
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, this::setRecoveryActivityTimeout);
|
||||||
|
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTION_TIMEOUT_SETTING, this::setRecoveryActionTimeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setMaxBytesPerSec(ByteSizeValue maxBytesPerSec) {
|
private void setMaxBytesPerSec(ByteSizeValue maxBytesPerSec) {
|
||||||
|
@ -90,6 +101,10 @@ public final class CcrSettings {
|
||||||
this.recoveryActivityTimeout = recoveryActivityTimeout;
|
this.recoveryActivityTimeout = recoveryActivityTimeout;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void setRecoveryActionTimeout(TimeValue recoveryActionTimeout) {
|
||||||
|
this.recoveryActionTimeout = recoveryActionTimeout;
|
||||||
|
}
|
||||||
|
|
||||||
public CombinedRateLimiter getRateLimiter() {
|
public CombinedRateLimiter getRateLimiter() {
|
||||||
return ccrRateLimiter;
|
return ccrRateLimiter;
|
||||||
}
|
}
|
||||||
|
@ -97,4 +112,8 @@ public final class CcrSettings {
|
||||||
public TimeValue getRecoveryActivityTimeout() {
|
public TimeValue getRecoveryActivityTimeout() {
|
||||||
return recoveryActivityTimeout;
|
return recoveryActivityTimeout;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public TimeValue getRecoveryActionTimeout() {
|
||||||
|
return recoveryActionTimeout;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -127,7 +127,8 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
|
||||||
public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) {
|
public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) {
|
||||||
assert SNAPSHOT_ID.equals(snapshotId) : "RemoteClusterRepository only supports " + SNAPSHOT_ID + " as the SnapshotId";
|
assert SNAPSHOT_ID.equals(snapshotId) : "RemoteClusterRepository only supports " + SNAPSHOT_ID + " as the SnapshotId";
|
||||||
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);
|
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);
|
||||||
ClusterStateResponse response = remoteClient.admin().cluster().prepareState().clear().setMetaData(true).setNodes(true).get();
|
ClusterStateResponse response = remoteClient.admin().cluster().prepareState().clear().setMetaData(true).setNodes(true)
|
||||||
|
.get(ccrSettings.getRecoveryActionTimeout());
|
||||||
ImmutableOpenMap<String, IndexMetaData> indicesMap = response.getState().metaData().indices();
|
ImmutableOpenMap<String, IndexMetaData> indicesMap = response.getState().metaData().indices();
|
||||||
ArrayList<String> indices = new ArrayList<>(indicesMap.size());
|
ArrayList<String> indices = new ArrayList<>(indicesMap.size());
|
||||||
indicesMap.keysIt().forEachRemaining(indices::add);
|
indicesMap.keysIt().forEachRemaining(indices::add);
|
||||||
|
@ -141,7 +142,8 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
|
||||||
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);
|
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);
|
||||||
// We set a single dummy index name to avoid fetching all the index data
|
// We set a single dummy index name to avoid fetching all the index data
|
||||||
ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest("dummy_index_name");
|
ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest("dummy_index_name");
|
||||||
ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest).actionGet();
|
ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest)
|
||||||
|
.actionGet(ccrSettings.getRecoveryActionTimeout());
|
||||||
return clusterState.getState().metaData();
|
return clusterState.getState().metaData();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -152,13 +154,14 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
|
||||||
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);
|
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);
|
||||||
|
|
||||||
ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex);
|
ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex);
|
||||||
ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest).actionGet();
|
ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest)
|
||||||
|
.actionGet(ccrSettings.getRecoveryActionTimeout());
|
||||||
|
|
||||||
// Validates whether the leader cluster has been configured properly:
|
// Validates whether the leader cluster has been configured properly:
|
||||||
PlainActionFuture<String[]> future = PlainActionFuture.newFuture();
|
PlainActionFuture<String[]> future = PlainActionFuture.newFuture();
|
||||||
IndexMetaData leaderIndexMetaData = clusterState.getState().metaData().index(leaderIndex);
|
IndexMetaData leaderIndexMetaData = clusterState.getState().metaData().index(leaderIndex);
|
||||||
ccrLicenseChecker.fetchLeaderHistoryUUIDs(remoteClient, leaderIndexMetaData, future::onFailure, future::onResponse);
|
ccrLicenseChecker.fetchLeaderHistoryUUIDs(remoteClient, leaderIndexMetaData, future::onFailure, future::onResponse);
|
||||||
String[] leaderHistoryUUIDs = future.actionGet();
|
String[] leaderHistoryUUIDs = future.actionGet(ccrSettings.getRecoveryActionTimeout());
|
||||||
|
|
||||||
IndexMetaData.Builder imdBuilder = IndexMetaData.builder(leaderIndex);
|
IndexMetaData.Builder imdBuilder = IndexMetaData.builder(leaderIndex);
|
||||||
// Adding the leader index uuid for each shard as custom metadata:
|
// Adding the leader index uuid for each shard as custom metadata:
|
||||||
|
@ -188,7 +191,8 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
|
||||||
@Override
|
@Override
|
||||||
public RepositoryData getRepositoryData() {
|
public RepositoryData getRepositoryData() {
|
||||||
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);
|
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);
|
||||||
ClusterStateResponse response = remoteClient.admin().cluster().prepareState().clear().setMetaData(true).get();
|
ClusterStateResponse response = remoteClient.admin().cluster().prepareState().clear().setMetaData(true)
|
||||||
|
.get(ccrSettings.getRecoveryActionTimeout());
|
||||||
MetaData remoteMetaData = response.getState().getMetaData();
|
MetaData remoteMetaData = response.getState().getMetaData();
|
||||||
|
|
||||||
Map<String, SnapshotId> copiedSnapshotIds = new HashMap<>();
|
Map<String, SnapshotId> copiedSnapshotIds = new HashMap<>();
|
||||||
|
@ -298,7 +302,8 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
|
||||||
|
|
||||||
private void maybeUpdateMappings(Client localClient, Client remoteClient, Index leaderIndex, IndexSettings followerIndexSettings) {
|
private void maybeUpdateMappings(Client localClient, Client remoteClient, Index leaderIndex, IndexSettings followerIndexSettings) {
|
||||||
ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName());
|
ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName());
|
||||||
ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest).actionGet();
|
ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest)
|
||||||
|
.actionGet(ccrSettings.getRecoveryActionTimeout());
|
||||||
IndexMetaData leaderIndexMetadata = clusterState.getState().metaData().getIndexSafe(leaderIndex);
|
IndexMetaData leaderIndexMetadata = clusterState.getState().metaData().getIndexSafe(leaderIndex);
|
||||||
long leaderMappingVersion = leaderIndexMetadata.getMappingVersion();
|
long leaderMappingVersion = leaderIndexMetadata.getMappingVersion();
|
||||||
|
|
||||||
|
@ -306,7 +311,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
|
||||||
Index followerIndex = followerIndexSettings.getIndex();
|
Index followerIndex = followerIndexSettings.getIndex();
|
||||||
MappingMetaData mappingMetaData = leaderIndexMetadata.mapping();
|
MappingMetaData mappingMetaData = leaderIndexMetadata.mapping();
|
||||||
PutMappingRequest putMappingRequest = CcrRequests.putMappingRequest(followerIndex.getName(), mappingMetaData);
|
PutMappingRequest putMappingRequest = CcrRequests.putMappingRequest(followerIndex.getName(), mappingMetaData);
|
||||||
localClient.admin().indices().putMapping(putMappingRequest).actionGet();
|
localClient.admin().indices().putMapping(putMappingRequest).actionGet(ccrSettings.getRecoveryActionTimeout());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -314,9 +319,9 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
|
||||||
RecoveryState recoveryState) {
|
RecoveryState recoveryState) {
|
||||||
String sessionUUID = UUIDs.randomBase64UUID();
|
String sessionUUID = UUIDs.randomBase64UUID();
|
||||||
PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE,
|
PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE,
|
||||||
new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId)).actionGet();
|
new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId)).actionGet(ccrSettings.getRecoveryActionTimeout());
|
||||||
return new RestoreSession(repositoryName, remoteClient, sessionUUID, response.getNode(), indexShard, recoveryState,
|
return new RestoreSession(repositoryName, remoteClient, sessionUUID, response.getNode(), indexShard, recoveryState,
|
||||||
response.getStoreFileMetaData(), ccrSettings.getRateLimiter(), throttledTime::inc);
|
response.getStoreFileMetaData(), ccrSettings, throttledTime::inc);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class RestoreSession extends FileRestoreContext implements Closeable {
|
private static class RestoreSession extends FileRestoreContext implements Closeable {
|
||||||
|
@ -327,18 +332,18 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
|
||||||
private final String sessionUUID;
|
private final String sessionUUID;
|
||||||
private final DiscoveryNode node;
|
private final DiscoveryNode node;
|
||||||
private final Store.MetadataSnapshot sourceMetaData;
|
private final Store.MetadataSnapshot sourceMetaData;
|
||||||
private final CombinedRateLimiter rateLimiter;
|
private final CcrSettings ccrSettings;
|
||||||
private final LongConsumer throttleListener;
|
private final LongConsumer throttleListener;
|
||||||
|
|
||||||
RestoreSession(String repositoryName, Client remoteClient, String sessionUUID, DiscoveryNode node, IndexShard indexShard,
|
RestoreSession(String repositoryName, Client remoteClient, String sessionUUID, DiscoveryNode node, IndexShard indexShard,
|
||||||
RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData, CombinedRateLimiter rateLimiter,
|
RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData, CcrSettings ccrSettings,
|
||||||
LongConsumer throttleListener) {
|
LongConsumer throttleListener) {
|
||||||
super(repositoryName, indexShard, SNAPSHOT_ID, recoveryState, BUFFER_SIZE);
|
super(repositoryName, indexShard, SNAPSHOT_ID, recoveryState, BUFFER_SIZE);
|
||||||
this.remoteClient = remoteClient;
|
this.remoteClient = remoteClient;
|
||||||
this.sessionUUID = sessionUUID;
|
this.sessionUUID = sessionUUID;
|
||||||
this.node = node;
|
this.node = node;
|
||||||
this.sourceMetaData = sourceMetaData;
|
this.sourceMetaData = sourceMetaData;
|
||||||
this.rateLimiter = rateLimiter;
|
this.ccrSettings = ccrSettings;
|
||||||
this.throttleListener = throttleListener;
|
this.throttleListener = throttleListener;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -354,14 +359,14 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo) {
|
protected InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo) {
|
||||||
return new RestoreFileInputStream(remoteClient, sessionUUID, node, fileInfo.metadata(), rateLimiter, throttleListener);
|
return new RestoreFileInputStream(remoteClient, sessionUUID, node, fileInfo.metadata(), ccrSettings, throttleListener);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
ClearCcrRestoreSessionRequest clearRequest = new ClearCcrRestoreSessionRequest(sessionUUID, node);
|
ClearCcrRestoreSessionRequest clearRequest = new ClearCcrRestoreSessionRequest(sessionUUID, node);
|
||||||
ClearCcrRestoreSessionAction.ClearCcrRestoreSessionResponse response =
|
ClearCcrRestoreSessionAction.ClearCcrRestoreSessionResponse response =
|
||||||
remoteClient.execute(ClearCcrRestoreSessionAction.INSTANCE, clearRequest).actionGet();
|
remoteClient.execute(ClearCcrRestoreSessionAction.INSTANCE, clearRequest).actionGet(ccrSettings.getRecoveryActionTimeout());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -372,17 +377,19 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
|
||||||
private final DiscoveryNode node;
|
private final DiscoveryNode node;
|
||||||
private final StoreFileMetaData fileToRecover;
|
private final StoreFileMetaData fileToRecover;
|
||||||
private final CombinedRateLimiter rateLimiter;
|
private final CombinedRateLimiter rateLimiter;
|
||||||
|
private final CcrSettings ccrSettings;
|
||||||
private final LongConsumer throttleListener;
|
private final LongConsumer throttleListener;
|
||||||
|
|
||||||
private long pos = 0;
|
private long pos = 0;
|
||||||
|
|
||||||
private RestoreFileInputStream(Client remoteClient, String sessionUUID, DiscoveryNode node, StoreFileMetaData fileToRecover,
|
private RestoreFileInputStream(Client remoteClient, String sessionUUID, DiscoveryNode node, StoreFileMetaData fileToRecover,
|
||||||
CombinedRateLimiter rateLimiter, LongConsumer throttleListener) {
|
CcrSettings ccrSettings, LongConsumer throttleListener) {
|
||||||
this.remoteClient = remoteClient;
|
this.remoteClient = remoteClient;
|
||||||
this.sessionUUID = sessionUUID;
|
this.sessionUUID = sessionUUID;
|
||||||
this.node = node;
|
this.node = node;
|
||||||
this.fileToRecover = fileToRecover;
|
this.fileToRecover = fileToRecover;
|
||||||
this.rateLimiter = rateLimiter;
|
this.ccrSettings = ccrSettings;
|
||||||
|
this.rateLimiter = ccrSettings.getRateLimiter();
|
||||||
this.throttleListener = throttleListener;
|
this.throttleListener = throttleListener;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -407,7 +414,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
|
||||||
String fileName = fileToRecover.name();
|
String fileName = fileToRecover.name();
|
||||||
GetCcrRestoreFileChunkRequest request = new GetCcrRestoreFileChunkRequest(node, sessionUUID, fileName, bytesRequested);
|
GetCcrRestoreFileChunkRequest request = new GetCcrRestoreFileChunkRequest(node, sessionUUID, fileName, bytesRequested);
|
||||||
GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse response =
|
GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse response =
|
||||||
remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request).actionGet();
|
remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request).actionGet(ccrSettings.getRecoveryActionTimeout());
|
||||||
BytesReference fileChunk = response.getChunk();
|
BytesReference fileChunk = response.getChunk();
|
||||||
|
|
||||||
int bytesReceived = fileChunk.length();
|
int bytesReceived = fileChunk.length();
|
||||||
|
|
|
@ -65,7 +65,9 @@ import org.elasticsearch.test.MockHttpTransport;
|
||||||
import org.elasticsearch.test.NodeConfigurationSource;
|
import org.elasticsearch.test.NodeConfigurationSource;
|
||||||
import org.elasticsearch.test.TestCluster;
|
import org.elasticsearch.test.TestCluster;
|
||||||
import org.elasticsearch.test.discovery.TestZenDiscovery;
|
import org.elasticsearch.test.discovery.TestZenDiscovery;
|
||||||
|
import org.elasticsearch.test.transport.MockTransportService;
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
|
import org.elasticsearch.transport.nio.MockNioTransportPlugin;
|
||||||
import org.elasticsearch.xpack.ccr.CcrSettings;
|
import org.elasticsearch.xpack.ccr.CcrSettings;
|
||||||
import org.elasticsearch.xpack.ccr.LocalStateCcr;
|
import org.elasticsearch.xpack.ccr.LocalStateCcr;
|
||||||
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngine;
|
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngine;
|
||||||
|
@ -120,7 +122,8 @@ public abstract class CcrIntegTestCase extends ESTestCase {
|
||||||
|
|
||||||
stopClusters();
|
stopClusters();
|
||||||
Collection<Class<? extends Plugin>> mockPlugins = Arrays.asList(ESIntegTestCase.TestSeedPlugin.class,
|
Collection<Class<? extends Plugin>> mockPlugins = Arrays.asList(ESIntegTestCase.TestSeedPlugin.class,
|
||||||
TestZenDiscovery.TestPlugin.class, MockHttpTransport.TestPlugin.class, getTestTransportPlugin());
|
TestZenDiscovery.TestPlugin.class, MockHttpTransport.TestPlugin.class, MockTransportService.TestPlugin.class,
|
||||||
|
MockNioTransportPlugin.class);
|
||||||
|
|
||||||
InternalTestCluster leaderCluster = new InternalTestCluster(randomLong(), createTempDir(), true, true, numberOfNodesPerCluster(),
|
InternalTestCluster leaderCluster = new InternalTestCluster(randomLong(), createTempDir(), true, true, numberOfNodesPerCluster(),
|
||||||
numberOfNodesPerCluster(), UUIDs.randomBase64UUID(random()), createNodeConfigurationSource(null), 0, "leader", mockPlugins,
|
numberOfNodesPerCluster(), UUIDs.randomBase64UUID(random()), createNodeConfigurationSource(null), 0, "leader", mockPlugins,
|
||||||
|
|
|
@ -6,6 +6,8 @@
|
||||||
|
|
||||||
package org.elasticsearch.xpack.ccr;
|
package org.elasticsearch.xpack.ccr;
|
||||||
|
|
||||||
|
import org.elasticsearch.ElasticsearchTimeoutException;
|
||||||
|
import org.elasticsearch.ExceptionsHelper;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
|
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
|
||||||
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
|
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
|
||||||
|
@ -34,8 +36,10 @@ import org.elasticsearch.repositories.RepositoryMissingException;
|
||||||
import org.elasticsearch.snapshots.RestoreInfo;
|
import org.elasticsearch.snapshots.RestoreInfo;
|
||||||
import org.elasticsearch.snapshots.RestoreService;
|
import org.elasticsearch.snapshots.RestoreService;
|
||||||
import org.elasticsearch.snapshots.Snapshot;
|
import org.elasticsearch.snapshots.Snapshot;
|
||||||
|
import org.elasticsearch.test.transport.MockTransportService;
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
import org.elasticsearch.xpack.CcrIntegTestCase;
|
import org.elasticsearch.xpack.CcrIntegTestCase;
|
||||||
|
import org.elasticsearch.xpack.ccr.action.repositories.GetCcrRestoreFileChunkAction;
|
||||||
import org.elasticsearch.xpack.ccr.repository.CcrRepository;
|
import org.elasticsearch.xpack.ccr.repository.CcrRepository;
|
||||||
import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService;
|
import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService;
|
||||||
|
|
||||||
|
@ -51,6 +55,7 @@ import static java.util.Collections.singletonMap;
|
||||||
import static org.elasticsearch.snapshots.RestoreService.restoreInProgress;
|
import static org.elasticsearch.snapshots.RestoreService.restoreInProgress;
|
||||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
import static org.hamcrest.Matchers.instanceOf;
|
||||||
|
|
||||||
// TODO: Fold this integration test into a more expansive integration test as more bootstrap from remote work
|
// TODO: Fold this integration test into a more expansive integration test as more bootstrap from remote work
|
||||||
// TODO: is completed.
|
// TODO: is completed.
|
||||||
|
@ -308,6 +313,78 @@ public class CcrRepositoryIT extends CcrIntegTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testIndividualActionsTimeout() throws Exception {
|
||||||
|
ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest();
|
||||||
|
TimeValue timeValue = TimeValue.timeValueMillis(100);
|
||||||
|
settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING.getKey(), timeValue));
|
||||||
|
assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
|
||||||
|
|
||||||
|
String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster";
|
||||||
|
String leaderIndex = "index1";
|
||||||
|
String followerIndex = "index2";
|
||||||
|
|
||||||
|
final int numberOfPrimaryShards = randomIntBetween(1, 3);
|
||||||
|
final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1),
|
||||||
|
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
|
||||||
|
assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON));
|
||||||
|
ensureLeaderGreen(leaderIndex);
|
||||||
|
|
||||||
|
List<MockTransportService> transportServices = new ArrayList<>();
|
||||||
|
|
||||||
|
for (TransportService transportService : getFollowerCluster().getDataOrMasterNodeInstances(TransportService.class)) {
|
||||||
|
MockTransportService mockTransportService = (MockTransportService) transportService;
|
||||||
|
transportServices.add(mockTransportService);
|
||||||
|
mockTransportService.addSendBehavior((connection, requestId, action, request, options) -> {
|
||||||
|
if (action.equals(GetCcrRestoreFileChunkAction.NAME) == false) {
|
||||||
|
connection.sendRequest(requestId, action, request, options);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.info("--> indexing some data");
|
||||||
|
for (int i = 0; i < 100; i++) {
|
||||||
|
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
|
||||||
|
leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
|
||||||
|
}
|
||||||
|
|
||||||
|
leaderClient().admin().indices().prepareFlush(leaderIndex).setForce(true).setWaitIfOngoing(true).get();
|
||||||
|
|
||||||
|
Settings.Builder settingsBuilder = Settings.builder()
|
||||||
|
.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, followerIndex)
|
||||||
|
.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true);
|
||||||
|
RestoreSnapshotRequest restoreRequest = new RestoreSnapshotRequest(leaderClusterRepoName, CcrRepository.LATEST)
|
||||||
|
.indices(leaderIndex).indicesOptions(indicesOptions).renamePattern("^(.*)$")
|
||||||
|
.renameReplacement(followerIndex).masterNodeTimeout(new TimeValue(1L, TimeUnit.HOURS))
|
||||||
|
.indexSettings(settingsBuilder);
|
||||||
|
|
||||||
|
final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class);
|
||||||
|
final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class);
|
||||||
|
PlainActionFuture<RestoreInfo> future = PlainActionFuture.newFuture();
|
||||||
|
restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future));
|
||||||
|
|
||||||
|
// Depending on when the timeout occurs this can fail in two ways. If it times-out when fetching
|
||||||
|
// metadata this will throw an exception. If it times-out when restoring a shard, the shard will
|
||||||
|
// be marked as failed. Either one is a success for the purpose of this test.
|
||||||
|
try {
|
||||||
|
RestoreInfo restoreInfo = future.actionGet();
|
||||||
|
assertEquals(0, restoreInfo.successfulShards());
|
||||||
|
assertEquals(numberOfPrimaryShards, restoreInfo.failedShards());
|
||||||
|
} catch (Exception e) {
|
||||||
|
assertThat(ExceptionsHelper.unwrapCause(e), instanceOf(ElasticsearchTimeoutException.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
for (MockTransportService transportService : transportServices) {
|
||||||
|
transportService.clearAllRules();
|
||||||
|
}
|
||||||
|
|
||||||
|
settingsRequest = new ClusterUpdateSettingsRequest();
|
||||||
|
TimeValue defaultValue = CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING.getDefault(Settings.EMPTY);
|
||||||
|
settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING.getKey(),
|
||||||
|
defaultValue));
|
||||||
|
assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
|
||||||
|
}
|
||||||
|
|
||||||
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/37887")
|
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/37887")
|
||||||
public void testFollowerMappingIsUpdated() throws IOException {
|
public void testFollowerMappingIsUpdated() throws IOException {
|
||||||
String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster";
|
String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster";
|
||||||
|
|
|
@ -40,7 +40,7 @@ public class CcrRestoreSourceServiceTests extends IndexShardTestCase {
|
||||||
Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build();
|
Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build();
|
||||||
taskQueue = new DeterministicTaskQueue(settings, random());
|
taskQueue = new DeterministicTaskQueue(settings, random());
|
||||||
Set<Setting<?>> registeredSettings = Sets.newHashSet(CcrSettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING,
|
Set<Setting<?>> registeredSettings = Sets.newHashSet(CcrSettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING,
|
||||||
CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND);
|
CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND, CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING);
|
||||||
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, registeredSettings);
|
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, registeredSettings);
|
||||||
restoreSourceService = new CcrRestoreSourceService(taskQueue.getThreadPool(), new CcrSettings(Settings.EMPTY, clusterSettings));
|
restoreSourceService = new CcrRestoreSourceService(taskQueue.getThreadPool(), new CcrSettings(Settings.EMPTY, clusterSettings));
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue