Non-peer recovery should set the global checkpoint (#27965)

Non-Peer recoveries should restore the global checkpoint rather than wait for the activation of the primary. This brings us a step closer to a universe where a recovered shard always has a valid global checkpoint. Concretely:

1) Recovery from store can read the checkpoint from the translog
2) Recovery from local shards and snapshots can set the global checkpoint to the local checkpoint as this is the only copy of the shard.
3) Recovery of an empty shard can set it to `NO_OPS_PERFORMED`

Peer recoveries will follow but require more work and thus will have their own PR.

I also used the moment to clean up `IndexShard`'s api around starting the engine and doing recovery from the translog. The current naming are a relic of the past and don't align with the current naming schemes in the engine.
This commit is contained in:
Boaz Leskes 2017-12-22 21:39:12 +01:00 committed by GitHub
parent 6435928c4f
commit adb49efe17
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 96 additions and 103 deletions

View File

@ -49,7 +49,6 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lucene.LoggerInfoStream; import org.elasticsearch.common.lucene.LoggerInfoStream;
import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.Lucene;
@ -59,7 +58,6 @@ import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo; import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo;
import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.KeyedLock;
import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType; import org.elasticsearch.index.VersionType;
@ -244,9 +242,9 @@ public class InternalEngine extends Engine {
break; break;
case OPEN_INDEX_AND_TRANSLOG: case OPEN_INDEX_AND_TRANSLOG:
case OPEN_INDEX_CREATE_TRANSLOG: case OPEN_INDEX_CREATE_TRANSLOG:
final Tuple<Long, Long> seqNoStats = store.loadSeqNoInfo(); final SequenceNumbers.CommitInfo seqNoStats = store.loadSeqNoInfo();
maxSeqNo = seqNoStats.v1(); maxSeqNo = seqNoStats.maxSeqNo;
localCheckpoint = seqNoStats.v2(); localCheckpoint = seqNoStats.localCheckpoint;
logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint); logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint);
break; break;
default: throw new IllegalArgumentException("unknown type: " + openMode); default: throw new IllegalArgumentException("unknown type: " + openMode);

View File

