Introduce a History UUID as a requirement for ops based recovery (#26577)

The new ops based recovery, introduced as part of #10708, is based on the assumption that all operations below the global checkpoint known to the replica do not need to be synced with the primary. This relies on the guarantee that all ops below that checkpoint are available on the primary and are identical on both copies. Under normal operations this guarantee holds. Sadly, it can be violated when a primary is restored from an old snapshot. At that point the restored primary can miss operations below the replica's global checkpoint, or even worse, may have entirely different operations at the same slots. This PR introduces the notion of a history uuid so that such a divergence from a restored primary can be detected (in a follow up PR).
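To make the failure mode concrete, here is a small, self-contained sketch in plain Java — illustrative only, not code from this PR. Two copies that agree on every operation at or below the replica's global checkpoint can recover by replaying just the ops above it; a primary restored from an old snapshot breaks exactly that assumption.

import java.util.Map;

// Illustrative sketch only (not Elasticsearch code).
final class DivergentHistoryDemo {
    public static void main(String[] args) {
        // seqNo -> operation, as the replica wrote it; its global checkpoint is 2
        Map<Long, String> replica = Map.of(0L, "index doc-a", 1L, "index doc-b", 2L, "index doc-c");
        long replicaGlobalCheckpoint = 2;

        // primary restored from a snapshot taken at seqNo 0 and then re-indexed differently
        Map<Long, String> restoredPrimary = Map.of(0L, "index doc-a", 1L, "index doc-x");

        // ops based recovery would only replay ops above the replica's global checkpoint,
        // silently keeping the divergent/extra ops at seqNo 1 and 2 on the replica
        boolean sharedHistoryHolds = replica.entrySet().stream()
                .filter(e -> e.getKey() <= replicaGlobalCheckpoint)
                .allMatch(e -> e.getValue().equals(restoredPrimary.get(e.getKey())));
        System.out.println("shared history below global checkpoint: " + sharedHistoryHolds); // false
    }
}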

The history UUID is generated by a primary when it is first created and is synced to replicas that are recovered via file based recovery. The PR adds a requirement to ops based recovery: the history uuid of the source and the target must be equal. Under normal operations, all shard copies keep that history uuid for the rest of the index's lifetime, so the check is a noop. However, it gives us a place to guarantee that we fall back to file based syncing in special events such as a restore from snapshot (to be done as a follow up) and when someone runs the truncate translog command, which can go wrong when combined with ops based recovery (handled in this PR).
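The shape of that requirement in the diff below is isTargetSameHistory() in RecoverySourceHandler. The following is a hedged, self-contained sketch of the same gate — the class, parameter names, and main method are illustrative and not part of the PR:

// Sketch of the gate this PR adds before attempting ops based recovery: the source
// only proceeds when the target's last Lucene commit carries the same history uuid.
final class HistoryUuidGate {

    /**
     * @param targetHistoryUUID uuid read from the target's commit user data; may be
     *                          null for indices created before the history uuid existed
     * @param sourceHistoryUUID uuid of the source (primary) engine
     */
    static boolean isTargetSameHistory(String targetHistoryUUID, String sourceHistoryUUID) {
        // a missing uuid (old index) or a mismatch (restored snapshot, truncated
        // translog) forces a fall back to file based recovery
        return targetHistoryUUID != null && targetHistoryUUID.equals(sourceHistoryUUID);
    }

    public static void main(String[] args) {
        String source = "XP7KDJGiS1a2fHYiFL5TXQ";
        System.out.println(isTargetSameHistory(source, source));                    // true  -> ops based recovery allowed
        System.out.println(isTargetSameHistory("some-other-history-uuid", source)); // false -> file based recovery
        System.out.println(isTargetSameHistory(null, source));                      // false -> file based recovery
    }
}

A mismatch or a missing uuid never corrupts anything; it only costs a full file based recovery. That is why the truncate translog command in this PR simply stamps a fresh random history uuid into the commit, forcing any later peer recovery off that copy onto the file based path.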

In the past we considered using the translog uuid for this purpose (i.e., syncing it across copies) and thus avoiding an extra identifier. That idea was rejected because it would remove the ability to verify that a specific translog really belongs to a specific Lucene index. We also feel that having a dedicated history uuid will serve us well in the future.
Boaz Leskes 2017-09-14 21:25:02 +03:00 committed by GitHub
parent e69c39a60f
commit 1ca0b5e9e4
21 changed files with 382 additions and 153 deletions

View File

@ -186,7 +186,7 @@ task verifyVersions {
* after the backport of the backcompat code is complete.
*/
allprojects {
ext.bwc_tests_enabled = true
ext.bwc_tests_enabled = false
}
task verifyBwcTestsEnabled {

View File

@ -95,6 +95,7 @@ import java.util.function.Function;
public abstract class Engine implements Closeable {
public static final String SYNC_COMMIT_ID = "sync_id";
public static final String HISTORY_UUID_KEY = "history_uuid";
protected final ShardId shardId;
protected final String allocationId;
@ -183,6 +184,9 @@ public abstract class Engine implements Closeable {
return new MergeStats();
}
/** returns the history uuid for the engine */
public abstract String getHistoryUUID();
/**
* A throttling class that can be activated, causing the
* {@code acquireThrottle} method to block on a lock when throttling

View File

@ -48,6 +48,7 @@ import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lucene.LoggerInfoStream;
import org.elasticsearch.common.lucene.Lucene;
@ -142,6 +143,8 @@ public class InternalEngine extends Engine {
private final CounterMetric numVersionLookups = new CounterMetric();
private final CounterMetric numIndexVersionsLookups = new CounterMetric();
@Nullable
private final String historyUUID;
public InternalEngine(EngineConfig engineConfig) throws EngineException {
super(engineConfig);
@ -174,15 +177,23 @@ public class InternalEngine extends Engine {
switch (openMode) {
case OPEN_INDEX_AND_TRANSLOG:
writer = createWriter(false);
String existingHistoryUUID = loadHistoryUUIDFromCommit(writer);
if (existingHistoryUUID == null) {
historyUUID = UUIDs.randomBase64UUID();
} else {
historyUUID = existingHistoryUUID;
}
final long globalCheckpoint = Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath());
seqNoStats = store.loadSeqNoStats(globalCheckpoint);
break;
case OPEN_INDEX_CREATE_TRANSLOG:
writer = createWriter(false);
historyUUID = loadHistoryUUIDFromCommit(writer);
seqNoStats = store.loadSeqNoStats(SequenceNumbers.UNASSIGNED_SEQ_NO);
break;
case CREATE_INDEX_AND_TRANSLOG:
writer = createWriter(true);
historyUUID = UUIDs.randomBase64UUID();
seqNoStats = new SeqNoStats(
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.NO_OPS_PERFORMED,
@ -342,6 +353,12 @@ public class InternalEngine extends Engine {
flush(true, true);
} else if (translog.isCurrent(translogGeneration) == false) {
commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID));
refreshLastCommittedSegmentInfos();
} else if (lastCommittedSegmentInfos.getUserData().containsKey(HISTORY_UUID_KEY) == false) {
assert historyUUID != null;
// put the history uuid into the index
commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID));
refreshLastCommittedSegmentInfos();
}
// clean up what's not needed
translog.trimUnreferencedReaders();
@ -382,6 +399,11 @@ public class InternalEngine extends Engine {
return translog;
}
@Override
public String getHistoryUUID() {
return historyUUID;
}
/**
* Reads the current stored translog ID from the IW commit data. If the id is not found, recommits the current
* translog id into lucene and returns null.
@ -401,6 +423,19 @@ public class InternalEngine extends Engine {
}
}
/**
* Reads the current stored history ID from the IW commit data. If the id is not found, returns null.
*/
@Nullable
private String loadHistoryUUIDFromCommit(final IndexWriter writer) throws IOException {
String uuid = commitDataAsMap(writer).get(HISTORY_UUID_KEY);
if (uuid == null) {
assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1) :
"index was created after 6_0_0_rc1 but has no history uuid";
}
return uuid;
}
private SearcherManager createSearcherManager() throws EngineException {
boolean success = false;
SearcherManager searcherManager = null;
@ -1312,30 +1347,8 @@ public class InternalEngine extends Engine {
} catch (Exception e) {
throw new FlushFailedEngineException(shardId, e);
}
/*
* we have to inc-ref the store here since if the engine is closed by a tragic event
* we don't acquire the write lock and wait until we have exclusive access. This might also
* dec the store reference which can essentially close the store and unless we can inc the reference
* we can't use it.
*/
store.incRef();
try {
// reread the last committed segment infos
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
} catch (Exception e) {
if (isClosed.get() == false) {
try {
logger.warn("failed to read latest segment infos on flush", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
if (Lucene.isCorruptionException(e)) {
throw new FlushFailedEngineException(shardId, e);
}
}
} finally {
store.decRef();
}
refreshLastCommittedSegmentInfos();
}
newCommitId = lastCommittedSegmentInfos.getId();
} catch (FlushFailedEngineException ex) {
@ -1353,6 +1366,33 @@ public class InternalEngine extends Engine {
return new CommitId(newCommitId);
}
private void refreshLastCommittedSegmentInfos() {
/*
* we have to inc-ref the store here since if the engine is closed by a tragic event
* we don't acquire the write lock and wait until we have exclusive access. This might also
* dec the store reference which can essentially close the store and unless we can inc the reference
* we can't use it.
*/
store.incRef();
try {
// reread the last committed segment infos
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
} catch (Exception e) {
if (isClosed.get() == false) {
try {
logger.warn("failed to read latest segment infos on flush", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
if (Lucene.isCorruptionException(e)) {
throw new FlushFailedEngineException(shardId, e);
}
}
} finally {
store.decRef();
}
}
@Override
public void rollTranslogGeneration() throws EngineException {
try (ReleasableLock ignored = readLock.acquire()) {
@ -1874,7 +1914,7 @@ public class InternalEngine extends Engine {
* {@link IndexWriter#commit()} call flushes all documents, we defer computation of the maximum sequence number to the time
* of invocation of the commit data iterator (which occurs after all documents have been flushed to Lucene).
*/
final Map<String, String> commitData = new HashMap<>(5);
final Map<String, String> commitData = new HashMap<>(6);
commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGeneration);
commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, localCheckpointValue);
@ -1883,6 +1923,9 @@ public class InternalEngine extends Engine {
}
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo()));
commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get()));
if (historyUUID != null) {
commitData.put(HISTORY_UUID_KEY, historyUUID);
}
logger.trace("committing writer with commit data [{}]", commitData);
return commitData.entrySet().iterator();
});
@ -1992,7 +2035,7 @@ public class InternalEngine extends Engine {
* Gets the commit data from {@link IndexWriter} as a map.
*/
private static Map<String, String> commitDataAsMap(final IndexWriter indexWriter) {
Map<String, String> commitData = new HashMap<>(5);
Map<String, String> commitData = new HashMap<>(6);
for (Map.Entry<String, String> entry : indexWriter.getLiveCommitData()) {
commitData.put(entry.getKey(), entry.getValue());
}

View File

@ -1585,6 +1585,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return getEngine().getTranslog();
}
public String getHistoryUUID() {
return getEngine().getHistoryUUID();
}
public IndexEventListener getIndexEventListener() {
return indexEventListener;
}

View File

@ -35,10 +35,12 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.mapper.MapperService;
@ -162,10 +164,11 @@ final class StoreRecovery {
* document-level semantics.
*/
writer.setLiveCommitData(() -> {
final HashMap<String, String> liveCommitData = new HashMap<>(2);
final HashMap<String, String> liveCommitData = new HashMap<>(4);
liveCommitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
liveCommitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(maxSeqNo));
liveCommitData.put(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp));
liveCommitData.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID());
return liveCommitData.entrySet().iterator();
});
writer.commit();

View File

@ -79,6 +79,7 @@ import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import java.io.Closeable;
import java.io.EOFException;
@ -1027,6 +1028,20 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
return commitUserData;
}
/**
* returns the history uuid the store points at, or null if non-existent.
*/
public String getHistoryUUID() {
return commitUserData.get(Engine.HISTORY_UUID_KEY);
}
/**
* returns the translog uuid the store points at
*/
public String getTranslogUUID() {
return commitUserData.get(Translog.TRANSLOG_UUID_KEY);
}
/**
* Returns true iff this metadata contains the given file.
*/

View File

@ -25,6 +25,8 @@ import joptsimple.OptionSpec;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.Lock;
@ -37,9 +39,11 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cli.EnvironmentAwareCommand;
import org.elasticsearch.cli.Terminal;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.PathUtils;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.seqno.SequenceNumbers;
import java.io.IOException;
@ -51,6 +55,7 @@ import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -101,64 +106,82 @@ public class TruncateTranslogCommand extends EnvironmentAwareCommand {
if (Files.exists(idxLocation) == false || Files.isDirectory(idxLocation) == false) {
throw new ElasticsearchException("unable to find a shard at [" + idxLocation + "], which must exist and be a directory");
}
try (Directory dir = FSDirectory.open(idxLocation, NativeFSLockFactory.INSTANCE)) {
final String historyUUID = UUIDs.randomBase64UUID();
final Map<String, String> commitData;
// Hold the lock open for the duration of the tool running
try (Lock writeLock = dir.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
Set<Path> translogFiles;
try {
terminal.println("Checking existing translog files");
translogFiles = filesInDirectory(translogPath);
} catch (IOException e) {
terminal.println("encountered IOException while listing directory, aborting...");
throw new ElasticsearchException("failed to find existing translog files", e);
}
// Hold the lock open for the duration of the tool running
try (Directory dir = FSDirectory.open(idxLocation, NativeFSLockFactory.INSTANCE);
Lock writeLock = dir.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
Set<Path> translogFiles;
try {
terminal.println("Checking existing translog files");
translogFiles = filesInDirectory(translogPath);
} catch (IOException e) {
terminal.println("encountered IOException while listing directory, aborting...");
throw new ElasticsearchException("failed to find existing translog files", e);
}
// Warn about ES being stopped and files being deleted
warnAboutDeletingFiles(terminal, translogFiles, batch);
// Warn about ES being stopped and files being deleted
warnAboutDeletingFiles(terminal, translogFiles, batch);
List<IndexCommit> commits;
try {
terminal.println("Reading translog UUID information from Lucene commit from shard at [" + idxLocation + "]");
commits = DirectoryReader.listCommits(dir);
} catch (IndexNotFoundException infe) {
throw new ElasticsearchException("unable to find a valid shard at [" + idxLocation + "]", infe);
}
List<IndexCommit> commits;
try {
terminal.println("Reading translog UUID information from Lucene commit from shard at [" + idxLocation + "]");
commits = DirectoryReader.listCommits(dir);
} catch (IndexNotFoundException infe) {
throw new ElasticsearchException("unable to find a valid shard at [" + idxLocation + "]", infe);
}
// Retrieve the generation and UUID from the existing data
Map<String, String> commitData = commits.get(commits.size() - 1).getUserData();
String translogGeneration = commitData.get(Translog.TRANSLOG_GENERATION_KEY);
String translogUUID = commitData.get(Translog.TRANSLOG_UUID_KEY);
if (translogGeneration == null || translogUUID == null) {
throw new ElasticsearchException("shard must have a valid translog generation and UUID but got: [{}] and: [{}]",
// Retrieve the generation and UUID from the existing data
commitData = commits.get(commits.size() - 1).getUserData();
String translogGeneration = commitData.get(Translog.TRANSLOG_GENERATION_KEY);
String translogUUID = commitData.get(Translog.TRANSLOG_UUID_KEY);
if (translogGeneration == null || translogUUID == null) {
throw new ElasticsearchException("shard must have a valid translog generation and UUID but got: [{}] and: [{}]",
translogGeneration, translogUUID);
}
terminal.println("Translog Generation: " + translogGeneration);
terminal.println("Translog UUID : " + translogUUID);
terminal.println("History UUID : " + historyUUID);
Path tempEmptyCheckpoint = translogPath.resolve("temp-" + Translog.CHECKPOINT_FILE_NAME);
Path realEmptyCheckpoint = translogPath.resolve(Translog.CHECKPOINT_FILE_NAME);
Path tempEmptyTranslog = translogPath.resolve("temp-" + Translog.TRANSLOG_FILE_PREFIX +
translogGeneration + Translog.TRANSLOG_FILE_SUFFIX);
Path realEmptyTranslog = translogPath.resolve(Translog.TRANSLOG_FILE_PREFIX +
translogGeneration + Translog.TRANSLOG_FILE_SUFFIX);
// Write empty checkpoint and translog to empty files
long gen = Long.parseLong(translogGeneration);
int translogLen = writeEmptyTranslog(tempEmptyTranslog, translogUUID);
writeEmptyCheckpoint(tempEmptyCheckpoint, translogLen, gen);
terminal.println("Removing existing translog files");
IOUtils.rm(translogFiles.toArray(new Path[]{}));
terminal.println("Creating new empty checkpoint at [" + realEmptyCheckpoint + "]");
Files.move(tempEmptyCheckpoint, realEmptyCheckpoint, StandardCopyOption.ATOMIC_MOVE);
terminal.println("Creating new empty translog at [" + realEmptyTranslog + "]");
Files.move(tempEmptyTranslog, realEmptyTranslog, StandardCopyOption.ATOMIC_MOVE);
// Fsync the translog directory after rename
IOUtils.fsync(translogPath, true);
}
terminal.println("Translog Generation: " + translogGeneration);
terminal.println("Translog UUID : " + translogUUID);
Path tempEmptyCheckpoint = translogPath.resolve("temp-" + Translog.CHECKPOINT_FILE_NAME);
Path realEmptyCheckpoint = translogPath.resolve(Translog.CHECKPOINT_FILE_NAME);
Path tempEmptyTranslog = translogPath.resolve("temp-" + Translog.TRANSLOG_FILE_PREFIX +
translogGeneration + Translog.TRANSLOG_FILE_SUFFIX);
Path realEmptyTranslog = translogPath.resolve(Translog.TRANSLOG_FILE_PREFIX +
translogGeneration + Translog.TRANSLOG_FILE_SUFFIX);
// Write empty checkpoint and translog to empty files
long gen = Long.parseLong(translogGeneration);
int translogLen = writeEmptyTranslog(tempEmptyTranslog, translogUUID);
writeEmptyCheckpoint(tempEmptyCheckpoint, translogLen, gen);
terminal.println("Removing existing translog files");
IOUtils.rm(translogFiles.toArray(new Path[]{}));
terminal.println("Creating new empty checkpoint at [" + realEmptyCheckpoint + "]");
Files.move(tempEmptyCheckpoint, realEmptyCheckpoint, StandardCopyOption.ATOMIC_MOVE);
terminal.println("Creating new empty translog at [" + realEmptyTranslog + "]");
Files.move(tempEmptyTranslog, realEmptyTranslog, StandardCopyOption.ATOMIC_MOVE);
// Fsync the translog directory after rename
IOUtils.fsync(translogPath, true);
terminal.println("Marking index with the new history uuid");
// commit the new history uuid
IndexWriterConfig iwc = new IndexWriterConfig(null)
.setCommitOnClose(false)
// we don't want merges to happen here - we call maybe merge on the engine
// later once we started it up otherwise we would need to wait for it here
// we also don't specify a codec here and merges should use the engines for this index
.setMergePolicy(NoMergePolicy.INSTANCE)
.setOpenMode(IndexWriterConfig.OpenMode.APPEND);
try (IndexWriter writer = new IndexWriter(dir, iwc)) {
Map<String, String> newCommitData = new HashMap<>(commitData);
newCommitData.put(Engine.HISTORY_UUID_KEY, historyUUID);
writer.setLiveCommitData(newCommitData.entrySet());
writer.commit();
}
} catch (LockObtainFailedException lofe) {
throw new ElasticsearchException("Failed to lock shard's directory at [" + idxLocation + "], is Elasticsearch still running?");
}

View File

@ -31,6 +31,7 @@ import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
@ -147,8 +148,8 @@ public class RecoverySourceHandler {
final Translog translog = shard.getTranslog();
final long startingSeqNo;
boolean isSequenceNumberBasedRecoveryPossible = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO &&
isTranslogReadyForSequenceNumberBasedRecovery();
final boolean isSequenceNumberBasedRecoveryPossible = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO &&
isTargetSameHistory() && isTranslogReadyForSequenceNumberBasedRecovery();
if (isSequenceNumberBasedRecoveryPossible) {
logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo());
@ -198,6 +199,13 @@ public class RecoverySourceHandler {
return response;
}
private boolean isTargetSameHistory() {
final String targetHistoryUUID = request.metadataSnapshot().getHistoryUUID();
assert targetHistoryUUID != null || shard.indexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1) :
"incoming target history N/A but index was created after or on 6.0.0-rc1";
return targetHistoryUUID != null && targetHistoryUUID.equals(shard.getHistoryUUID());
}
private void runUnderPrimaryPermit(CancellableThreads.Interruptable runnable) {
cancellableThreads.execute(() -> {
final PlainActionFuture<Releasable> onAcquired = new PlainActionFuture<>();

View File

@ -75,6 +75,8 @@ public class StartRecoveryRequest extends TransportRequest {
this.metadataSnapshot = metadataSnapshot;
this.primaryRelocation = primaryRelocation;
this.startingSeqNo = startingSeqNo;
assert startingSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || metadataSnapshot.getHistoryUUID() != null :
"starting seq no is set but not history uuid";
}
public long recoveryId() {

View File

@ -2810,6 +2810,44 @@ public class InternalEngineTests extends ESTestCase {
assertVisibleCount(engine, numDocs, false);
}
public void testRecoverFromStoreSetsHistoryUUIDIfNeeded() throws IOException {
final int numDocs = randomIntBetween(0, 3);
for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false);
Engine.IndexResult index = engine.index(firstIndexRequest);
assertThat(index.getVersion(), equalTo(1L));
}
assertVisibleCount(engine, numDocs);
engine.close();
IndexWriterConfig iwc = new IndexWriterConfig(null)
.setCommitOnClose(false)
// we don't want merges to happen here - we call maybe merge on the engine
// later once we started it up otherwise we would need to wait for it here
// we also don't specify a codec here and merges should use the engines for this index
.setMergePolicy(NoMergePolicy.INSTANCE)
.setOpenMode(IndexWriterConfig.OpenMode.APPEND);
try (IndexWriter writer = new IndexWriter(store.directory(), iwc)) {
Map<String, String> newCommitData = new HashMap<>();
for (Map.Entry<String, String> entry: writer.getLiveCommitData()) {
if (entry.getKey().equals(Engine.HISTORY_UUID_KEY) == false) {
newCommitData.put(entry.getKey(), entry.getValue());
}
}
writer.setLiveCommitData(newCommitData.entrySet());
writer.commit();
}
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.V_6_0_0_beta1)
.build());
engine = createEngine(indexSettings, store, primaryTranslogDir, newMergePolicy(), null);
assertVisibleCount(engine, numDocs, false);
assertThat(engine.getHistoryUUID(), notNullValue());
}
public void testShardNotAvailableExceptionWhenEngineClosedConcurrently() throws IOException, InterruptedException {
AtomicReference<Exception> exception = new AtomicReference<>();
String operation = randomFrom("optimize", "refresh", "flush");

View File

@ -315,7 +315,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
return routingTable.build();
}
synchronized boolean removeReplica(IndexShard replica) throws IOException {
public synchronized boolean removeReplica(IndexShard replica) throws IOException {
final boolean removed = replicas.remove(replica);
if (removed) {
updateAllocationIDsOnPrimary();

View File

@ -21,6 +21,7 @@ package org.elasticsearch.index.translog;
import org.apache.lucene.store.ByteArrayDataOutput;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.lease.Releasable;
@ -172,13 +173,14 @@ public class TranslogDeletionPolicyTests extends ESTestCase {
TranslogWriter writer = null;
List<TranslogReader> readers = new ArrayList<>();
final int numberOfReaders = randomIntBetween(0, 10);
final String translogUUID = UUIDs.randomBase64UUID(random());
for (long gen = 1; gen <= numberOfReaders + 1; gen++) {
if (writer != null) {
final TranslogReader reader = Mockito.spy(writer.closeIntoReader());
Mockito.doReturn(writer.getLastModifiedTime()).when(reader).getLastModifiedTime();
readers.add(reader);
}
writer = TranslogWriter.create(new ShardId("index", "uuid", 0), "translog_uuid", gen,
writer = TranslogWriter.create(new ShardId("index", "uuid", 0), translogUUID, gen,
tempDir.resolve(Translog.getFilename(gen)), FileChannel::open, TranslogConfig.DEFAULT_BUFFER_SIZE, () -> 1L, 1L, () -> 1L
);
writer = Mockito.spy(writer);

View File

@ -89,7 +89,7 @@ public class TranslogVersionTests extends ESTestCase {
final long minSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
final long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
final Checkpoint checkpoint =
new Checkpoint(Files.size(path), 1, id, minSeqNo, maxSeqNo, SequenceNumbers.UNASSIGNED_SEQ_NO, id);
new Checkpoint(Files.size(path), 1, id, minSeqNo, maxSeqNo, SequenceNumbers.UNASSIGNED_SEQ_NO, id);
return TranslogReader.open(channel, path, checkpoint, null);
}
}

View File

@ -77,7 +77,6 @@ import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.notNullValue;
@ -144,16 +143,12 @@ public class TruncateTranslogIT extends ESIntegTestCase {
}
}
final boolean expectSeqNoRecovery;
if (randomBoolean() && numDocsToTruncate > 0) {
// flush the replica, so it will have more docs than what the primary will have
Index index = resolveIndex("test");
IndexShard replica = internalCluster().getInstance(IndicesService.class, replicaNode).getShardOrNull(new ShardId(index, 0));
replica.flush(new FlushRequest());
expectSeqNoRecovery = false;
logger.info("--> ops based recovery disabled by flushing replica");
} else {
expectSeqNoRecovery = true;
logger.info("--> performed extra flushing on replica");
}
// shut down the replica node to be tested later
@ -219,8 +214,7 @@ public class TruncateTranslogIT extends ESIntegTestCase {
final RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("test").setActiveOnly(false).get();
final RecoveryState replicaRecoveryState = recoveryResponse.shardRecoveryStates().get("test").stream()
.filter(recoveryState -> recoveryState.getPrimary() == false).findFirst().get();
assertThat(replicaRecoveryState.getIndex().toString(), replicaRecoveryState.getIndex().recoveredFileCount(),
expectSeqNoRecovery ? equalTo(0) : greaterThan(0));
assertThat(replicaRecoveryState.getIndex().toString(), replicaRecoveryState.getIndex().recoveredFileCount(), greaterThan(0));
}
public void testCorruptTranslogTruncationOfReplica() throws Exception {

View File

@ -21,8 +21,10 @@ package org.elasticsearch.indices.recovery;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.transport.TransportService;
@ -39,7 +41,8 @@ public class PeerRecoverySourceServiceTests extends IndexShardTestCase {
mock(TransportService.class), mock(IndicesService.class),
new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)));
StartRecoveryRequest startRecoveryRequest = new StartRecoveryRequest(primary.shardId(), randomAlphaOfLength(10),
getFakeDiscoNode("source"), getFakeDiscoNode("target"), null, randomBoolean(), randomLong(), randomLong());
getFakeDiscoNode("source"), getFakeDiscoNode("target"), Store.MetadataSnapshot.EMPTY, randomBoolean(), randomLong(),
SequenceNumbers.UNASSIGNED_SEQ_NO);
RecoverySourceHandler handler = peerRecoverySourceService.ongoingRecoveries.addNewRecovery(startRecoveryRequest, primary);
DelayRecoveryException delayRecoveryException = expectThrows(DelayRecoveryException.class,
() -> peerRecoverySourceService.ongoingRecoveries.addNewRecovery(startRecoveryRequest, primary));

View File

@ -73,11 +73,11 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
translogLocation.set(replica.getTranslog().location());
final Translog translog = replica.getTranslog();
final String translogUUID = translog.getTranslogUUID();
assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO));
final Translog translog = replica.getTranslog();
translogLocation.set(
writeTranslog(replica.shardId(), translog.getTranslogUUID(), translog.currentFileGeneration(), maxSeqNo - 1));
translogLocation.set(writeTranslog(replica.shardId(), translogUUID, translog.currentFileGeneration(), maxSeqNo - 1));
// commit is good, global checkpoint is at least max *committed* which is NO_OPS_PERFORMED
assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(0L));
@ -89,8 +89,7 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
// commit is not good, global checkpoint is below max
assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO));
translogLocation.set(
writeTranslog(replica.shardId(), translog.getTranslogUUID(), translog.currentFileGeneration(), maxSeqNo));
translogLocation.set(writeTranslog(replica.shardId(), translogUUID, translog.currentFileGeneration(), maxSeqNo));
// commit is good, global checkpoint is above max
assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(localCheckpoint + 1));

View File

@ -38,6 +38,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.FileSystemUtils;
@ -96,17 +97,9 @@ public class RecoverySourceHandlerTests extends ESTestCase {
public void testSendFiles() throws Throwable {
Settings settings = Settings.builder().put("indices.recovery.concurrent_streams", 1).
put("indices.recovery.concurrent_small_file_streams", 1).build();
put("indices.recovery.concurrent_small_file_streams", 1).build();
final RecoverySettings recoverySettings = new RecoverySettings(settings, service);
final StartRecoveryRequest request = new StartRecoveryRequest(
shardId,
null,
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
null,
randomBoolean(),
randomNonNegativeLong(),
randomBoolean() ? SequenceNumbers.UNASSIGNED_SEQ_NO : randomNonNegativeLong());
final StartRecoveryRequest request = getStartRecoveryRequest();
Store store = newStore(createTempDir());
RecoverySourceHandler handler = new RecoverySourceHandler(null, null, request,
recoverySettings.getChunkSize().bytesAsInt(), Settings.EMPTY);
@ -151,19 +144,26 @@ public class RecoverySourceHandlerTests extends ESTestCase {
IOUtils.close(reader, store, targetStore);
}
public void testSendSnapshotSendsOps() throws IOException {
final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service);
final int fileChunkSizeInBytes = recoverySettings.getChunkSize().bytesAsInt();
final long startingSeqNo = randomBoolean() ? SequenceNumbers.UNASSIGNED_SEQ_NO : randomIntBetween(0, 16);
final StartRecoveryRequest request = new StartRecoveryRequest(
public StartRecoveryRequest getStartRecoveryRequest() throws IOException {
Store.MetadataSnapshot metadataSnapshot = randomBoolean() ? Store.MetadataSnapshot.EMPTY :
new Store.MetadataSnapshot(Collections.emptyMap(),
Collections.singletonMap(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID()), randomIntBetween(0, 100));
return new StartRecoveryRequest(
shardId,
null,
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
null,
metadataSnapshot,
randomBoolean(),
randomNonNegativeLong(),
randomBoolean() ? SequenceNumbers.UNASSIGNED_SEQ_NO : randomNonNegativeLong());
randomBoolean() || metadataSnapshot.getHistoryUUID() == null ?
SequenceNumbers.UNASSIGNED_SEQ_NO : randomNonNegativeLong());
}
public void testSendSnapshotSendsOps() throws IOException {
final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service);
final int fileChunkSizeInBytes = recoverySettings.getChunkSize().bytesAsInt();
final StartRecoveryRequest request = getStartRecoveryRequest();
final IndexShard shard = mock(IndexShard.class);
when(shard.state()).thenReturn(IndexShardState.STARTED);
final RecoveryTargetHandler recoveryTarget = mock(RecoveryTargetHandler.class);
@ -181,6 +181,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
operations.add(new Translog.Index(index, new Engine.IndexResult(1, i - initialNumberOfDocs, true)));
}
operations.add(null);
final long startingSeqNo = randomBoolean() ? SequenceNumbers.UNASSIGNED_SEQ_NO : randomIntBetween(0, 16);
RecoverySourceHandler.SendSnapshotResult result = handler.sendSnapshot(startingSeqNo, new Translog.Snapshot() {
@Override
public void close() {
@ -226,18 +227,9 @@ public class RecoverySourceHandlerTests extends ESTestCase {
public void testHandleCorruptedIndexOnSendSendFiles() throws Throwable {
Settings settings = Settings.builder().put("indices.recovery.concurrent_streams", 1).
put("indices.recovery.concurrent_small_file_streams", 1).build();
put("indices.recovery.concurrent_small_file_streams", 1).build();
final RecoverySettings recoverySettings = new RecoverySettings(settings, service);
final StartRecoveryRequest request =
new StartRecoveryRequest(
shardId,
null,
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
null,
randomBoolean(),
randomNonNegativeLong(),
randomBoolean() ? SequenceNumbers.UNASSIGNED_SEQ_NO : 0L);
final StartRecoveryRequest request = getStartRecoveryRequest();
Path tempDir = createTempDir();
Store store = newStore(tempDir, false);
AtomicBoolean failedEngine = new AtomicBoolean(false);
@ -268,8 +260,8 @@ public class RecoverySourceHandlerTests extends ESTestCase {
}
CorruptionUtils.corruptFile(random(), FileSystemUtils.files(tempDir, (p) ->
(p.getFileName().toString().equals("write.lock") ||
p.getFileName().toString().startsWith("extra")) == false));
(p.getFileName().toString().equals("write.lock") ||
p.getFileName().toString().startsWith("extra")) == false));
Store targetStore = newStore(createTempDir(), false);
try {
handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), (md) -> {
@ -296,18 +288,9 @@ public class RecoverySourceHandlerTests extends ESTestCase {
public void testHandleExceptinoOnSendSendFiles() throws Throwable {
Settings settings = Settings.builder().put("indices.recovery.concurrent_streams", 1).
put("indices.recovery.concurrent_small_file_streams", 1).build();
put("indices.recovery.concurrent_small_file_streams", 1).build();
final RecoverySettings recoverySettings = new RecoverySettings(settings, service);
final StartRecoveryRequest request =
new StartRecoveryRequest(
shardId,
null,
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
null,
randomBoolean(),
randomNonNegativeLong(),
randomBoolean() ? SequenceNumbers.UNASSIGNED_SEQ_NO : 0L);
final StartRecoveryRequest request = getStartRecoveryRequest();
Path tempDir = createTempDir();
Store store = newStore(tempDir, false);
AtomicBoolean failedEngine = new AtomicBoolean(false);
@ -363,17 +346,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Started() throws IOException {
final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service);
final boolean attemptSequenceNumberBasedRecovery = randomBoolean();
final StartRecoveryRequest request =
new StartRecoveryRequest(
shardId,
null,
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
null,
false,
randomNonNegativeLong(),
attemptSequenceNumberBasedRecovery ? randomNonNegativeLong() : SequenceNumbers.UNASSIGNED_SEQ_NO);
final StartRecoveryRequest request = getStartRecoveryRequest();
final IndexShard shard = mock(IndexShard.class);
when(shard.seqNoStats()).thenReturn(mock(SeqNoStats.class));
when(shard.segmentStats(anyBoolean())).thenReturn(mock(SegmentsStats.class));

View File

@ -19,22 +19,35 @@
package org.elasticsearch.indices.recovery;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.NoMergePolicy;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.replication.ESIndexLevelReplicationTestCase;
import org.elasticsearch.index.replication.RecoveryDuringReplicationTests;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import static org.elasticsearch.index.translog.TranslogDeletionPolicyTests.createTranslogDeletionPolicy;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
public class RecoveryTests extends ESIndexLevelReplicationTestCase {
@ -54,7 +67,6 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
}
}
public void testRetentionPolicyChangeDuringRecovery() throws Exception {
try (ReplicationGroup shards = createGroup(0)) {
shards.startPrimary();
@ -132,4 +144,67 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
assertThat(newReplica.getTranslog().totalOperations(), equalTo(translogOps));
}
}
public void testDifferentHistoryUUIDDisablesOPsRecovery() throws Exception {
try (ReplicationGroup shards = createGroup(1)) {
shards.startAll();
// index some shared docs
final int flushedDocs = 10;
final int nonFlushedDocs = randomIntBetween(0, 10);
final int numDocs = flushedDocs + nonFlushedDocs;
shards.indexDocs(flushedDocs);
shards.flush();
shards.indexDocs(nonFlushedDocs);
IndexShard replica = shards.getReplicas().get(0);
final String translogUUID = replica.getTranslog().getTranslogUUID();
final String historyUUID = replica.getHistoryUUID();
Translog.TranslogGeneration translogGeneration = replica.getTranslog().getGeneration();
shards.removeReplica(replica);
replica.close("test", false);
IndexWriterConfig iwc = new IndexWriterConfig(null)
.setCommitOnClose(false)
// we don't want merges to happen here - we call maybe merge on the engine
// later once we started it up otherwise we would need to wait for it here
// we also don't specify a codec here and merges should use the engines for this index
.setMergePolicy(NoMergePolicy.INSTANCE)
.setOpenMode(IndexWriterConfig.OpenMode.APPEND);
Map<String, String> userData = new HashMap<>(replica.store().readLastCommittedSegmentsInfo().getUserData());
final String translogUUIDtoUse;
final long translogGenToUse;
final String historyUUIDtoUse = UUIDs.randomBase64UUID(random());
if (randomBoolean()) {
// create a new translog
final TranslogConfig translogConfig =
new TranslogConfig(replica.shardId(), replica.shardPath().resolveTranslog(), replica.indexSettings(),
BigArrays.NON_RECYCLING_INSTANCE);
try (Translog translog = new Translog(translogConfig, null, createTranslogDeletionPolicy(), () -> flushedDocs)) {
translogUUIDtoUse = translog.getTranslogUUID();
translogGenToUse = translog.currentFileGeneration();
}
} else {
translogUUIDtoUse = translogGeneration.translogUUID;
translogGenToUse = translogGeneration.translogFileGeneration;
}
try (IndexWriter writer = new IndexWriter(replica.store().directory(), iwc)) {
userData.put(Engine.HISTORY_UUID_KEY, historyUUIDtoUse);
userData.put(Translog.TRANSLOG_UUID_KEY, translogUUIDtoUse);
userData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGenToUse));
writer.setLiveCommitData(userData.entrySet());
writer.commit();
}
replica.store().close();
IndexShard newReplica = shards.addReplicaWithExistingPath(replica.shardPath(), replica.routingEntry().currentNodeId());
shards.recoverReplica(newReplica);
// file based recovery should be made
assertThat(newReplica.recoveryState().getIndex().fileDetails(), not(empty()));
assertThat(newReplica.getTranslog().totalOperations(), equalTo(numDocs));
// history uuid was restored
assertThat(newReplica.getHistoryUUID(), equalTo(historyUUID));
assertThat(newReplica.commitStats().getUserData().get(Engine.HISTORY_UUID_KEY), equalTo(historyUUID));
shards.assertAllEqual(numDocs);
}
}
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
@ -31,6 +32,7 @@ import org.elasticsearch.test.ESTestCase;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.Collections;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
@ -41,6 +43,9 @@ public class StartRecoveryRequestTests extends ESTestCase {
public void testSerialization() throws Exception {
final Version targetNodeVersion = randomVersion(random());
Store.MetadataSnapshot metadataSnapshot = randomBoolean() ? Store.MetadataSnapshot.EMPTY :
new Store.MetadataSnapshot(Collections.emptyMap(),
Collections.singletonMap(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID()), randomIntBetween(0, 100));
final StartRecoveryRequest outRequest = new StartRecoveryRequest(
new ShardId("test", "_na_", 0),
UUIDs.randomBase64UUID(),
@ -49,7 +54,8 @@ public class StartRecoveryRequestTests extends ESTestCase {
Store.MetadataSnapshot.EMPTY,
randomBoolean(),
randomNonNegativeLong(),
randomBoolean() ? SequenceNumbers.UNASSIGNED_SEQ_NO : randomNonNegativeLong());
randomBoolean() || metadataSnapshot.getHistoryUUID() == null ?
SequenceNumbers.UNASSIGNED_SEQ_NO : randomNonNegativeLong());
final ByteArrayOutputStream outBuffer = new ByteArrayOutputStream();
final OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer);

View File

@ -96,6 +96,7 @@ which returns something similar to:
"generation" : 2,
"user_data" : {
"translog_uuid" : "hnOG3xFcTDeoI_kvvvOdNA",
"history_uuid" : "XP7KDJGiS1a2fHYiFL5TXQ",
"local_checkpoint" : "-1",
"translog_generation" : "1",
"max_seq_no" : "-1",
@ -117,6 +118,7 @@ which returns something similar to:
--------------------------------------------------
// TESTRESPONSE[s/"id" : "3M3zkw2GHMo2Y4h4\/KFKCg=="/"id": $body.indices.twitter.shards.0.0.commit.id/]
// TESTRESPONSE[s/"translog_uuid" : "hnOG3xFcTDeoI_kvvvOdNA"/"translog_uuid": $body.indices.twitter.shards.0.0.commit.user_data.translog_uuid/]
// TESTRESPONSE[s/"history_uuid" : "XP7KDJGiS1a2fHYiFL5TXQ"/"history_uuid": $body.indices.twitter.shards.0.0.commit.user_data.history_uuid/]
// TESTRESPONSE[s/"sync_id" : "AVvFY-071siAOuFGEO9P"/"sync_id": $body.indices.twitter.shards.0.0.commit.user_data.sync_id/]
// TESTRESPONSE[s/"1": \.\.\./"1": $body.indices.twitter.shards.1/]
// TESTRESPONSE[s/"2": \.\.\./"2": $body.indices.twitter.shards.2/]

View File

@ -33,6 +33,7 @@ import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.test.NotEqualMessageBuilder;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.test.rest.yaml.ObjectPath;
import org.junit.Before;
import java.io.IOException;
@ -52,6 +53,7 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.notNullValue;
/**
* Tests to run before and after a full cluster restart. This is run twice,
@ -761,6 +763,39 @@ public class FullClusterRestartIT extends ESRestTestCase {
}
}
public void testHistoryUUIDIsAdded() throws Exception {
if (runningAgainstOldCluster) {
XContentBuilder mappingsAndSettings = jsonBuilder();
mappingsAndSettings.startObject();
{
mappingsAndSettings.startObject("settings");
mappingsAndSettings.field("number_of_shards", 1);
mappingsAndSettings.field("number_of_replicas", 1);
mappingsAndSettings.endObject();
}
mappingsAndSettings.endObject();
client().performRequest("PUT", "/" + index, Collections.emptyMap(),
new StringEntity(mappingsAndSettings.string(), ContentType.APPLICATION_JSON));
} else {
Response response = client().performRequest("GET", index + "/_stats", singletonMap("level", "shards"));
List<Object> shardStats = ObjectPath.createFromResponse(response).evaluate("indices." + index + ".shards.0");
String globalHistoryUUID = null;
for (Object shard : shardStats) {
final String nodeId = ObjectPath.evaluate(shard, "routing.node");
final Boolean primary = ObjectPath.evaluate(shard, "routing.primary");
logger.info("evaluating: {} , {}", ObjectPath.evaluate(shard, "routing"), ObjectPath.evaluate(shard, "commit"));
String historyUUID = ObjectPath.evaluate(shard, "commit.user_data.history_uuid");
assertThat("no history uuid found on " + nodeId + " (primary: " + primary + ")", historyUUID, notNullValue());
if (globalHistoryUUID == null) {
globalHistoryUUID = historyUUID;
} else {
assertThat("history uuid mismatch on " + nodeId + " (primary: " + primary + ")", historyUUID,
equalTo(globalHistoryUUID));
}
}
}
}
private void checkSnapshot(String snapshotName, int count, Version tookOnVersion) throws IOException {
// Check the snapshot metadata, especially the version
String response = toStr(client().performRequest("GET", "/_snapshot/repo/" + snapshotName, listSnapshotVerboseParams()));