Tighten mapping syncing in ccr remote restore (#38071)
There are two issues regarding the way that we sync mapping from leader to follower when a ccr restore is completed: 1. The returned mapping from a cluster service might not be up to date as the mapping of the restored index commit. 2. We should not compare the mapping version of the follower and the leader. They are not related to one another. Moreover, I think we should only ensure that once the restore is done, the mapping on the follower should be at least the mapping of the copied index commit. We don't have to sync the mapping which is updated after we have opened a session. Relates #36879 Closes #37887
This commit is contained in:
parent
5a33816c86
commit
cecfa5bd6d
|
@ -6,11 +6,15 @@
|
|||
package org.elasticsearch.xpack.ccr.action;
|
||||
|
||||
import org.elasticsearch.ElasticsearchStatusException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
|
||||
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
|
||||
import org.elasticsearch.action.admin.indices.mapping.put.MappingRequestValidator;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MappingMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
|
@ -18,6 +22,7 @@ import org.elasticsearch.xpack.ccr.CcrSettings;
|
|||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public final class CcrRequests {
|
||||
|
@ -40,6 +45,39 @@ public final class CcrRequests {
|
|||
return putMappingRequest;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets an {@link IndexMetaData} of the given index. The mapping version and metadata version of the returned {@link IndexMetaData}
|
||||
* must be at least the provided {@code mappingVersion} and {@code metadataVersion} respectively.
|
||||
*/
|
||||
public static void getIndexMetadata(Client client, Index index, long mappingVersion, long metadataVersion,
|
||||
Supplier<TimeValue> timeoutSupplier, ActionListener<IndexMetaData> listener) {
|
||||
final ClusterStateRequest request = CcrRequests.metaDataRequest(index.getName());
|
||||
if (metadataVersion > 0) {
|
||||
request.waitForMetaDataVersion(metadataVersion).waitForTimeout(timeoutSupplier.get());
|
||||
}
|
||||
client.admin().cluster().state(request, ActionListener.wrap(
|
||||
response -> {
|
||||
if (response.getState() == null) {
|
||||
assert metadataVersion > 0 : metadataVersion;
|
||||
throw new IllegalStateException("timeout to get cluster state with" +
|
||||
" metadata version [" + metadataVersion + "], mapping version [" + mappingVersion + "]");
|
||||
}
|
||||
final MetaData metaData = response.getState().metaData();
|
||||
final IndexMetaData indexMetaData = metaData.getIndexSafe(index);
|
||||
if (indexMetaData.getMappingVersion() >= mappingVersion) {
|
||||
listener.onResponse(indexMetaData);
|
||||
return;
|
||||
}
|
||||
if (timeoutSupplier.get().nanos() < 0) {
|
||||
throw new IllegalStateException("timeout to get cluster state with mapping version [" + mappingVersion + "]");
|
||||
}
|
||||
// ask for the next version.
|
||||
getIndexMetadata(client, index, mappingVersion, metaData.version() + 1, timeoutSupplier, listener);
|
||||
},
|
||||
listener::onFailure
|
||||
));
|
||||
}
|
||||
|
||||
public static final MappingRequestValidator CCR_PUT_MAPPING_REQUEST_VALIDATOR = (request, state, indices) -> {
|
||||
if (request.origin() == null) {
|
||||
return null; // a put-mapping-request on old versions does not have origin.
|
||||
|
|
|
@ -24,7 +24,6 @@ import org.elasticsearch.client.Client;
|
|||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MappingMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.CheckedConsumer;
|
||||
|
@ -59,6 +58,7 @@ import java.util.Optional;
|
|||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.LongConsumer;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import static org.elasticsearch.xpack.ccr.CcrLicenseChecker.wrapClient;
|
||||
import static org.elasticsearch.xpack.ccr.action.TransportResumeFollowAction.extractLeaderShardHistoryUUIDs;
|
||||
|
@ -111,7 +111,9 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
|
|||
@Override
|
||||
protected void innerUpdateMapping(long minRequiredMappingVersion, LongConsumer handler, Consumer<Exception> errorHandler) {
|
||||
final Index followerIndex = params.getFollowShardId().getIndex();
|
||||
getIndexMetadata(minRequiredMappingVersion, 0L, params, ActionListener.wrap(
|
||||
final Index leaderIndex = params.getLeaderShardId().getIndex();
|
||||
final Supplier<TimeValue> timeout = () -> isStopped() ? TimeValue.MINUS_ONE : waitForMetadataTimeOut;
|
||||
CcrRequests.getIndexMetadata(remoteClient(params), leaderIndex, minRequiredMappingVersion, 0L, timeout, ActionListener.wrap(
|
||||
indexMetaData -> {
|
||||
if (indexMetaData.getMappings().isEmpty()) {
|
||||
assert indexMetaData.getMappingVersion() == 1;
|
||||
|
@ -246,39 +248,6 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
|
|||
return wrapClient(client.getRemoteClusterClient(params.getRemoteCluster()), params.getHeaders());
|
||||
}
|
||||
|
||||
private void getIndexMetadata(long minRequiredMappingVersion, long minRequiredMetadataVersion,
|
||||
ShardFollowTask params, ActionListener<IndexMetaData> listener) {
|
||||
final Index leaderIndex = params.getLeaderShardId().getIndex();
|
||||
final ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName());
|
||||
if (minRequiredMetadataVersion > 0) {
|
||||
clusterStateRequest.waitForMetaDataVersion(minRequiredMetadataVersion).waitForTimeout(waitForMetadataTimeOut);
|
||||
}
|
||||
try {
|
||||
remoteClient(params).admin().cluster().state(clusterStateRequest, ActionListener.wrap(
|
||||
r -> {
|
||||
// if wait_for_metadata_version timeout, the response is empty
|
||||
if (r.getState() == null) {
|
||||
assert minRequiredMetadataVersion > 0;
|
||||
getIndexMetadata(minRequiredMappingVersion, minRequiredMetadataVersion, params, listener);
|
||||
return;
|
||||
}
|
||||
final MetaData metaData = r.getState().metaData();
|
||||
final IndexMetaData indexMetaData = metaData.getIndexSafe(leaderIndex);
|
||||
if (indexMetaData.getMappingVersion() < minRequiredMappingVersion) {
|
||||
// ask for the next version.
|
||||
getIndexMetadata(minRequiredMappingVersion, metaData.version() + 1, params, listener);
|
||||
} else {
|
||||
assert metaData.version() >= minRequiredMetadataVersion : metaData.version() + " < " + minRequiredMetadataVersion;
|
||||
listener.onResponse(indexMetaData);
|
||||
}
|
||||
},
|
||||
listener::onFailure
|
||||
));
|
||||
} catch (Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
interface FollowerStatsInfoHandler {
|
||||
void accept(String followerHistoryUUID, long globalCheckpoint, long maxSeqNo);
|
||||
}
|
||||
|
|
|
@ -72,7 +72,8 @@ public class PutCcrRestoreSessionAction extends Action<PutCcrRestoreSessionActio
|
|||
throw new ShardNotFoundException(shardId);
|
||||
}
|
||||
Store.MetadataSnapshot storeFileMetaData = ccrRestoreService.openSession(request.getSessionUUID(), indexShard);
|
||||
return new PutCcrRestoreSessionResponse(clusterService.localNode(), storeFileMetaData);
|
||||
long mappingVersion = indexShard.indexSettings().getIndexMetaData().getMappingVersion();
|
||||
return new PutCcrRestoreSessionResponse(clusterService.localNode(), storeFileMetaData, mappingVersion);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -97,19 +98,22 @@ public class PutCcrRestoreSessionAction extends Action<PutCcrRestoreSessionActio
|
|||
|
||||
private DiscoveryNode node;
|
||||
private Store.MetadataSnapshot storeFileMetaData;
|
||||
private long mappingVersion;
|
||||
|
||||
PutCcrRestoreSessionResponse() {
|
||||
}
|
||||
|
||||
PutCcrRestoreSessionResponse(DiscoveryNode node, Store.MetadataSnapshot storeFileMetaData) {
|
||||
PutCcrRestoreSessionResponse(DiscoveryNode node, Store.MetadataSnapshot storeFileMetaData, long mappingVersion) {
|
||||
this.node = node;
|
||||
this.storeFileMetaData = storeFileMetaData;
|
||||
this.mappingVersion = mappingVersion;
|
||||
}
|
||||
|
||||
PutCcrRestoreSessionResponse(StreamInput in) throws IOException {
|
||||
super(in);
|
||||
node = new DiscoveryNode(in);
|
||||
storeFileMetaData = new Store.MetadataSnapshot(in);
|
||||
mappingVersion = in.readVLong();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -117,6 +121,7 @@ public class PutCcrRestoreSessionAction extends Action<PutCcrRestoreSessionActio
|
|||
super.readFrom(in);
|
||||
node = new DiscoveryNode(in);
|
||||
storeFileMetaData = new Store.MetadataSnapshot(in);
|
||||
mappingVersion = in.readVLong();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -124,6 +129,7 @@ public class PutCcrRestoreSessionAction extends Action<PutCcrRestoreSessionActio
|
|||
super.writeTo(out);
|
||||
node.writeTo(out);
|
||||
storeFileMetaData.writeTo(out);
|
||||
out.writeVLong(mappingVersion);
|
||||
}
|
||||
|
||||
public DiscoveryNode getNode() {
|
||||
|
@ -133,5 +139,9 @@ public class PutCcrRestoreSessionAction extends Action<PutCcrRestoreSessionActio
|
|||
public Store.MetadataSnapshot getStoreFileMetaData() {
|
||||
return storeFileMetaData;
|
||||
}
|
||||
|
||||
public long getMappingVersion() {
|
||||
return mappingVersion;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,9 +29,9 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
|||
import org.elasticsearch.common.metrics.CounterMetric;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.CombinedRateLimiter;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.engine.EngineException;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.IndexShardRecoveryException;
|
||||
|
@ -72,6 +72,8 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.function.LongConsumer;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
|
||||
/**
|
||||
* This repository relies on a remote cluster for Ccr restores. It is read-only so it can only be used to
|
||||
|
@ -288,11 +290,10 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
|
|||
String name = metadata.name();
|
||||
try (RestoreSession restoreSession = openSession(name, remoteClient, leaderShardId, indexShard, recoveryState)) {
|
||||
restoreSession.restoreFiles();
|
||||
updateMappings(remoteClient, leaderIndex, restoreSession.mappingVersion, client, indexShard.routingEntry().index());
|
||||
} catch (Exception e) {
|
||||
throw new IndexShardRestoreFailedException(indexShard.shardId(), "failed to restore snapshot [" + snapshotId + "]", e);
|
||||
}
|
||||
|
||||
maybeUpdateMappings(client, remoteClient, leaderIndex, indexShard.indexSettings());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -300,18 +301,20 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
|
|||
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
|
||||
}
|
||||
|
||||
private void maybeUpdateMappings(Client localClient, Client remoteClient, Index leaderIndex, IndexSettings followerIndexSettings) {
|
||||
ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName());
|
||||
ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest)
|
||||
.actionGet(ccrSettings.getRecoveryActionTimeout());
|
||||
IndexMetaData leaderIndexMetadata = clusterState.getState().metaData().getIndexSafe(leaderIndex);
|
||||
long leaderMappingVersion = leaderIndexMetadata.getMappingVersion();
|
||||
|
||||
if (leaderMappingVersion > followerIndexSettings.getIndexMetaData().getMappingVersion()) {
|
||||
Index followerIndex = followerIndexSettings.getIndex();
|
||||
MappingMetaData mappingMetaData = leaderIndexMetadata.mapping();
|
||||
PutMappingRequest putMappingRequest = CcrRequests.putMappingRequest(followerIndex.getName(), mappingMetaData);
|
||||
localClient.admin().indices().putMapping(putMappingRequest).actionGet(ccrSettings.getRecoveryActionTimeout());
|
||||
private void updateMappings(Client leaderClient, Index leaderIndex, long leaderMappingVersion,
|
||||
Client followerClient, Index followerIndex) {
|
||||
final PlainActionFuture<IndexMetaData> indexMetadataFuture = new PlainActionFuture<>();
|
||||
final long startTimeInNanos = System.nanoTime();
|
||||
final Supplier<TimeValue> timeout = () -> {
|
||||
final long elapsedInNanos = System.nanoTime() - startTimeInNanos;
|
||||
return TimeValue.timeValueNanos(ccrSettings.getRecoveryActionTimeout().nanos() - elapsedInNanos);
|
||||
};
|
||||
CcrRequests.getIndexMetadata(leaderClient, leaderIndex, leaderMappingVersion, 0L, timeout, indexMetadataFuture);
|
||||
final IndexMetaData leaderIndexMetadata = indexMetadataFuture.actionGet(ccrSettings.getRecoveryActionTimeout());
|
||||
final MappingMetaData mappingMetaData = leaderIndexMetadata.mapping();
|
||||
if (mappingMetaData != null) {
|
||||
final PutMappingRequest putMappingRequest = CcrRequests.putMappingRequest(followerIndex.getName(), mappingMetaData);
|
||||
followerClient.admin().indices().putMapping(putMappingRequest).actionGet(ccrSettings.getRecoveryActionTimeout());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -321,7 +324,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
|
|||
PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE,
|
||||
new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId)).actionGet(ccrSettings.getRecoveryActionTimeout());
|
||||
return new RestoreSession(repositoryName, remoteClient, sessionUUID, response.getNode(), indexShard, recoveryState,
|
||||
response.getStoreFileMetaData(), ccrSettings, throttledTime::inc);
|
||||
response.getStoreFileMetaData(), response.getMappingVersion(), ccrSettings, throttledTime::inc);
|
||||
}
|
||||
|
||||
private static class RestoreSession extends FileRestoreContext implements Closeable {
|
||||
|
@ -332,17 +335,19 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
|
|||
private final String sessionUUID;
|
||||
private final DiscoveryNode node;
|
||||
private final Store.MetadataSnapshot sourceMetaData;
|
||||
private final long mappingVersion;
|
||||
private final CcrSettings ccrSettings;
|
||||
private final LongConsumer throttleListener;
|
||||
|
||||
RestoreSession(String repositoryName, Client remoteClient, String sessionUUID, DiscoveryNode node, IndexShard indexShard,
|
||||
RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData, CcrSettings ccrSettings,
|
||||
LongConsumer throttleListener) {
|
||||
RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData, long mappingVersion,
|
||||
CcrSettings ccrSettings, LongConsumer throttleListener) {
|
||||
super(repositoryName, indexShard, SNAPSHOT_ID, recoveryState, BUFFER_SIZE);
|
||||
this.remoteClient = remoteClient;
|
||||
this.sessionUUID = sessionUUID;
|
||||
this.node = node;
|
||||
this.sourceMetaData = sourceMetaData;
|
||||
this.mappingVersion = mappingVersion;
|
||||
this.ccrSettings = ccrSettings;
|
||||
this.throttleListener = throttleListener;
|
||||
}
|
||||
|
|
|
@ -390,7 +390,6 @@ public class CcrRepositoryIT extends CcrIntegTestCase {
|
|||
assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
|
||||
}
|
||||
|
||||
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/37887")
|
||||
public void testFollowerMappingIsUpdated() throws IOException {
|
||||
String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster";
|
||||
String leaderIndex = "index1";
|
||||
|
@ -413,16 +412,8 @@ public class CcrRepositoryIT extends CcrIntegTestCase {
|
|||
.renameReplacement(followerIndex).masterNodeTimeout(new TimeValue(1L, TimeUnit.HOURS))
|
||||
.indexSettings(settingsBuilder);
|
||||
|
||||
// TODO: Eventually when the file recovery work is complete, we should test updated mappings by
|
||||
// indexing to the leader while the recovery is happening. However, into order to that test mappings
|
||||
// are updated prior to that work, we index documents in the clear session callback. This will
|
||||
// ensure a mapping change prior to the final mapping check on the follower side.
|
||||
for (CcrRestoreSourceService restoreSourceService : getLeaderCluster().getDataNodeInstances(CcrRestoreSourceService.class)) {
|
||||
restoreSourceService.addCloseSessionListener(s -> {
|
||||
final String source = String.format(Locale.ROOT, "{\"k\":%d}", 1);
|
||||
leaderClient().prepareIndex("index1", "doc", Long.toString(1)).setSource(source, XContentType.JSON).get();
|
||||
});
|
||||
}
|
||||
final String source = String.format(Locale.ROOT, "{\"k\":%d}", 1);
|
||||
leaderClient().prepareIndex("index1", "doc", Long.toString(1)).setSource(source, XContentType.JSON).get();
|
||||
|
||||
PlainActionFuture<RestoreInfo> future = PlainActionFuture.newFuture();
|
||||
restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future));
|
||||
|
@ -435,10 +426,6 @@ public class CcrRepositoryIT extends CcrIntegTestCase {
|
|||
clusterStateRequest.clear();
|
||||
clusterStateRequest.metaData(true);
|
||||
clusterStateRequest.indices(followerIndex);
|
||||
ClusterStateResponse clusterState = followerClient().admin().cluster().state(clusterStateRequest).actionGet();
|
||||
IndexMetaData followerIndexMetadata = clusterState.getState().metaData().index(followerIndex);
|
||||
assertEquals(2, followerIndexMetadata.getMappingVersion());
|
||||
|
||||
MappingMetaData mappingMetaData = followerClient().admin().indices().prepareGetMappings("index2").get().getMappings()
|
||||
.get("index2").get("doc");
|
||||
assertThat(XContentMapValues.extractValue("properties.k.type", mappingMetaData.sourceAsMap()), equalTo("long"));
|
||||
|
|
Loading…
Reference in New Issue