Add Check for Metadata Existence in BlobStoreRepository (#59141) (#59216)

In order to ensure that we do not write a broken piece of `RepositoryData`
because the physical repository generation was moved ahead more than one step
by erroneous concurrent writing to a repository we must check whether or not
the current assumed repository generation exists in the repository physically.
Without this check we run the risk of writing on top of stale cached repository data.

Relates #56911
This commit is contained in:
Armin Braun 2020-07-08 14:25:01 +02:00 committed by GitHub
parent 3e32d060bf
commit 9268b25789
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 271 additions and 36 deletions

View File

@ -69,6 +69,15 @@ public class URLBlobContainer extends AbstractBlobContainer {
return this.path;
}
/**
 * This operation is not supported by URLBlobContainer
 *
 * <p>URL repositories are read-only and are never consulted for blob existence checks, so this
 * method is not expected to ever be invoked.
 */
@Override
public boolean blobExists(String blobName) {
    // Trip assertions in tests if this is ever reached; fail loudly in production builds.
    assert false : "should never be called for a read-only url repo";
    throw new UnsupportedOperationException("URL repository doesn't support this operation");
}
/**
* This operation is not supported by URLBlobContainer
*/

View File

@ -59,7 +59,8 @@ public class AzureBlobContainer extends AbstractBlobContainer {
this.threadPool = threadPool;
}
private boolean blobExists(String blobName) {
@Override
public boolean blobExists(String blobName) {
logger.trace("blobExists({})", blobName);
try {
return blobStore.blobExists(buildKey(blobName));

View File

@ -163,10 +163,11 @@ public class AzureBlobStore implements BlobStore {
public boolean blobExists(String blob) throws URISyntaxException, StorageException {
// Container name must be lower case.
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client();
final OperationContext context = hookMetricCollector(client.v2().get(), getMetricsCollector);
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
return SocketAccess.doPrivilegedException(() -> {
final CloudBlockBlob azureBlob = blobContainer.getBlockBlobReference(blob);
return azureBlob.exists(null, null, client.v2().get());
return azureBlob.exists(null, null, context);
});
}

View File

@ -22,6 +22,7 @@ package org.elasticsearch.repositories.gcs;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetadata;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStoreException;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
@ -42,6 +43,15 @@ class GoogleCloudStorageBlobContainer extends AbstractBlobContainer {
this.path = path.buildAsString();
}
@Override
public boolean blobExists(String blobName) {
    // Resolve the fully-qualified object key once, then delegate the check to the blob store.
    final String key = buildKey(blobName);
    try {
        return blobStore.blobExists(key);
    } catch (Exception e) {
        // Surface any client failure as a repository-level exception.
        throw new BlobStoreException("Failed to check if blob [" + blobName + "] exists", e);
    }
}
@Override
public Map<String, BlobMetadata> listBlobs() throws IOException {
return blobStore.listBlobs(path);

View File

@ -198,6 +198,18 @@ class GoogleCloudStorageBlobStore implements BlobStore {
return mapBuilder.immutableMap();
}
/**
 * Returns true if the blob exists in the specific bucket
 *
 * @param blobName name of the blob
 * @return true iff the blob exists
 * @throws IOException if the metadata lookup against the bucket fails
 */
boolean blobExists(String blobName) throws IOException {
    final BlobId blobId = BlobId.of(bucketName, blobName);
    // client().get(blobId) performs a metadata-only lookup; it returns null when the object is absent.
    final Blob blob = SocketAccess.doPrivilegedIOException(() -> client().get(blobId));
    return blob != null;
}
/**
* Returns an {@link java.io.InputStream} for the given blob name
*

View File

@ -337,7 +337,9 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe
@Override
public void maybeTrack(final String request, Headers requestHeaders) {
if (Regex.simpleMatch("GET /storage/v1/b/*/o*", request)) {
if (Regex.simpleMatch("GET /storage/v1/b/*/o/*", request)) {
trackRequest("GET");
} else if (Regex.simpleMatch("GET /storage/v1/b/*/o*", request)) {
trackRequest("LIST");
} else if (Regex.simpleMatch("GET /download/storage/v1/b/*", request)) {
trackRequest("GET");

View File

@ -63,6 +63,11 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
// TODO: See if we can get precise result reporting.
private static final DeleteResult DELETE_RESULT = new DeleteResult(1L, 0L);
@Override
public boolean blobExists(String blobName) throws IOException {
    // Resolve the blob's location under this container's path, then ask HDFS whether it exists.
    final Path blobPath = new Path(path, blobName);
    return store.execute(fileContext -> fileContext.util().exists(blobPath));
}
@Override
public DeleteResult delete() throws IOException {
store.execute(fileContext -> fileContext.delete(path, true));

View File

@ -29,7 +29,6 @@ import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
import org.elasticsearch.test.ESTestCase;
import javax.security.auth.Subject;
@ -132,6 +131,6 @@ public class HdfsBlobStoreContainerTests extends ESTestCase {
byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
writeBlob(container, "foo", new BytesArray(data), randomBoolean());
assertArrayEquals(readBlobFully(container, "foo", data.length), data);
assertTrue(BlobStoreTestUtil.blobExists(container, "foo"));
assertTrue(container.blobExists("foo"));
}
}

View File

@ -42,6 +42,7 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetadata;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStoreException;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.common.blobstore.support.PlainBlobMetadata;
@ -84,6 +85,15 @@ class S3BlobContainer extends AbstractBlobContainer {
this.keyPath = path.buildAsString();
}
@Override
public boolean blobExists(String blobName) {
    // Compute the fully-qualified S3 key up front; buildKey is a pure prefix concatenation.
    final String key = buildKey(blobName);
    try (AmazonS3Reference clientReference = blobStore.clientReference()) {
        // doesObjectExist issues a HEAD request; run it with elevated socket permissions.
        return SocketAccess.doPrivileged(() -> clientReference.client().doesObjectExist(blobStore.bucket(), key));
    } catch (final Exception e) {
        throw new BlobStoreException("Failed to check if blob [" + blobName + "] exists", e);
    }
}
@Override
public InputStream readBlob(String blobName) throws IOException {
return new S3RetryingInputStream(blobStore, buildKey(blobName));

View File

@ -27,7 +27,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
import org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase;
import java.io.IOException;
@ -131,7 +130,7 @@ public class FsBlobStoreRepositoryIT extends ESBlobStoreRepositoryIntegTestCase
byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
writeBlob(container, "test", new BytesArray(data));
assertArrayEquals(readBlobFully(container, "test", data.length), data);
assertTrue(BlobStoreTestUtil.blobExists(container, "test"));
assertTrue(container.blobExists("test"));
}
}
}

View File

@ -0,0 +1,110 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.snapshots;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.Environment;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.snapshots.mockstore.MockRepository;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.MockHttpTransport;
import org.elasticsearch.test.NodeConfigurationSource;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.nio.MockNioTransportPlugin;
import org.junit.After;
import org.junit.Before;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.function.Function;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;
/**
 * Integration tests that simulate two independent clusters writing to the same physical snapshot
 * repository path, verifying that a concurrent modification by the second cluster is detected by
 * the first cluster and surfaced as a repository error rather than silently corrupting data.
 */
public class MultiClusterRepoAccessIT extends AbstractSnapshotIntegTestCase {

    // A second, fully independent test cluster that shares only the repository filesystem path.
    private InternalTestCluster secondCluster;
    // Filesystem path used by both clusters' "fs" repositories.
    private Path repoPath;

    @Before
    public void startSecondCluster() throws IOException, InterruptedException {
        repoPath = randomRepoPath();
        // Start a minimal second cluster whose nodes are configured to see the same repo path.
        secondCluster = new InternalTestCluster(randomLong(), createTempDir(), true, true, 0,
            0, "second_cluster", new NodeConfigurationSource() {
                @Override
                public Settings nodeSettings(int nodeOrdinal) {
                    // Reuse the primary cluster's node settings, overriding transport and repo path.
                    return Settings.builder().put(MultiClusterRepoAccessIT.this.nodeSettings(nodeOrdinal))
                        .put(NetworkModule.TRANSPORT_TYPE_KEY, getTestTransportType())
                        .put(Environment.PATH_REPO_SETTING.getKey(), repoPath).build();
                }

                @Override
                public Path nodeConfigPath(int nodeOrdinal) {
                    return null;
                }
            }, 0, "leader", Arrays.asList(ESIntegTestCase.TestSeedPlugin.class,
            MockHttpTransport.TestPlugin.class, MockTransportService.TestPlugin.class,
            MockNioTransportPlugin.class, InternalSettingsPlugin.class, MockRepository.Plugin.class), Function.identity());
        secondCluster.beforeTest(random(), 0);
    }

    @After
    public void stopSecondCluster() throws IOException {
        IOUtils.close(secondCluster);
    }

    public void testConcurrentDeleteFromOtherCluster() throws InterruptedException {
        internalCluster().startMasterOnlyNode();
        internalCluster().startDataOnlyNode();
        final String repoNameOnFirstCluster = "test-repo";
        // The repository name need not match across clusters; only the physical path is shared.
        final String repoNameOnSecondCluster = randomBoolean() ? "test-repo" : "other-repo";
        createRepository(repoNameOnFirstCluster, "fs", repoPath);

        // Register the same physical path as a repository on the second cluster.
        secondCluster.startMasterOnlyNode();
        secondCluster.startDataOnlyNode();
        secondCluster.client().admin().cluster().preparePutRepository(repoNameOnSecondCluster).setType("fs")
            .setSettings(Settings.builder().put("location", repoPath)).get();

        // Take three snapshots from the first cluster so it caches repository generation [2].
        createIndexWithRandomDocs("test-idx-1", randomIntBetween(1, 100));
        createFullSnapshot(repoNameOnFirstCluster, "snap-1");
        createIndexWithRandomDocs("test-idx-2", randomIntBetween(1, 100));
        createFullSnapshot(repoNameOnFirstCluster, "snap-2");
        createIndexWithRandomDocs("test-idx-3", randomIntBetween(1, 100));
        createFullSnapshot(repoNameOnFirstCluster, "snap-3");

        // Delete two snapshots through the second cluster, advancing the physical repository
        // generation without the first cluster noticing.
        secondCluster.client().admin().cluster().prepareDeleteSnapshot(repoNameOnSecondCluster, "snap-1").get();
        secondCluster.client().admin().cluster().prepareDeleteSnapshot(repoNameOnSecondCluster, "snap-2").get();

        // The first cluster's next snapshot write must fail because its cached generation [2]
        // no longer physically exists in the repository.
        final SnapshotException sne = expectThrows(SnapshotException.class, () ->
            client().admin().cluster().prepareCreateSnapshot(repoNameOnFirstCluster, "snap-4").setWaitForCompletion(true)
                .execute().actionGet());
        assertThat(sne.getMessage(), containsString("failed to update snapshot in repository"));
        final RepositoryException cause = (RepositoryException) sne.getCause();
        assertThat(cause.getMessage(), containsString("[" + repoNameOnFirstCluster +
            "] concurrent modification of the index-N file, expected current generation [2] but it was not found in the repository"));

        // Re-registering the repository refreshes the cached state and makes snapshots work again.
        assertAcked(client().admin().cluster().prepareDeleteRepository(repoNameOnFirstCluster).get());
        createRepository(repoNameOnFirstCluster, "fs", repoPath);
        createFullSnapshot(repoNameOnFirstCluster, "snap-5");
    }
}

View File

@ -38,6 +38,15 @@ public interface BlobContainer {
*/
BlobPath path();
/**
 * Tests whether a blob with the given blob name exists in the container.
 *
 * @param blobName
 *          The name of the blob whose existence is to be determined.
 * @return {@code true} if a blob exists in the {@link BlobContainer} with the given name, and {@code false} otherwise.
 * @throws IOException if the existence check could not be performed due to an I/O error
 */
boolean blobExists(String blobName) throws IOException;
/**
* Creates a new {@link InputStream} for the given blob name.
*

View File

@ -148,6 +148,11 @@ public class FsBlobContainer extends AbstractBlobContainer {
return new BufferedInputStream(inputStream, blobStore.bufferSizeInBytes());
}
@Override
public boolean blobExists(String blobName) {
    // The filesystem answers existence directly; no stream needs to be opened.
    final Path blobPath = path.resolve(blobName);
    return Files.exists(blobPath);
}
@Override
public InputStream readBlob(String name) throws IOException {
final Path resolvedPath = path.resolve(name);

View File

@ -46,6 +46,11 @@ public abstract class FilterBlobContainer implements BlobContainer {
return delegate.path();
}
@Override
public boolean blobExists(String blobName) throws IOException {
    // Pure pass-through: subclasses override only the operations they need to decorate.
    return delegate.blobExists(blobName);
}
@Override
public InputStream readBlob(String blobName) throws IOException {
return delegate.readBlob(blobName);

View File

@ -1520,6 +1520,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
+ "] already");
}
// write the index file
if (ensureSafeGenerationExists(expectedGen, listener::onFailure) == false) {
return;
}
final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen);
logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob);
writeAtomic(indexBlob,
@ -1586,6 +1589,37 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}, listener::onFailure);
}
/**
* Ensures that {@link RepositoryData} for the given {@code safeGeneration} actually physically exists in the repository.
* This method is used by {@link #writeIndexGen} to make sure that no writes are executed on top of a concurrently modified repository.
* This check is necessary because {@link RepositoryData} is mostly read from the cached value in {@link #latestKnownRepositoryData}
* which could be stale in the broken situation of a concurrent write to the repository.
*
* @param safeGeneration generation to verify existence for
* @param onFailure callback to invoke with failure in case the repository generation is not physically found in the repository
*/
private boolean ensureSafeGenerationExists(long safeGeneration, Consumer<Exception> onFailure) throws IOException {
    logger.debug("Ensure generation [{}] that is the basis for this write exists in [{}]", safeGeneration, metadata.name());
    // The empty-repo generation has no index-N blob by definition, so only verify real generations.
    if (safeGeneration != RepositoryData.EMPTY_REPO_GEN && blobContainer().blobExists(INDEX_FILE_PREFIX + safeGeneration) == false) {
        final Exception exception = new RepositoryException(metadata.name(),
            "concurrent modification of the index-N file, expected current generation [" + safeGeneration +
            "] but it was not found in the repository");
        // Mark the repository corrupted FIRST, and only notify the caller once that marking has
        // completed (or failed) — callers must not proceed on a repository in an unknown state.
        markRepoCorrupted(safeGeneration, exception, new ActionListener<Void>() {
            @Override
            public void onResponse(Void aVoid) {
                // Corruption recorded; report the original concurrent-modification failure.
                onFailure.accept(exception);
            }
            @Override
            public void onFailure(Exception e) {
                // Could not even record the corruption; report that failure instead.
                onFailure.accept(e);
            }
        });
        return false;
    }
    return true;
}
private RepositoryMetadata getRepoMetadata(ClusterState state) {
final RepositoryMetadata repositoryMetadata =
state.getMetadata().<RepositoriesMetadata>custom(RepositoriesMetadata.TYPE).repository(metadata.name());

View File

@ -37,7 +37,6 @@ import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.translog.BufferedChecksumStreamOutput;
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
import org.elasticsearch.repositories.blobstore.ChecksumBlobStoreFormat;
import org.elasticsearch.test.ESTestCase;
@ -197,10 +196,10 @@ public class BlobStoreFormatTests extends ESTestCase {
});
// signalling
block.await(5, TimeUnit.SECONDS);
assertFalse(BlobStoreTestUtil.blobExists(blobContainer, "test-blob"));
assertFalse(blobContainer.blobExists("test-blob"));
unblock.countDown();
future.get();
assertTrue(BlobStoreTestUtil.blobExists(blobContainer, "test-blob"));
assertTrue(blobContainer.blobExists("test-blob"));
} finally {
threadPool.shutdown();
}

View File

@ -195,6 +195,16 @@ public class MockEventuallyConsistentRepository extends BlobStoreRepository {
return path;
}
/**
 * Checks for the existence of a blob by attempting to open it for reading.
 *
 * <p>The original implementation opened the stream returned by {@code readBlob} and never closed
 * it — a resource leak. The stream is now managed with try-with-resources.
 *
 * @param blobName name of the blob to check
 * @return {@code true} if the blob can be read, {@code false} if it does not exist
 */
@Override
public boolean blobExists(String blobName) {
    try (InputStream ignored = readBlob(blobName)) {
        return true;
    } catch (NoSuchFileException ignored) {
        // Absent blob: the one expected, non-exceptional outcome of this probe.
        return false;
    } catch (IOException e) {
        // close() failing is unexpected for this in-memory mock; do not swallow it silently.
        throw new UncheckedIOException(e);
    }
}
@Override
public InputStream readBlob(String name) throws NoSuchFileException {
ensureNotClosed();

View File

@ -87,7 +87,18 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
try {
// Request body is closed in the finally block
final BytesReference requestBody = Streams.readFully(Streams.noCloseStream(exchange.getRequestBody()));
if (Regex.simpleMatch("GET /storage/v1/b/" + bucket + "/o*", request)) {
if (Regex.simpleMatch("GET /storage/v1/b/" + bucket + "/o/*", request)) {
final String key = exchange.getRequestURI().getPath().replace("/storage/v1/b/" + bucket + "/o/", "");
final BytesReference blob = blobs.get(key);
if (blob == null) {
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
} else {
final byte[] response = buildBlobInfoJson(key, blob.length()).getBytes(UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/json; charset=utf-8");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response);
}
} else if (Regex.simpleMatch("GET /storage/v1/b/" + bucket + "/o*", request)) {
// List Objects https://cloud.google.com/storage/docs/json_api/v1/objects/list
final Map<String, String> params = new HashMap<>();
RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params);
@ -104,12 +115,7 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
if (delimiterPos > -1) {
prefixes.add("\"" + blobName.substring(0, prefix.length() + delimiterPos + 1) + "\"");
} else {
listOfBlobs.add("{\"kind\":\"storage#object\","
+ "\"bucket\":\"" + bucket + "\","
+ "\"name\":\"" + blobName + "\","
+ "\"id\":\"" + blobName + "\","
+ "\"size\":\"" + blob.getValue().length() + "\""
+ "}");
listOfBlobs.add(buildBlobInfoJson(blobName, blob.getValue().length()));
}
}
}
@ -252,6 +258,15 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
}
}
/**
 * Builds the minimal JSON object describing a blob, as returned by the GCS objects API.
 *
 * @param blobName object name, used for both the "name" and "id" fields
 * @param size object size in bytes, rendered as a quoted string per the GCS wire format
 * @return a single-line JSON representation of the blob's metadata
 */
private String buildBlobInfoJson(String blobName, int size) {
    return String.format(
        "{\"kind\":\"storage#object\",\"bucket\":\"%s\",\"name\":\"%s\",\"id\":\"%s\",\"size\":\"%d\"}",
        bucket, blobName, blobName, size);
}
public Map<String, BytesReference> blobs() {
return blobs;
}

View File

@ -84,7 +84,14 @@ public class S3HttpHandler implements HttpHandler {
assert read == -1 : "Request body should have been empty but saw [" + read + "]";
}
try {
if (Regex.simpleMatch("POST /" + path + "/*?uploads", request)) {
if (Regex.simpleMatch("HEAD /" + path + "/*", request)) {
final BytesReference blob = blobs.get(exchange.getRequestURI().getPath());
if (blob == null) {
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
} else {
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
}
} else if (Regex.simpleMatch("POST /" + path + "/*?uploads", request)) {
final String uploadId = UUIDs.randomBase64UUID();
byte[] response = ("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" +
"<InitiateMultipartUploadResult>\n" +

View File

@ -249,9 +249,9 @@ public abstract class AbstractThirdPartyRepositoryTestCase extends ESSingleNodeT
executor.execute(ActionRunnable.supply(future, () -> {
final BlobStore blobStore = repo.blobStore();
return blobStore.blobContainer(repo.basePath().add("indices")).children().containsKey("foo")
&& BlobStoreTestUtil.blobExists(blobStore.blobContainer(repo.basePath().add("indices").add("foo")), "bar")
&& BlobStoreTestUtil.blobExists(blobStore.blobContainer(repo.basePath()), "meta-foo.dat")
&& BlobStoreTestUtil.blobExists(blobStore.blobContainer(repo.basePath()), "snap-foo.dat");
&& blobStore.blobContainer(repo.basePath().add("indices").add("foo")).blobExists("bar")
&& blobStore.blobContainer(repo.basePath()).blobExists("meta-foo.dat")
&& blobStore.blobContainer(repo.basePath()).blobExists("snap-foo.dat");
}));
return future.actionGet();
}

View File

@ -96,18 +96,6 @@ public final class BlobStoreTestUtil {
BlobStoreTestUtil.assertConsistency(repo, repo.threadPool().executor(ThreadPool.Names.GENERIC));
}
private static final byte[] SINK = new byte[1024];
/**
 * Checks whether a blob with the given name exists in the container by attempting to read it.
 *
 * @param container container to probe
 * @param blobName name of the blob to look for
 * @return {@code true} if the blob could be read fully, {@code false} if it does not exist
 * @throws IOException if reading the blob fails for a reason other than absence
 */
public static boolean blobExists(BlobContainer container, String blobName) throws IOException {
    try (InputStream input = container.readBlob(blobName)) {
        // Drain input stream fully to avoid warnings from SDKs like S3 that don't like closing streams mid-way
        while (input.read(SINK) >= 0);
        return true;
    } catch (NoSuchFileException e) {
        // Absence manifests as NoSuchFileException from readBlob; translate it to a boolean.
        return false;
    }
}
/**
* Assert that there are no unreferenced indices or unreferenced root-level metadata blobs in any repository.
* TODO: Expand the logic here to also check for unreferenced segment blobs and shard level metadata

View File

@ -225,8 +225,8 @@ public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase
assertArrayEquals(readBlobFully(containerFoo, "test", data1.length), data1);
assertArrayEquals(readBlobFully(containerBar, "test", data2.length), data2);
assertTrue(BlobStoreTestUtil.blobExists(containerFoo, "test"));
assertTrue(BlobStoreTestUtil.blobExists(containerBar, "test"));
assertTrue(containerFoo.blobExists("test"));
assertTrue(containerBar.blobExists("test"));
containerBar.delete();
containerFoo.delete();
}
@ -445,7 +445,7 @@ public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase
for (IndexId indexId : repositoryData.actionGet().getIndices().values()) {
if (indexId.getName().equals("test-idx-3")) {
assertFalse(BlobStoreTestUtil.blobExists(indicesBlobContainer.get(), indexId.getId())); // deleted index
assertFalse(indicesBlobContainer.get().blobExists(indexId.getId())); // deleted index
}
}

View File

@ -153,6 +153,11 @@ public final class TestUtils {
throw unsupportedException();
}
@Override
public boolean blobExists(String blobName) {
    // This stub container supports no blob operations at all; every call fails uniformly.
    throw unsupportedException();
}
@Override
public InputStream readBlob(String blobName) {
throw unsupportedException();