Prevent CCR recovery from missing documents (#38237)

Currently the snapshot/restore process manually sets the global
checkpoint to the max sequence number from the restored segements. This
does not work for Ccr as this will lead to documents that would be
recovered in the normal followering operation from being recovered.

This commit fixes this issue by setting the initial global checkpoint to
the existing local checkpoint.
This commit is contained in:
Tim Brooks 2019-02-05 13:32:41 -06:00 committed by GitHub
parent aef5775561
commit c2a8fe1f91
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 399 additions and 218 deletions

View File

@ -397,9 +397,9 @@ final class StoreRecovery {
assert indexShouldExists;
store.bootstrapNewHistory();
final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
final long maxSeqNo = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.MAX_SEQ_NO));
final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
final String translogUUID = Translog.createEmptyTranslog(
indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId, indexShard.getPendingPrimaryTerm());
indexShard.shardPath().resolveTranslog(), localCheckpoint, shardId, indexShard.getPendingPrimaryTerm());
store.associateIndexWithNewTranslog(translogUUID);
} else if (indexShouldExists) {
if (recoveryState.getRecoverySource().shouldBootstrapNewHistoryUUID()) {
@ -466,9 +466,9 @@ final class StoreRecovery {
final Store store = indexShard.store();
store.bootstrapNewHistory();
final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
final long maxSeqNo = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.MAX_SEQ_NO));
final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
final String translogUUID = Translog.createEmptyTranslog(
indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId, indexShard.getPendingPrimaryTerm());
indexShard.shardPath().resolveTranslog(), localCheckpoint, shardId, indexShard.getPendingPrimaryTerm());
store.associateIndexWithNewTranslog(translogUUID);
assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
indexShard.openEngineAndRecoverFromTranslog();

View File

