Writing the `index.latest` blob is unnecessary unless the contents of the repository are to be used as a URL repository. Also, in some edge cases the fact that `index.latest` is the only blob in the repository that regularly gets overwritten was causing compatibility issues with some backing blobstores (Azure's no-overwrite policy, Hitachi's S3 equivalent). This commit therefore changes the behavior so that snapshots no longer fail if writing `index.latest` fails, and adds a setting to disable writing `index.latest` altogether.
parent e76447833a
commit 204efe9387
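As a usage illustration (not part of the diff below), here is a minimal sketch of how the new repository setting could be supplied when building repository settings. It assumes the elasticsearch server library on the classpath and a hypothetical filesystem location; the setting key "support_url_repo" and its default of true come from the SUPPORT_URL_REPO setting added in this commit.

    // Hedged sketch, not part of this commit: disable index.latest writes for a repository.
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.repositories.blobstore.BlobStoreRepository;

    public class DisableIndexLatestExample {
        public static void main(String[] args) {
            Settings repoSettings = Settings.builder()
                .put("location", "/mnt/backups/my-repo")                    // hypothetical fs location
                .put(BlobStoreRepository.SUPPORT_URL_REPO.getKey(), false)  // skip writing index.latest
                .build();
            // SUPPORT_URL_REPO defaults to true, so existing repositories keep writing index.latest.
            System.out.println("support_url_repo = " + BlobStoreRepository.SUPPORT_URL_REPO.get(repoSettings));
        }
    }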
@@ -19,6 +19,7 @@
 package org.elasticsearch.snapshots;
 
+import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.LuceneTestCase;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ExceptionsHelper;
@@ -58,6 +59,7 @@ import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.cluster.routing.UnassignedInfo;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.Numbers;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
@@ -3463,6 +3465,34 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCase
         }
     }
 
+    public void testIndexLatestFailuresIgnored() throws Exception {
+        final String repoName = "test-repo";
+        final Path repoPath = randomRepoPath();
+        createRepository(repoName, "mock", repoPath);
+        final MockRepository repository =
+            (MockRepository) internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName);
+        repository.setFailOnIndexLatest(true);
+        createFullSnapshot(repoName, "snapshot-1");
+        repository.setFailOnIndexLatest(false);
+        createFullSnapshot(repoName, "snapshot-2");
+        final long repoGenInIndexLatest =
+            Numbers.bytesToLong(new BytesRef(Files.readAllBytes(repoPath.resolve(BlobStoreRepository.INDEX_LATEST_BLOB))));
+        assertEquals(getRepositoryData(repoName).getGenId(), repoGenInIndexLatest);
+
+        createRepository(repoName, "fs", Settings.builder()
+            .put("location", repoPath).put(BlobStoreRepository.SUPPORT_URL_REPO.getKey(), false));
+        createFullSnapshot(repoName, "snapshot-3");
+        final long repoGenInIndexLatest2 =
+            Numbers.bytesToLong(new BytesRef(Files.readAllBytes(repoPath.resolve(BlobStoreRepository.INDEX_LATEST_BLOB))));
+        assertEquals("index.latest should not have been written to", repoGenInIndexLatest, repoGenInIndexLatest2);
+
+        createRepository(repoName, "fs", repoPath);
+        createFullSnapshot(repoName, "snapshot-4");
+        final long repoGenInIndexLatest3 =
+            Numbers.bytesToLong(new BytesRef(Files.readAllBytes(repoPath.resolve(BlobStoreRepository.INDEX_LATEST_BLOB))));
+        assertEquals(getRepositoryData(repoName).getGenId(), repoGenInIndexLatest3);
+    }
+
     private void verifySnapshotInfo(final GetSnapshotsResponse response, final Map<String, List<String>> indicesPerSnapshot) {
         for (SnapshotInfo snapshotInfo : response.getSnapshots()) {
             final List<String> expected = snapshotInfo.indices();
@@ -210,6 +210,14 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent implements Repository {
         ByteSizeValue.parseBytesSizeValue("128kb", "io_buffer_size"), ByteSizeValue.parseBytesSizeValue("8kb", "buffer_size"),
         ByteSizeValue.parseBytesSizeValue("16mb", "io_buffer_size"), Setting.Property.NodeScope);
 
+    /**
+     * Setting to disable writing the {@code index.latest} blob which enables the contents of this repository to be used with a
+     * url-repository.
+     */
+    public static final Setting<Boolean> SUPPORT_URL_REPO = Setting.boolSetting("support_url_repo", true, Setting.Property.NodeScope);
+
+    protected final boolean supportURLRepo;
+
     private final boolean compress;
 
     private final boolean cacheRepositoryData;
@@ -303,6 +311,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent implements Repository {
         this.threadPool = clusterService.getClusterApplierService().threadPool();
         this.clusterService = clusterService;
         this.recoverySettings = recoverySettings;
+        this.supportURLRepo = SUPPORT_URL_REPO.get(metadata.settings());
         snapshotRateLimiter = getRateLimiter(metadata.settings(), "max_snapshot_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB));
         restoreRateLimiter = getRateLimiter(metadata.settings(), "max_restore_bytes_per_sec", ByteSizeValue.ZERO);
         readOnly = metadata.settings().getAsBoolean("readonly", false);
@@ -1563,15 +1572,7 @@
             final BytesReference serializedRepoData =
                 BytesReference.bytes(newRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), version));
             writeAtomic(blobContainer(), indexBlob, serializedRepoData, true);
-            // write the current generation to the index-latest file
-            final BytesReference genBytes;
-            try (BytesStreamOutput bStream = new BytesStreamOutput()) {
-                bStream.writeLong(newGen);
-                genBytes = bStream.bytes();
-            }
-            logger.debug("Repository [{}] updating index.latest with generation [{}]", metadata.name(), newGen);
-
-            writeAtomic(blobContainer(), INDEX_LATEST_BLOB, genBytes, false);
+            maybeWriteIndexLatest(newGen);
 
             // Step 3: Update CS to reflect new repository generation.
             clusterService.submitStateUpdateTask("set safe repository generation [" + metadata.name() + "][" + newGen + "]",
@@ -1624,6 +1625,24 @@
             }, listener::onFailure);
     }
 
+    /**
+     * Write {@code index.latest} blob to support using this repository as the basis of a url repository.
+     *
+     * @param newGen new repository generation
+     */
+    private void maybeWriteIndexLatest(long newGen) {
+        if (supportURLRepo) {
+            logger.debug("Repository [{}] updating index.latest with generation [{}]", metadata.name(), newGen);
+            try {
+                writeAtomic(blobContainer(), INDEX_LATEST_BLOB, new BytesArray(Numbers.longToBytes(newGen)), false);
+            } catch (Exception e) {
+                logger.warn(() -> new ParameterizedMessage("Failed to write index.latest blob. If you do not intend to use this " +
+                    "repository as the basis for a URL repository you may turn off attempting to write the index.latest blob by " +
+                    "setting repository setting [{}] to [false]", SUPPORT_URL_REPO.getKey()), e);
+            }
+        }
+    }
+
     /**
      * Ensures that {@link RepositoryData} for the given {@code safeGeneration} actually physically exists in the repository.
      * This method is used by {@link #writeIndexGen} to make sure that no writes are executed on top of a concurrently modified repository.
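For orientation (not part of the diff), the index.latest blob written by maybeWriteIndexLatest above is just the repository generation serialized with Numbers.longToBytes, and the new integration test reads it back with Numbers.bytesToLong. A minimal round-trip sketch, assuming the elasticsearch and lucene libraries on the classpath and a hypothetical generation value:

    // Hedged sketch, not part of this commit: the encoding round-trip behind index.latest.
    import org.apache.lucene.util.BytesRef;
    import org.elasticsearch.common.Numbers;

    public class IndexLatestRoundTrip {
        public static void main(String[] args) {
            long newGen = 17L;                                        // hypothetical repository generation
            byte[] blob = Numbers.longToBytes(newGen);                // bytes maybeWriteIndexLatest writes
            long readBack = Numbers.bytesToLong(new BytesRef(blob));  // how the test decodes the blob
            System.out.println("index.latest generation: " + readBack);
        }
    }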
@@ -118,6 +118,11 @@ public class MockRepository extends FsRepository {
     /** Allows blocking on writing the snapshot file at the end of snapshot creation to simulate a died master node */
     private volatile boolean blockAndFailOnWriteSnapFile;
 
+    /**
+     * Writes to the blob {@code index.latest} at the repository root will fail with an {@link IOException} if {@code true}.
+     */
+    private volatile boolean failOnIndexLatest = false;
+
     private volatile boolean blocked = false;
 
     public MockRepository(RepositoryMetadata metadata, Environment environment,
@@ -205,6 +210,10 @@
         return blocked;
     }
 
+    public void setFailOnIndexLatest(boolean failOnIndexLatest) {
+        this.failOnIndexLatest = failOnIndexLatest;
+    }
+
     private synchronized boolean blockExecution() {
         logger.debug("[{}] Blocking execution", metadata.name());
         boolean wasBlocked = false;
@@ -272,6 +281,11 @@
         }
 
         private void maybeIOExceptionOrBlock(String blobName) throws IOException {
+            if (INDEX_LATEST_BLOB.equals(blobName)) {
+                // Don't mess with the index.latest blob here, failures to write to it are ignored by upstream logic and we have
+                // specific tests that cover the error handling around this blob.
+                return;
+            }
             if (blobName.startsWith("__")) {
                 if (shouldFail(blobName, randomDataFileIOExceptionRate) && (incrementAndGetFailureCount() < maximumNumberOfFailures)) {
                     logger.info("throwing random IOException for file [{}] at path [{}]", blobName, path());
@@ -397,6 +411,9 @@
         public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize,
                                     final boolean failIfAlreadyExists) throws IOException {
             final Random random = RandomizedContext.current().getRandom();
+            if (failOnIndexLatest && BlobStoreRepository.INDEX_LATEST_BLOB.equals(blobName)) {
+                throw new IOException("Random IOException");
+            }
             if (blobName.startsWith("index-") && blockOnWriteIndexFile) {
                 blockExecutionAndFail(blobName);
             }