Fixes retrieval of the latest snapshot index blob (#22700)

This commit ensures that the index.latest blob is first examined to
determine the latest index-N blob id, before attempting to list all
index-N blobs and picking the blob with the highest N.
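
For illustration only, here is a minimal sketch of the listing-based fallback; it is not the
actual listBlobsToGetLatestIndexId implementation, and the method name latestGeneration and
the plain Collection<String> input are assumptions made for the example:

    import java.util.Collection;

    // Sketch: pick the highest N among the "index-N" blob names at the repository root,
    // falling back to -1 (RepositoryData.EMPTY_REPO_GEN) when no index-N blob exists.
    static long latestGeneration(Collection<String> rootBlobNames) {
        long latest = -1L;
        for (String name : rootBlobNames) {
            if (name.startsWith("index-")) {
                try {
                    latest = Math.max(latest, Long.parseLong(name.substring("index-".length())));
                } catch (NumberFormatException nfe) {
                    // skip blobs whose suffix is not a number
                }
            }
        }
        return latest;
    }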

It also fixes MockRepository#move so that tests are able to handle
non-atomic moves.  This is done by adding a special setting to the
MockRepository that lets the test specify whether it can handle
non-atomic moves.  If so, the MockRepository#move operation will be
non-atomic, to allow testing against such repositories.
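
As a usage illustration (a hedged sketch, not code from this commit: the repository name
"test-repo" and the surrounding ESIntegTestCase helpers are assumptions), a test opts in to
non-atomic moves by registering the mock repository with atomic_move set to false:

    // Inside an ESIntegTestCase subclass; assertAcked is the usual ElasticsearchAssertions helper.
    assertAcked(client().admin().cluster().preparePutRepository("test-repo")
        .setType("mock")
        .setSettings(Settings.builder()
            .put("location", randomRepoPath())
            // opt in to non-atomic moves so the partial index-N code path gets exercised
            .put("atomic_move", false)));
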
Ali Beyad 2017-01-20 17:00:46 -06:00 committed by GitHub
parent f7524fbdef
commit 3bf06d1440
3 changed files with 36 additions and 77 deletions

BlobStoreRepository.java

@@ -190,15 +190,15 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     private static final String INDEX_METADATA_CODEC = "index-metadata";

-    protected static final String SNAPSHOT_NAME_FORMAT = SNAPSHOT_PREFIX + "%s.dat";
+    private static final String SNAPSHOT_NAME_FORMAT = SNAPSHOT_PREFIX + "%s.dat";

-    protected static final String SNAPSHOT_INDEX_PREFIX = "index-";
+    private static final String SNAPSHOT_INDEX_PREFIX = "index-";

-    protected static final String SNAPSHOT_INDEX_NAME_FORMAT = SNAPSHOT_INDEX_PREFIX + "%s";
+    private static final String SNAPSHOT_INDEX_NAME_FORMAT = SNAPSHOT_INDEX_PREFIX + "%s";

-    protected static final String SNAPSHOT_INDEX_CODEC = "snapshots";
+    private static final String SNAPSHOT_INDEX_CODEC = "snapshots";

-    protected static final String DATA_BLOB_PREFIX = "__";
+    private static final String DATA_BLOB_PREFIX = "__";

     private final RateLimiter snapshotRateLimiter;
@@ -732,23 +732,22 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
      */
     long latestIndexBlobId() throws IOException {
         try {
-            // first, try listing the blobs and determining which index blob is the latest
-            return listBlobsToGetLatestIndexId();
-        } catch (UnsupportedOperationException e) {
-            // could not list the blobs because the repository does not support the operation,
-            // try reading from the index-latest file
+            // first, try reading the latest index generation from the index.latest blob
+            return readSnapshotIndexLatestBlob();
+        } catch (IOException ioe) {
+            // we could not find the index.latest blob, this can happen in two scenarios:
+            //  (1) its an empty repository
+            //  (2) when writing the index-latest blob, if the blob already exists,
+            //      we first delete it, then atomically write the new blob.  there is
+            //      a small window in time when the blob is deleted and the new one
+            //      written - if the node crashes during that time, we won't have an
+            //      index-latest blob
+            // lets try to list all index-N blobs to determine the last one, if listing the blobs
+            // is not a supported operation (which is the case for read-only repositories), then
+            // assume its an empty repository.
             try {
-                return readSnapshotIndexLatestBlob();
-            } catch (IOException ioe) {
-                // we likely could not find the blob, this can happen in two scenarios:
-                //  (1) its an empty repository
-                //  (2) when writing the index-latest blob, if the blob already exists,
-                //      we first delete it, then atomically write the new blob.  there is
-                //      a small window in time when the blob is deleted and the new one
-                //      written - if the node crashes during that time, we won't have an
-                //      index-latest blob
-                // in a read-only repository, we can't know which of the two scenarios it is,
-                // but we will assume (1) because we can't do anything about (2) anyway
+                return listBlobsToGetLatestIndexId();
+            } catch (UnsupportedOperationException uoe) {
                 return RepositoryData.EMPTY_REPO_GEN;
             }
         }

SharedClusterSnapshotRestoreIT.java

@@ -2738,6 +2738,8 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
                         Settings.builder()
                                 .put("location", repoPath)
                                 .put("random_control_io_exception_rate", randomIntBetween(5, 20) / 100f)
+                                // test that we can take a snapshot after a failed one, even if a partial index-N was written
+                                .put("atomic_move", false)
                                 .put("random", randomAsciiOfLength(10))));
         logger.info("--> indexing some data");

MockRepository.java

@@ -42,7 +42,6 @@ import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobMetaData;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobStore;
-import org.elasticsearch.common.compress.NotXContentException;
 import org.elasticsearch.common.io.PathUtils;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
@@ -52,7 +51,6 @@ import org.elasticsearch.env.Environment;
 import org.elasticsearch.plugins.RepositoryPlugin;
 import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.repositories.IndexId;
-import org.elasticsearch.repositories.RepositoryData;
 import org.elasticsearch.repositories.fs.FsRepository;
 import org.elasticsearch.snapshots.SnapshotId;
@@ -102,6 +100,8 @@ public class MockRepository extends FsRepository {
     private volatile boolean blockOnDataFiles;

+    private volatile boolean atomicMove;
+
     private volatile boolean blocked = false;

     public MockRepository(RepositoryMetaData metadata, Environment environment,
@@ -116,6 +116,7 @@ public class MockRepository extends FsRepository {
         blockOnInitialization = metadata.settings().getAsBoolean("block_on_init", false);
         randomPrefix = metadata.settings().get("random", "default");
         waitAfterUnblock = metadata.settings().getAsLong("wait_after_unblock", 0L);
+        atomicMove = metadata.settings().getAsBoolean("atomic_move", true);
         logger.info("starting mock repository with random prefix {}", randomPrefix);
         mockBlobStore = new MockBlobStore(super.blobStore());
     }
@@ -156,52 +157,7 @@ public class MockRepository extends FsRepository {
         return mockBlobStore;
     }

-    public void unblock() {
-        unblockExecution();
-    }
-
-    public void blockOnDataFiles(boolean blocked) {
-        blockOnDataFiles = blocked;
-    }
-
-    @Override
-    public RepositoryData getRepositoryData() {
-        final int numIterations = 10;
-        int count = 0;
-        NotXContentException ex = null;
-        RepositoryData repositoryData = null;
-        while (count < numIterations) {
-            try {
-                repositoryData = super.getRepositoryData();
-            } catch (NotXContentException e) {
-                ex = e;
-            }
-            if (repositoryData != null) {
-                break;
-            }
-            count++;
-            try {
-                Thread.sleep(1000L);
-            } catch (InterruptedException e) {
-            }
-        }
-        if (ex != null) {
-            logger.info("--> [{}] repository failed to read x-content from index file, on iteration [{}] the repository data was [{}]",
-                metadata.name(), count, repositoryData);
-            throw ex;
-        }
-        return repositoryData;
-    }
-
-    public void blockOnControlFiles(boolean blocked) {
-        blockOnControlFiles = blocked;
-    }
-
-    public boolean blockOnDataFiles() {
-        return blockOnDataFiles;
-    }
-
-    public synchronized void unblockExecution() {
+    public synchronized void unblock() {
         blocked = false;
         // Clean blocking flags, so we wouldn't try to block again
         blockOnDataFiles = false;
@@ -210,6 +166,10 @@ public class MockRepository extends FsRepository {
         this.notifyAll();
     }

+    public void blockOnDataFiles(boolean blocked) {
+        blockOnDataFiles = blocked;
+    }
+
     public boolean blocked() {
         return blocked;
     }
@@ -300,9 +260,7 @@ public class MockRepository extends FsRepository {
                             }
                         }
                     }
-                }
-                // don't block on the index-N files, as getRepositoryData depends on it
-                else if (blobName.startsWith("index-") == false) {
+                } else {
                     if (shouldFail(blobName, randomControlIOExceptionRate) && (incrementAndGetFailureCount() < maximumNumberOfFailures)) {
                         logger.info("throwing random IOException for file [{}] at path [{}]", blobName, path());
                         throw new IOException("Random IOException");
@@ -357,16 +315,16 @@ public class MockRepository extends FsRepository {
            @Override
            public void move(String sourceBlob, String targetBlob) throws IOException {
-               if (RandomizedContext.current().getRandom().nextBoolean()) {
+               if (atomicMove) {
+                   // atomic move since this inherits from FsBlobContainer which provides atomic moves
+                   maybeIOExceptionOrBlock(targetBlob);
+                   super.move(sourceBlob, targetBlob);
+               } else {
                    // simulate a non-atomic move, since many blob container implementations
                    // will not have an atomic move, and we should be able to handle that
                    maybeIOExceptionOrBlock(targetBlob);
                    super.writeBlob(targetBlob, super.readBlob(sourceBlob), 0L);
                    super.deleteBlob(sourceBlob);
-               } else {
-                   // atomic move since this inherits from FsBlobContainer which provides atomic moves
-                   maybeIOExceptionOrBlock(targetBlob);
-                   super.move(sourceBlob, targetBlob);
                }
            }