@ -1428,26 +1428,27 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
try {
Map<String, String> userData = readLastCommittedSegmentsInfo().getUserData();
final long maxSeqNo = Long.parseLong(userData.get(SequenceNumbers.MAX_SEQ_NO));
bootstrapNewHistory(maxSeqNo);
final long localCheckpoint = Long.parseLong(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
bootstrapNewHistory(localCheckpoint, maxSeqNo);
} finally {
metadataLock.writeLock().unlock();
}
}
/**
* Marks an existing lucene index with a new history uuid and sets the given maxSeqNo as the local checkpoint
* Marks an existing lucene index with a new history uuid and sets the given local checkpoint
* as well as the maximum sequence number.
* This is used to make sure no existing shard will recovery from this index using ops based recovery.
* This is used to make sure no existing shard will recover from this index using ops based recovery.
* @see SequenceNumbers#LOCAL_CHECKPOINT_KEY
* @see SequenceNumbers#MAX_SEQ_NO
*/
public void bootstrapNewHistory(long maxSeqNo) throws IOException {
public void bootstrapNewHistory(long localCheckpoint, long maxSeqNo) throws IOException {
metadataLock.writeLock().lock();
try (IndexWriter writer = newAppendingIndexWriter(directory, null)) {
final Map<String, String> map = new HashMap<>();
map.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID());
map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint));
map.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(maxSeqNo));
updateCommitData(writer, map);
} finally {
metadataLock.writeLock().unlock();

View File

@ -21,7 +21,6 @@ package org.elasticsearch.index.shard;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
@ -46,8 +45,6 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
@ -62,7 +59,6 @@ import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.lease.Releasable;
@ -107,7 +103,6 @@ import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.VersionFieldMapper;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreStats;
import org.elasticsearch.index.store.StoreUtils;
@ -121,12 +116,8 @@ import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotShardFailure;
import org.elasticsearch.test.CorruptionUtils;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.FieldMaskingReader;
@ -143,7 +134,6 @@ import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@ -174,7 +164,6 @@ import static org.elasticsearch.common.lucene.Lucene.cleanLuceneIndex;
import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.elasticsearch.repositories.RepositoryData.EMPTY_REPO_GEN;
import static org.elasticsearch.test.hamcrest.RegexMatcher.matches;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
@ -2159,9 +2148,12 @@ public class IndexShardTests extends IndexShardTestCase {
public void testRestoreShard() throws IOException {
final IndexShard source = newStartedShard(true);
IndexShard target = newStartedShard(true);
IndexShard target = newStartedShard(true, Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), source.indexSettings().isSoftDeleteEnabled()).build());
indexDoc(source, "_doc", "0");
EngineTestCase.generateNewSeqNo(source.getEngine()); // create a gap in the history
indexDoc(source, "_doc", "2");
if (randomBoolean()) {
source.refresh("test");
}
@ -2197,16 +2189,18 @@ public class IndexShardTests extends IndexShardTestCase {
}
}
}));
assertThat(target.getLocalCheckpoint(), equalTo(0L));
assertThat(target.seqNoStats().getMaxSeqNo(), equalTo(0L));
assertThat(target.getReplicationTracker().getGlobalCheckpoint(), equalTo(0L));
assertThat(target.getLocalCheckpoint(), equalTo(2L));
assertThat(target.seqNoStats().getMaxSeqNo(), equalTo(2L));
assertThat(target.seqNoStats().getGlobalCheckpoint(), equalTo(0L));
IndexShardTestCase.updateRoutingEntry(target, routing.moveToStarted());
assertThat(target.getReplicationTracker().getTrackedLocalCheckpointForShard(
target.routingEntry().allocationId().getId()).getLocalCheckpoint(), equalTo(0L));
target.routingEntry().allocationId().getId()).getLocalCheckpoint(), equalTo(2L));
assertThat(target.seqNoStats().getGlobalCheckpoint(), equalTo(2L));
assertDocs(target, "0");
assertDocs(target, "0", "2");
closeShards(source, target);
closeShard(source, false);
closeShards(target);
}
public void testSearcherWrapperIsUsed() throws IOException {
@ -3131,107 +3125,6 @@ public class IndexShardTests extends IndexShardTestCase {
return new Result(localCheckpoint, max);
}
/** A dummy repository for testing which just needs restore overridden */
private abstract static class RestoreOnlyRepository extends AbstractLifecycleComponent implements Repository {
private final String indexName;
RestoreOnlyRepository(String indexName) {
this.indexName = indexName;
}
@Override
protected void doStart() {
}
@Override
protected void doStop() {
}
@Override
protected void doClose() {
}
@Override
public RepositoryMetaData getMetadata() {
return null;
}
@Override
public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) {
return null;
}
@Override
public MetaData getSnapshotGlobalMetaData(SnapshotId snapshotId) {
return null;
}
@Override
public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId index) throws IOException {
return null;
}
@Override
public RepositoryData getRepositoryData() {
Map<IndexId, Set<SnapshotId>> map = new HashMap<>();
map.put(new IndexId(indexName, "blah"), emptySet());
return new RepositoryData(EMPTY_REPO_GEN, Collections.emptyMap(), Collections.emptyMap(), map, Collections.emptyList());
}
@Override
public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData metaData) {
}
@Override
public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure,
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
boolean includeGlobalState) {
return null;
}
@Override
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) {
}
@Override
public long getSnapshotThrottleTimeInNanos() {
return 0;
}
@Override
public long getRestoreThrottleTimeInNanos() {
return 0;
}
@Override
public String startVerification() {
return null;
}
@Override
public void endVerification(String verificationToken) {
}
@Override
public boolean isReadOnly() {
return false;
}
@Override
public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
IndexShardSnapshotStatus snapshotStatus) {
}
@Override
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) {
return null;
}
@Override
public void verify(String verificationToken, DiscoveryNode localNode) {
}
}
public void testIsSearchIdle() throws Exception {
Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)

View File

@ -0,0 +1,146 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard;
import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotShardFailure;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static java.util.Collections.emptySet;
import static org.elasticsearch.repositories.RepositoryData.EMPTY_REPO_GEN;
/** A dummy repository for testing which just needs restore overridden */
public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent implements Repository {
private final String indexName;
public RestoreOnlyRepository(String indexName) {
this.indexName = indexName;
}
@Override
protected void doStart() {
}
@Override
protected void doStop() {
}
@Override
protected void doClose() {
}
@Override
public RepositoryMetaData getMetadata() {
return null;
}
@Override
public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) {
return null;
}
@Override
public MetaData getSnapshotGlobalMetaData(SnapshotId snapshotId) {
return null;
}
@Override
public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId index) throws IOException {
return null;
}
@Override
public RepositoryData getRepositoryData() {
Map<IndexId, Set<SnapshotId>> map = new HashMap<>();
map.put(new IndexId(indexName, "blah"), emptySet());
return new RepositoryData(EMPTY_REPO_GEN, Collections.emptyMap(), Collections.emptyMap(), map, Collections.emptyList());
}
@Override
public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData metaData) {
}
@Override
public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure,
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
boolean includeGlobalState) {
return null;
}
@Override
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) {
}
@Override
public long getSnapshotThrottleTimeInNanos() {
return 0;
}
@Override
public long getRestoreThrottleTimeInNanos() {
return 0;
}
@Override
public String startVerification() {
return null;
}
@Override
public void endVerification(String verificationToken) {
}
@Override
public boolean isReadOnly() {
return false;
}
@Override
public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
IndexShardSnapshotStatus snapshotStatus) {
}
@Override
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) {
return null;
}
@Override
public void verify(String verificationToken, DiscoveryNode localNode) {
}
}