@ -19,8 +19,6 @@
package org.elasticsearch.index.seqno; package org.elasticsearch.index.seqno;
import org.elasticsearch.common.collect.Tuple;
import java.util.Map; import java.util.Map;
/** /**
@ -49,7 +47,7 @@ public class SequenceNumbers {
* @param commitData the commit data * @param commitData the commit data
* @return the sequence number stats * @return the sequence number stats
*/ */
public static Tuple<Long, Long> loadSeqNoInfoFromLuceneCommit( public static CommitInfo loadSeqNoInfoFromLuceneCommit(
final Iterable<Map.Entry<String, String>> commitData) { final Iterable<Map.Entry<String, String>> commitData) {
long maxSeqNo = NO_OPS_PERFORMED; long maxSeqNo = NO_OPS_PERFORMED;
long localCheckpoint = NO_OPS_PERFORMED; long localCheckpoint = NO_OPS_PERFORMED;
@ -65,7 +63,7 @@ public class SequenceNumbers {
} }
} }
return new Tuple<>(maxSeqNo, localCheckpoint); return new CommitInfo(maxSeqNo, localCheckpoint);
} }
/** /**
@ -116,4 +114,13 @@ public class SequenceNumbers {
} }
} }
public static final class CommitInfo {
public final long maxSeqNo;
public final long localCheckpoint;
public CommitInfo(long maxSeqNo, long localCheckpoint) {
this.maxSeqNo = maxSeqNo;
this.localCheckpoint = localCheckpoint;
}
}
} }

View File

@ -1275,21 +1275,40 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return opsRecovered; return opsRecovered;
} }
/** /** creates an empty index and translog and opens the engine **/
* After the store has been recovered, we need to start the engine in order to apply operations public void createIndexAndTranslog() throws IOException {
*/ assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.EMPTY_STORE;
public void performTranslogRecovery(boolean indexExists) throws IOException { assert shardRouting.primary() && shardRouting.isRelocationTarget() == false;
if (indexExists == false) { // note: these are set when recovering from the translog
// note: these are set when recovering from the translog final RecoveryState.Translog translogStats = recoveryState().getTranslog();
final RecoveryState.Translog translogStats = recoveryState().getTranslog(); translogStats.totalOperations(0);
translogStats.totalOperations(0); translogStats.totalOperationsOnStart(0);
translogStats.totalOperationsOnStart(0); globalCheckpointTracker.updateGlobalCheckpointOnReplica(SequenceNumbers.NO_OPS_PERFORMED, "index created");
} innerOpenEngineAndTranslog(EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, false);
internalPerformTranslogRecovery(false, indexExists);
assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage();
} }
private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boolean indexExists) throws IOException { /** opens the engine on top of the existing lucene engine but creates an empty translog **/
public void openIndexAndCreateTranslog(boolean forceNewHistoryUUID, long globalCheckpoint) throws IOException {
assert recoveryState.getRecoverySource().getType() != RecoverySource.Type.EMPTY_STORE &&
recoveryState.getRecoverySource().getType() != RecoverySource.Type.EXISTING_STORE;
SequenceNumbers.CommitInfo commitInfo = store.loadSeqNoInfo();
assert commitInfo.localCheckpoint >= globalCheckpoint :
"trying to create a shard whose local checkpoint [" + commitInfo.localCheckpoint + "] is < global checkpoint ["
+ globalCheckpoint + "]";
globalCheckpointTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "opening index with a new translog");
innerOpenEngineAndTranslog(EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG, forceNewHistoryUUID);
}
/**
* opens the engine on top of the existing lucene engine and translog.
* Operations from the translog will be replayed to bring lucene up to date.
**/
public void openIndexAndTranslog() throws IOException {
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.EXISTING_STORE;
innerOpenEngineAndTranslog(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, false);
}
private void innerOpenEngineAndTranslog(final EngineConfig.OpenMode openMode, final boolean forceNewHistoryUUID) throws IOException {
if (state != IndexShardState.RECOVERING) { if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state); throw new IndexShardNotRecoveringException(shardId, state);
} }
@ -1303,35 +1322,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
} }
} }
recoveryState.setStage(RecoveryState.Stage.TRANSLOG); recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
final EngineConfig.OpenMode openMode;
/* by default we recover and index and replay the translog but if the index
* doesn't exist we create everything from the scratch. Yet, if the index
* doesn't exist we don't need to worry about the skipTranslogRecovery since
* there is no translog on a non-existing index.
* The skipTranslogRecovery invariant is used if we do remote recovery since
* there the translog isn't local but on the remote host, hence we can skip it.
*/
if (indexExists == false) {
openMode = EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG;
} else if (skipTranslogRecovery) {
openMode = EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG;
} else {
openMode = EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG;
}
assert indexExists == false || assertMaxUnsafeAutoIdInCommit(); assert openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG || assertMaxUnsafeAutoIdInCommit();
final EngineConfig config = newEngineConfig(openMode); final EngineConfig config = newEngineConfig(openMode, forceNewHistoryUUID);
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
// set global checkpoint before opening engine, to ensure that the global checkpoint written to the checkpoint file
// is not reset to the default value, which could prevent future sequence-number based recoveries or rolling back of Lucene.
globalCheckpointTracker.updateGlobalCheckpointOnReplica(
Translog.readGlobalCheckpoint(config.getTranslogConfig().getTranslogPath()),
"opening index and translog"
);
}
// we disable deletes since we allow for operations to be executed against the shard while recovering // we disable deletes since we allow for operations to be executed against the shard while recovering
// but we need to make sure we don't loose deletes until we are done recovering // but we need to make sure we don't loose deletes until we are done recovering
@ -1342,9 +1337,14 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
// We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive, // We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive,
// we still give sync'd flush a chance to run: // we still give sync'd flush a chance to run:
active.set(true); active.set(true);
// we have to set it before we recover from the translog as acquring a snapshot from the translog causes a sync which
// causes the global checkpoint to be pulled in.
globalCheckpointTracker.updateGlobalCheckpointOnReplica(getEngine().getTranslog().getLastSyncedGlobalCheckpoint(),
"read from translog");
newEngine.recoverFromTranslog(); newEngine.recoverFromTranslog();
} }
assertSequenceNumbersInCommit(); assertSequenceNumbersInCommit();
assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage();
} }
private boolean assertSequenceNumbersInCommit() throws IOException { private boolean assertSequenceNumbersInCommit() throws IOException {
@ -1375,17 +1375,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
refreshListeners.setTranslog(newEngine.getTranslog()); refreshListeners.setTranslog(newEngine.getTranslog());
} }
/**
* After the store has been recovered, we need to start the engine. This method starts a new engine but skips
* the replay of the transaction log which is required in cases where we restore a previous index or recover from
* a remote peer.
*/
public void skipTranslogRecovery() throws IOException {
assert getEngineOrNull() == null : "engine was already created";
internalPerformTranslogRecovery(true, true);
assert recoveryState.getTranslog().recoveredOperations() == 0;
}
/** /**
* called if recovery has to be restarted after network error / delay ** * called if recovery has to be restarted after network error / delay **
*/ */
@ -2172,22 +2161,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return mapperService.documentMapperWithAutoCreate(type); return mapperService.documentMapperWithAutoCreate(type);
} }
private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode) { private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode, final boolean forceNewHistoryUUID) {
Sort indexSort = indexSortSupplier.get(); Sort indexSort = indexSortSupplier.get();
final boolean forceNewHistoryUUID;
switch (shardRouting.recoverySource().getType()) {
case EXISTING_STORE:
case PEER:
forceNewHistoryUUID = false;
break;
case EMPTY_STORE:
case SNAPSHOT:
case LOCAL_SHARDS:
forceNewHistoryUUID = true;
break;
default:
throw new AssertionError("unknown recovery type: [" + shardRouting.recoverySource().getType() + "]");
}
return new EngineConfig(openMode, shardId, shardRouting.allocationId().getId(), return new EngineConfig(openMode, shardId, shardRouting.allocationId().getId(),
threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(), threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(),
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener,

View File

@ -381,7 +381,7 @@ final class StoreRecovery {
recoveryState.getIndex().updateVersion(version); recoveryState.getIndex().updateVersion(version);
if (recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS) { if (recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS) {
assert indexShouldExists; assert indexShouldExists;
indexShard.skipTranslogRecovery(); indexShard.openIndexAndCreateTranslog(true, store.loadSeqNoInfo().localCheckpoint);
} else { } else {
// since we recover from local, just fill the files and size // since we recover from local, just fill the files and size
try { try {
@ -392,9 +392,12 @@ final class StoreRecovery {
} catch (IOException e) { } catch (IOException e) {
logger.debug("failed to list file details", e); logger.debug("failed to list file details", e);
} }
indexShard.performTranslogRecovery(indexShouldExists); if (indexShouldExists) {
assert indexShard.shardRouting.primary() : "only primary shards can recover from store"; indexShard.openIndexAndTranslog();
indexShard.getEngine().fillSeqNoGaps(indexShard.getPrimaryTerm()); indexShard.getEngine().fillSeqNoGaps(indexShard.getPrimaryTerm());
} else {
indexShard.createIndexAndTranslog();
}
} }
indexShard.finalizeRecovery(); indexShard.finalizeRecovery();
indexShard.postRecovery("post recovery from shard_store"); indexShard.postRecovery("post recovery from shard_store");
@ -435,7 +438,15 @@ final class StoreRecovery {
} }
final IndexId indexId = repository.getRepositoryData().resolveIndexId(indexName); final IndexId indexId = repository.getRepositoryData().resolveIndexId(indexName);
repository.restoreShard(indexShard, restoreSource.snapshot().getSnapshotId(), restoreSource.version(), indexId, snapshotShardId, indexShard.recoveryState()); repository.restoreShard(indexShard, restoreSource.snapshot().getSnapshotId(), restoreSource.version(), indexId, snapshotShardId, indexShard.recoveryState());
indexShard.skipTranslogRecovery(); final Store store = indexShard.store();
final long localCheckpoint;
store.incRef();
try {
localCheckpoint = store.loadSeqNoInfo().localCheckpoint;
} finally {
store.decRef();
}
indexShard.openIndexAndCreateTranslog(true, localCheckpoint);
assert indexShard.shardRouting.primary() : "only primary shards can recover from store"; assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
indexShard.getEngine().fillSeqNoGaps(indexShard.getPrimaryTerm()); indexShard.getEngine().fillSeqNoGaps(indexShard.getPrimaryTerm());
indexShard.finalizeRecovery(); indexShard.finalizeRecovery();

View File

@ -53,7 +53,6 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
@ -215,10 +214,10 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
/** /**
* Loads the maximum sequence number and local checkpoint from the latest Lucene commit point. * Loads the maximum sequence number and local checkpoint from the latest Lucene commit point.
* *
* @return a tuple populated with the maximum sequence number and the local checkpoint * @return {@link SequenceNumbers.CommitInfo} containing information about the last commit
* @throws IOException if an I/O exception occurred reading the latest Lucene commit point from disk * @throws IOException if an I/O exception occurred reading the latest Lucene commit point from disk
*/ */
public Tuple<Long, Long> loadSeqNoInfo() throws IOException { public SequenceNumbers.CommitInfo loadSeqNoInfo() throws IOException {
final Map<String, String> userData = SegmentInfos.readLatestCommit(directory).getUserData(); final Map<String, String> userData = SegmentInfos.readLatestCommit(directory).getUserData();
return SequenceNumbers.loadSeqNoInfoFromLuceneCommit(userData.entrySet()); return SequenceNumbers.loadSeqNoInfoFromLuceneCommit(userData.entrySet());
} }

View File

@ -196,7 +196,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
final long generation = deletionPolicy.getMinTranslogGenerationForRecovery(); final long generation = deletionPolicy.getMinTranslogGenerationForRecovery();
logger.debug("wipe translog location - creating new translog, starting generation [{}]", generation); logger.debug("wipe translog location - creating new translog, starting generation [{}]", generation);
Files.createDirectories(location); Files.createDirectories(location);
final long initialGlobalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO; final long initialGlobalCheckpoint = globalCheckpointSupplier.getAsLong();
final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint(0, generation, initialGlobalCheckpoint, generation); final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint(0, generation, initialGlobalCheckpoint, generation);
final Path checkpointFile = location.resolve(CHECKPOINT_FILE_NAME); final Path checkpointFile = location.resolve(CHECKPOINT_FILE_NAME);
Checkpoint.write(getChannelFactory(), checkpointFile, checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW); Checkpoint.write(getChannelFactory(), checkpointFile, checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);

View File

@ -32,7 +32,6 @@ import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
@ -354,17 +353,15 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
public static long getStartingSeqNo(final RecoveryTarget recoveryTarget) { public static long getStartingSeqNo(final RecoveryTarget recoveryTarget) {
try { try {
final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation()); final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation());
final Tuple<Long, Long> seqNoStats = recoveryTarget.store().loadSeqNoInfo(); final SequenceNumbers.CommitInfo seqNoStats = recoveryTarget.store().loadSeqNoInfo();
long maxSeqNo = seqNoStats.v1(); if (seqNoStats.maxSeqNo <= globalCheckpoint) {
long localCheckpoint = seqNoStats.v2(); assert seqNoStats.localCheckpoint <= globalCheckpoint;
if (maxSeqNo <= globalCheckpoint) {
assert localCheckpoint <= globalCheckpoint;
/* /*
* Commit point is good for sequence-number based recovery as the maximum sequence number included in it is below the global * Commit point is good for sequence-number based recovery as the maximum sequence number included in it is below the global
* checkpoint (i.e., it excludes any operations that may not be on the primary). Recovery will start at the first operation * checkpoint (i.e., it excludes any operations that may not be on the primary). Recovery will start at the first operation
* after the local checkpoint stored in the commit. * after the local checkpoint stored in the commit.
*/ */
return localCheckpoint + 1; return seqNoStats.localCheckpoint + 1;
} else { } else {
return SequenceNumbers.UNASSIGNED_SEQ_NO; return SequenceNumbers.UNASSIGNED_SEQ_NO;
} }

View File

@ -42,6 +42,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.MapperException; import org.elasticsearch.index.mapper.MapperException;
import org.elasticsearch.index.seqno.GlobalCheckpointTracker; import org.elasticsearch.index.seqno.GlobalCheckpointTracker;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardNotRecoveringException; import org.elasticsearch.index.shard.IndexShardNotRecoveringException;
import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.IndexShardState;
@ -363,7 +364,8 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
@Override @Override
public void prepareForTranslogOperations(int totalTranslogOps) throws IOException { public void prepareForTranslogOperations(int totalTranslogOps) throws IOException {
state().getTranslog().totalOperations(totalTranslogOps); state().getTranslog().totalOperations(totalTranslogOps);
indexShard().skipTranslogRecovery(); // TODO: take the local checkpoint from store as global checkpoint, once we know it's safe
indexShard().openIndexAndCreateTranslog(false, SequenceNumbers.UNASSIGNED_SEQ_NO);
} }
@Override @Override

View File

@ -19,8 +19,6 @@
package org.elasticsearch.index.shard; package org.elasticsearch.index.shard;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexCommit;
@ -58,7 +56,6 @@ import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
@ -87,12 +84,8 @@ import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.refresh.RefreshStats;
import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.Store;
@ -1802,8 +1795,13 @@ public class IndexShardTests extends IndexShardTestCase {
} }
} }
})); }));
assertThat(target.getLocalCheckpoint(), equalTo(0L));
assertThat(target.seqNoStats().getMaxSeqNo(), equalTo(0L));
assertThat(target.getGlobalCheckpointTracker().getGlobalCheckpoint(), equalTo(0L));
IndexShardTestCase.updateRoutingEntry(target, routing.moveToStarted()); IndexShardTestCase.updateRoutingEntry(target, routing.moveToStarted());
assertThat(target.getGlobalCheckpointTracker().getTrackedLocalCheckpointForShard(
target.routingEntry().allocationId().getId()).getLocalCheckpoint(), equalTo(0L));
assertDocs(target, "0"); assertDocs(target, "0");
closeShards(source, target); closeShards(source, target);
@ -2094,7 +2092,7 @@ public class IndexShardTests extends IndexShardTestCase {
shard.prepareForIndexRecovery(); shard.prepareForIndexRecovery();
// Shard is still inactive since we haven't started recovering yet // Shard is still inactive since we haven't started recovering yet
assertFalse(shard.isActive()); assertFalse(shard.isActive());
shard.performTranslogRecovery(true); shard.openIndexAndTranslog();
// Shard should now be active since we did recover: // Shard should now be active since we did recover:
assertTrue(shard.isActive()); assertTrue(shard.isActive());
closeShards(shard); closeShards(shard);
@ -2243,9 +2241,10 @@ public class IndexShardTests extends IndexShardTestCase {
assertEquals(file.recovered(), file.length()); assertEquals(file.recovered(), file.length());
} }
} }
IndexShardTestCase.updateRoutingEntry(targetShard, ShardRoutingHelper.moveToStarted(targetShard.routingEntry()));
// check that local checkpoint of new primary is properly tracked after recovery // check that local checkpoint of new primary is properly tracked after recovery
assertThat(targetShard.getLocalCheckpoint(), equalTo(1L)); assertThat(targetShard.getLocalCheckpoint(), equalTo(1L));
assertThat(targetShard.getGlobalCheckpointTracker().getGlobalCheckpoint(), equalTo(1L));
IndexShardTestCase.updateRoutingEntry(targetShard, ShardRoutingHelper.moveToStarted(targetShard.routingEntry()));
assertThat(targetShard.getGlobalCheckpointTracker().getTrackedLocalCheckpointForShard( assertThat(targetShard.getGlobalCheckpointTracker().getTrackedLocalCheckpointForShard(
targetShard.routingEntry().allocationId().getId()).getLocalCheckpoint(), equalTo(1L)); targetShard.routingEntry().allocationId().getId()).getLocalCheckpoint(), equalTo(1L));
assertDocCount(targetShard, 2); assertDocCount(targetShard, 2);