View File

@ -52,6 +52,7 @@ public class BackgroundIndexer implements AutoCloseable {
private final Logger logger = LogManager.getLogger(getClass());
final Thread[] writers;
final Client client;
final CountDownLatch stopLatch;
final CopyOnWriteArrayList<Exception> failures;
final AtomicBoolean stop = new AtomicBoolean(false);
@ -122,6 +123,7 @@ public class BackgroundIndexer implements AutoCloseable {
if (random == null) {
random = RandomizedTest.getRandom();
}
this.client = client;
useAutoGeneratedIDs = random.nextBoolean();
failures = new CopyOnWriteArrayList<>();
writers = new Thread[writerCount];
@ -316,6 +318,10 @@ public class BackgroundIndexer implements AutoCloseable {
stop();
}
public Client getClient() {
return client;
}
/**
* Returns the ID set of all documents indexed by this indexer run
*/

View File

@ -218,21 +218,17 @@ public final class TransportPutFollowAction
final PutFollowAction.Request request,
final ActionListener<PutFollowAction.Response> listener) {
assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : "PutFollowAction does not support DEFAULT.";
activeShardsObserver.waitForActiveShards(new String[]{request.getFollowerIndex()},
request.waitForActiveShards(), request.timeout(), result -> {
if (result) {
FollowParameters parameters = request.getParameters();
ResumeFollowAction.Request resumeFollowRequest = new ResumeFollowAction.Request();
resumeFollowRequest.setFollowerIndex(request.getFollowerIndex());
resumeFollowRequest.setParameters(new FollowParameters(parameters));
client.execute(ResumeFollowAction.INSTANCE, resumeFollowRequest, ActionListener.wrap(
r -> listener.onResponse(new PutFollowAction.Response(true, true, r.isAcknowledged())),
listener::onFailure
));
} else {
listener.onResponse(new PutFollowAction.Response(true, false, false));
}
}, listener::onFailure);
FollowParameters parameters = request.getParameters();
ResumeFollowAction.Request resumeFollowRequest = new ResumeFollowAction.Request();
resumeFollowRequest.setFollowerIndex(request.getFollowerIndex());
resumeFollowRequest.setParameters(new FollowParameters(parameters));
client.execute(ResumeFollowAction.INSTANCE, resumeFollowRequest, ActionListener.wrap(
r -> activeShardsObserver.waitForActiveShards(new String[]{request.getFollowerIndex()},
request.waitForActiveShards(), request.timeout(), result ->
listener.onResponse(new PutFollowAction.Response(true, result, r.isAcknowledged())),
listener::onFailure),
listener::onFailure
));
}
@Override

View File

@ -44,6 +44,7 @@ import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.DocIdSeqNoAndTerm;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
@ -58,6 +59,7 @@ import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.BackgroundIndexer;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.InternalTestCluster;
@ -93,6 +95,8 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -554,6 +558,70 @@ public abstract class CcrIntegTestCase extends ESTestCase {
});
}
/**
* Waits until at least a give number of document is visible for searchers
*
* @param numDocs number of documents to wait for
* @param indexer a {@link org.elasticsearch.test.BackgroundIndexer}. Will be first checked for documents indexed.
* This saves on unneeded searches.
* @return the actual number of docs seen.
*/
public long waitForDocs(final long numDocs, final BackgroundIndexer indexer) throws InterruptedException {
// indexing threads can wait for up to ~1m before retrying when they first try to index into a shard which is not STARTED.
return waitForDocs(numDocs, 90, TimeUnit.SECONDS, indexer);
}
/**
* Waits until at least a give number of document is visible for searchers
*
* @param numDocs number of documents to wait for
* @param maxWaitTime if not progress have been made during this time, fail the test
* @param maxWaitTimeUnit the unit in which maxWaitTime is specified
* @param indexer Will be first checked for documents indexed.
* This saves on unneeded searches.
* @return the actual number of docs seen.
*/
public long waitForDocs(final long numDocs, int maxWaitTime, TimeUnit maxWaitTimeUnit, final BackgroundIndexer indexer)
throws InterruptedException {
final AtomicLong lastKnownCount = new AtomicLong(-1);
long lastStartCount = -1;
BooleanSupplier testDocs = () -> {
lastKnownCount.set(indexer.totalIndexedDocs());
if (lastKnownCount.get() >= numDocs) {
try {
long count = indexer.getClient().prepareSearch()
.setTrackTotalHits(true)
.setSize(0)
.setQuery(QueryBuilders.matchAllQuery())
.get()
.getHits().getTotalHits().value;
if (count == lastKnownCount.get()) {
// no progress - try to refresh for the next time
indexer.getClient().admin().indices().prepareRefresh().get();
}
lastKnownCount.set(count);
} catch (Exception e) { // count now acts like search and barfs if all shards failed...
logger.debug("failed to executed count", e);
return false;
}
logger.debug("[{}] docs visible for search. waiting for [{}]", lastKnownCount.get(), numDocs);
} else {
logger.debug("[{}] docs indexed. waiting for [{}]", lastKnownCount.get(), numDocs);
}
return lastKnownCount.get() >= numDocs;
};
while (!awaitBusy(testDocs, maxWaitTime, maxWaitTimeUnit)) {
if (lastStartCount == lastKnownCount.get()) {
// we didn't make any progress
fail("failed to reach " + numDocs + "docs");
}
lastStartCount = lastKnownCount.get();
}
return lastKnownCount.get();
}
static void removeCCRRelatedMetadataFromClusterState(ClusterService clusterService) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
clusterService.submitStateUpdateTask("remove-ccr-related-metadata", new ClusterStateUpdateTask() {

View File

@ -32,7 +32,6 @@ import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
@ -61,6 +60,7 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.snapshots.SnapshotRestoreException;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.BackgroundIndexer;
import org.elasticsearch.transport.NoSuchRemoteClusterException;
import org.elasticsearch.xpack.CcrIntegTestCase;
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
@ -118,93 +118,85 @@ public class IndexFollowingIT extends CcrIntegTestCase {
} else {
firstBatchNumDocs = randomIntBetween(10, 64);
}
final int flushPoint = (int) (firstBatchNumDocs * 0.75);
logger.info("Indexing [{}] docs as first batch", firstBatchNumDocs);
BulkRequestBuilder bulkRequestBuilder = leaderClient().prepareBulk();
for (int i = 0; i < flushPoint; i++) {
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
IndexRequest indexRequest = new IndexRequest("index1", "doc", Integer.toString(i))
.source(source, XContentType.JSON)
.timeout(TimeValue.timeValueSeconds(1));
bulkRequestBuilder.add(indexRequest);
}
bulkRequestBuilder.get();
try (BackgroundIndexer indexer = new BackgroundIndexer("index1", "_doc", leaderClient(), firstBatchNumDocs,
randomIntBetween(1, 5))) {
waitForDocs(randomInt(firstBatchNumDocs), indexer);
leaderClient().admin().indices().prepareFlush("index1").setWaitIfOngoing(true).get();
waitForDocs(firstBatchNumDocs, indexer);
indexer.assertNoFailures();
leaderClient().admin().indices().prepareFlush("index1").setWaitIfOngoing(true).get();
boolean waitOnAll = randomBoolean();
// Index some docs after the flush that might be recovered in the normal index following operations
for (int i = flushPoint; i < firstBatchNumDocs; i++) {
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
}
boolean waitOnAll = randomBoolean();
final PutFollowAction.Request followRequest;
if (waitOnAll) {
followRequest = putFollow("index1", "index2", ActiveShardCount.ALL);
} else {
followRequest = putFollow("index1", "index2", ActiveShardCount.ONE);
}
PutFollowAction.Response response = followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
assertTrue(response.isFollowIndexCreated());
assertTrue(response.isFollowIndexShardsAcked());
assertTrue(response.isIndexFollowingStarted());
ClusterHealthRequest healthRequest = Requests.clusterHealthRequest("index2").waitForNoRelocatingShards(true);
ClusterIndexHealth indexHealth = followerClient().admin().cluster().health(healthRequest).actionGet().getIndices().get("index2");
for (ClusterShardHealth shardHealth : indexHealth.getShards().values()) {
final PutFollowAction.Request followRequest;
if (waitOnAll) {
assertTrue(shardHealth.isPrimaryActive());
assertEquals(1 + numberOfReplicas, shardHealth.getActiveShards());
followRequest = putFollow("index1", "index2", ActiveShardCount.ALL);
} else {
assertTrue(shardHealth.isPrimaryActive());
followRequest = putFollow("index1", "index2", ActiveShardCount.ONE);
}
}
PutFollowAction.Response response = followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
assertTrue(response.isFollowIndexCreated());
assertTrue(response.isFollowIndexShardsAcked());
assertTrue(response.isIndexFollowingStarted());
final Map<ShardId, Long> firstBatchNumDocsPerShard = new HashMap<>();
final ShardStats[] firstBatchShardStats =
leaderClient().admin().indices().prepareStats("index1").get().getIndex("index1").getShards();
for (final ShardStats shardStats : firstBatchShardStats) {
if (shardStats.getShardRouting().primary()) {
long value = shardStats.getStats().getIndexing().getTotal().getIndexCount() - 1;
firstBatchNumDocsPerShard.put(shardStats.getShardRouting().shardId(), value);
ClusterHealthRequest healthRequest = Requests.clusterHealthRequest("index2").waitForNoRelocatingShards(true);
ClusterIndexHealth indexHealth = followerClient().admin().cluster().health(healthRequest).get().getIndices().get("index2");
for (ClusterShardHealth shardHealth : indexHealth.getShards().values()) {
if (waitOnAll) {
assertTrue(shardHealth.isPrimaryActive());
assertEquals(1 + numberOfReplicas, shardHealth.getActiveShards());
} else {
assertTrue(shardHealth.isPrimaryActive());
}
}
}
assertBusy(assertTask(numberOfPrimaryShards, firstBatchNumDocsPerShard));
for (int i = 0; i < firstBatchNumDocs; i++) {
assertBusy(assertExpectedDocumentRunnable(i));
}
pauseFollow("index2");
followerClient().execute(ResumeFollowAction.INSTANCE, resumeFollow("index2")).get();
final int secondBatchNumDocs = randomIntBetween(2, 64);
logger.info("Indexing [{}] docs as second batch", secondBatchNumDocs);
for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) {
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
}
final Map<ShardId, Long> secondBatchNumDocsPerShard = new HashMap<>();
final ShardStats[] secondBatchShardStats =
leaderClient().admin().indices().prepareStats("index1").get().getIndex("index1").getShards();
for (final ShardStats shardStats : secondBatchShardStats) {
if (shardStats.getShardRouting().primary()) {
final long value = shardStats.getStats().getIndexing().getTotal().getIndexCount() - 1;
secondBatchNumDocsPerShard.put(shardStats.getShardRouting().shardId(), value);
final Map<ShardId, Long> firstBatchNumDocsPerShard = new HashMap<>();
final ShardStats[] firstBatchShardStats =
leaderClient().admin().indices().prepareStats("index1").get().getIndex("index1").getShards();
for (final ShardStats shardStats : firstBatchShardStats) {
if (shardStats.getShardRouting().primary()) {
long value = shardStats.getStats().getIndexing().getTotal().getIndexCount() - 1;
firstBatchNumDocsPerShard.put(shardStats.getShardRouting().shardId(), value);
}
}
}
assertBusy(assertTask(numberOfPrimaryShards, secondBatchNumDocsPerShard));
assertBusy(assertTask(numberOfPrimaryShards, firstBatchNumDocsPerShard));
for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) {
assertBusy(assertExpectedDocumentRunnable(i));
for (String docId : indexer.getIds()) {
assertBusy(() -> {
final GetResponse getResponse = followerClient().prepareGet("index2", "_doc", docId).get();
assertTrue("Doc with id [" + docId + "] is missing", getResponse.isExists());
});
}
pauseFollow("index2");
followerClient().execute(ResumeFollowAction.INSTANCE, resumeFollow("index2")).get();
final int secondBatchNumDocs = randomIntBetween(2, 64);
logger.info("Indexing [{}] docs as second batch", secondBatchNumDocs);
indexer.continueIndexing(secondBatchNumDocs);
final Map<ShardId, Long> secondBatchNumDocsPerShard = new HashMap<>();
final ShardStats[] secondBatchShardStats =
leaderClient().admin().indices().prepareStats("index1").get().getIndex("index1").getShards();
for (final ShardStats shardStats : secondBatchShardStats) {
if (shardStats.getShardRouting().primary()) {
final long value = shardStats.getStats().getIndexing().getTotal().getIndexCount() - 1;
secondBatchNumDocsPerShard.put(shardStats.getShardRouting().shardId(), value);
}
}
assertBusy(assertTask(numberOfPrimaryShards, secondBatchNumDocsPerShard));
for (String docId : indexer.getIds()) {
assertBusy(() -> {
final GetResponse getResponse = followerClient().prepareGet("index2", "_doc", docId).get();
assertTrue("Doc with id [" + docId + "] is missing", getResponse.isExists());
});
}
pauseFollow("index2");
assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), numberOfPrimaryShards);
}
pauseFollow("index2");
assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), numberOfPrimaryShards);
}
public void testFollowIndexWithConcurrentMappingChanges() throws Exception {

View File

@ -5,27 +5,46 @@
*/
package org.elasticsearch.xpack.ccr.index.engine;
import org.apache.lucene.store.IOContext;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.EngineTestCase;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.RestoreOnlyRepository;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ccr.CcrSettings;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
import static org.elasticsearch.common.lucene.Lucene.cleanLuceneIndex;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
public class FollowEngineIndexShardTests extends IndexShardTestCase {
@ -76,4 +95,63 @@ public class FollowEngineIndexShardTests extends IndexShardTestCase {
closeShards(indexShard);
}
public void testRestoreShard() throws IOException {
final Settings sourceSettings = Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.build();
final IndexShard source = newStartedShard(true, sourceSettings);
final Settings targetSettings = Settings.builder()
.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.build();
IndexShard target = newStartedShard(true, targetSettings, new FollowingEngineFactory());
assertThat(IndexShardTestCase.getEngine(target), instanceOf(FollowingEngine.class));
indexDoc(source, "_doc", "0");
EngineTestCase.generateNewSeqNo(IndexShardTestCase.getEngine(source));
indexDoc(source, "_doc", "2");
if (randomBoolean()) {
source.refresh("test");
}
flushShard(source); // only flush source
ShardRouting routing = ShardRoutingHelper.initWithSameId(target.routingEntry(),
RecoverySource.ExistingStoreRecoverySource.INSTANCE);
final Snapshot snapshot = new Snapshot("foo", new SnapshotId("bar", UUIDs.randomBase64UUID()));
routing = ShardRoutingHelper.newWithRestoreSource(routing,
new RecoverySource.SnapshotRecoverySource(UUIDs.randomBase64UUID(), snapshot, Version.CURRENT, "test"));
target = reinitShard(target, routing);
Store sourceStore = source.store();
Store targetStore = target.store();
DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
target.markAsRecovering("store", new RecoveryState(routing, localNode, null));
assertTrue(target.restoreFromRepository(new RestoreOnlyRepository("test") {
@Override
public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId,
RecoveryState recoveryState) {
try {
cleanLuceneIndex(targetStore.directory());
for (String file : sourceStore.directory().listAll()) {
if (file.equals("write.lock") || file.startsWith("extra")) {
continue;
}
targetStore.directory().copyFrom(sourceStore.directory(), file, file, IOContext.DEFAULT);
}
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
}));
assertThat(target.getLocalCheckpoint(), equalTo(0L));
assertThat(target.seqNoStats().getMaxSeqNo(), equalTo(2L));
assertThat(target.seqNoStats().getGlobalCheckpoint(), equalTo(0L));
IndexShardTestCase.updateRoutingEntry(target, routing.moveToStarted());
assertThat(target.seqNoStats().getGlobalCheckpoint(), equalTo(0L));
assertDocs(target, "0", "2");
closeShard(source, false);
closeShards(target);
}
}

View File

@ -128,7 +128,8 @@ public final class SourceOnlySnapshotRepository extends FilterRepository {
snapshot.syncSnapshot(snapshotIndexCommit);
// we will use the lucene doc ID as the seq ID so we set the local checkpoint to maxDoc with a new index UUID
SegmentInfos segmentInfos = tempStore.readLastCommittedSegmentsInfo();
tempStore.bootstrapNewHistory(segmentInfos.totalMaxDoc());
final long maxDoc = segmentInfos.totalMaxDoc();
tempStore.bootstrapNewHistory(maxDoc, maxDoc);
store.incRef();
try (DirectoryReader reader = DirectoryReader.open(tempStore.directory())) {
IndexCommit indexCommit = reader.getIndexCommit();