View File

@ -3133,13 +3133,18 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
.setWaitForCompletion(true).execute().actionGet(); .setWaitForCompletion(true).execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
IndicesStatsResponse stats = client().admin().indices().prepareStats(indexName).clear().get();
ShardStats shardStats = stats.getShards()[0];
assertTrue(shardStats.getShardRouting().primary());
assertThat(shardStats.getSeqNoStats().getLocalCheckpoint(), equalTo(10L)); // 10 indexed docs and one "missing" op.
assertThat(shardStats.getSeqNoStats().getGlobalCheckpoint(), equalTo(10L));
logger.info("--> indexing some more"); logger.info("--> indexing some more");
for (int i = 10; i < 15; i++) { for (int i = 10; i < 15; i++) {
index(indexName, "_doc", Integer.toString(i), "foo", "bar" + i); index(indexName, "_doc", Integer.toString(i), "foo", "bar" + i);
} }
IndicesStatsResponse stats = client().admin().indices().prepareStats(indexName).clear().get(); stats = client().admin().indices().prepareStats(indexName).clear().get();
ShardStats shardStats = stats.getShards()[0]; shardStats = stats.getShards()[0];
assertTrue(shardStats.getShardRouting().primary()); assertTrue(shardStats.getShardRouting().primary());
assertThat(shardStats.getSeqNoStats().getLocalCheckpoint(), equalTo(15L)); // 15 indexed docs and one "missing" op. assertThat(shardStats.getSeqNoStats().getLocalCheckpoint(), equalTo(15L)); // 15 indexed docs and one "missing" op.
assertThat(shardStats.getSeqNoStats().getGlobalCheckpoint(), equalTo(15L)); assertThat(shardStats.getSeqNoStats().getGlobalCheckpoint(), equalTo(15L));