* Fix Bug With RepositoryData Caching This fixes a really subtle bug with caching `RepositoryData` that can corrupt a repository. We were caching `RepositoryData` serialized in the newest metadata format. This lead to a confusing situation where numeric shard generations would be cached in `ShardGenerations` that were not written to the repository because the repository or cluster did not yet support `ShardGenerations`. In the case where shard generations are not actually supported yet, these cached numeric generations are not safe and there's multiple scenarios where they would be incorrect, leading to the repository trying to read shard level metadata from index-N that don't exist. This commit makes it so that cached metadata is always in the same format as the metadata in the repository. Relates #57798
This commit is contained in:
parent
7a06a13d99
commit
004eb8bd7e
|
@ -28,10 +28,16 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
|||
import org.elasticsearch.cluster.metadata.Metadata;
|
||||
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.xcontent.DeprecationHandler;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.repositories.IndexId;
|
||||
import org.elasticsearch.repositories.RepositoriesService;
|
||||
import org.elasticsearch.repositories.Repository;
|
||||
import org.elasticsearch.repositories.RepositoryData;
|
||||
|
@ -40,6 +46,7 @@ import org.elasticsearch.repositories.ShardGenerations;
|
|||
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.StandardOpenOption;
|
||||
|
@ -49,6 +56,7 @@ import java.util.function.Function;
|
|||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFileExists;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
|
@ -351,6 +359,66 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas
|
|||
expectThrows(RepositoryException.class, () -> getRepositoryData(otherRepo));
|
||||
}
|
||||
|
||||
public void testHandleSnapshotErrorWithBwCFormat() throws IOException {
|
||||
final String repoName = "test-repo";
|
||||
final Path repoPath = randomRepoPath();
|
||||
createRepository(repoName, "fs", repoPath);
|
||||
|
||||
// Workaround to simulate BwC situation: taking a snapshot without indices here so that we don't create any new version shard
|
||||
// generations (the existence of which would short-circuit checks for the repo containing old version snapshots)
|
||||
final String oldVersionSnapshot = "old-version-snapshot";
|
||||
final CreateSnapshotResponse createSnapshotResponse = client().admin().cluster()
|
||||
.prepareCreateSnapshot(repoName, oldVersionSnapshot).setIndices().setWaitForCompletion(true).get();
|
||||
assertThat(createSnapshotResponse.getSnapshotInfo().totalShards(), is(0));
|
||||
|
||||
logger.info("--> writing downgraded RepositoryData");
|
||||
final RepositoryData repositoryData = getRepositoryData(repoName);
|
||||
final XContentBuilder jsonBuilder = JsonXContent.contentBuilder();
|
||||
repositoryData.snapshotsToXContent(jsonBuilder, false);
|
||||
final RepositoryData downgradedRepoData = RepositoryData.snapshotsFromXContent(JsonXContent.jsonXContent.createParser(
|
||||
NamedXContentRegistry.EMPTY,
|
||||
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
|
||||
Strings.toString(jsonBuilder).replace(Version.CURRENT.toString(), SnapshotsService.OLD_SNAPSHOT_FORMAT.toString())),
|
||||
repositoryData.getGenId());
|
||||
Files.write(repoPath.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + repositoryData.getGenId()),
|
||||
BytesReference.toBytes(BytesReference.bytes(
|
||||
downgradedRepoData.snapshotsToXContent(XContentFactory.jsonBuilder(), false))),
|
||||
StandardOpenOption.TRUNCATE_EXISTING);
|
||||
|
||||
logger.info("--> recreating repository to clear caches");
|
||||
client().admin().cluster().prepareDeleteRepository(repoName).get();
|
||||
createRepository(repoName, "fs", repoPath);
|
||||
|
||||
final String indexName = "test-index";
|
||||
createIndex(indexName);
|
||||
|
||||
assertCreateSnapshotSuccess(repoName, "snapshot-1");
|
||||
|
||||
// In the old metadata version the shard level metadata could be moved to the next generation for all sorts of reasons, this should
|
||||
// not break subsequent repository operations
|
||||
logger.info("--> move shard level metadata to new generation");
|
||||
final IndexId indexId = getRepositoryData(repoName).resolveIndexId(indexName);
|
||||
final Path shardPath = repoPath.resolve("indices").resolve(indexId.getId()).resolve("0");
|
||||
final Path initialShardMetaPath = shardPath.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + "0");
|
||||
assertFileExists(initialShardMetaPath);
|
||||
Files.move(initialShardMetaPath, shardPath.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + "1"));
|
||||
|
||||
logger.info("--> delete old version snapshot");
|
||||
client().admin().cluster().prepareDeleteSnapshot(repoName, oldVersionSnapshot).get();
|
||||
|
||||
assertCreateSnapshotSuccess(repoName, "snapshot-2");
|
||||
}
|
||||
|
||||
private void assertCreateSnapshotSuccess(String repoName, String snapshotName) {
|
||||
logger.info("--> create another snapshot");
|
||||
final SnapshotInfo snapshotInfo = client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
|
||||
.setWaitForCompletion(true).get().getSnapshotInfo();
|
||||
assertThat(snapshotInfo.state(), is(SnapshotState.SUCCESS));
|
||||
final int successfulShards = snapshotInfo.successfulShards();
|
||||
assertThat(successfulShards, greaterThan(0));
|
||||
assertThat(successfulShards, equalTo(snapshotInfo.totalShards()));
|
||||
}
|
||||
|
||||
private void assertRepositoryBlocked(Client client, String repo, String existingSnapshot) {
|
||||
logger.info("--> try to delete snapshot");
|
||||
final RepositoryException repositoryException3 = expectThrows(RepositoryException.class,
|
||||
|
|
|
@ -1226,7 +1226,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
loaded = repositoryDataFromCachedEntry(cached);
|
||||
} else {
|
||||
loaded = getRepositoryData(genToLoad);
|
||||
cacheRepositoryData(loaded);
|
||||
// We can cache in the most recent version here without regard to the actual repository metadata version since we're
|
||||
// only caching the information that we just wrote and thus won't accidentally cache any information that isn't safe
|
||||
cacheRepositoryData(loaded, true);
|
||||
}
|
||||
listener.onResponse(loaded);
|
||||
return;
|
||||
|
@ -1262,15 +1264,16 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
* generation will always contain the same {@link RepositoryData}.
|
||||
*
|
||||
* @param updated RepositoryData to cache if newer than the cache contents
|
||||
* @param writeShardGens whether to cache shard generation values
|
||||
*/
|
||||
private void cacheRepositoryData(RepositoryData updated) {
|
||||
private void cacheRepositoryData(RepositoryData updated, boolean writeShardGens) {
|
||||
if (cacheRepositoryData && bestEffortConsistency == false) {
|
||||
final BytesReference serialized;
|
||||
BytesStreamOutput out = new BytesStreamOutput();
|
||||
try {
|
||||
try (StreamOutput tmp = CompressorFactory.COMPRESSOR.streamOutput(out);
|
||||
XContentBuilder builder = XContentFactory.jsonBuilder(tmp)) {
|
||||
updated.snapshotsToXContent(builder, true);
|
||||
updated.snapshotsToXContent(builder, writeShardGens);
|
||||
}
|
||||
serialized = out.bytes();
|
||||
final int len = serialized.length();
|
||||
|
@ -1556,7 +1559,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
final RepositoryData writtenRepositoryData = filteredRepositoryData.withGenId(newGen);
|
||||
cacheRepositoryData(writtenRepositoryData);
|
||||
cacheRepositoryData(writtenRepositoryData, writeShardGens);
|
||||
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(listener, () -> {
|
||||
// Delete all now outdated index files up to 1000 blobs back from the new generation.
|
||||
// If there are more than 1000 dangling index-N cleanup functionality on repo delete will take care of them.
|
||||
|
|
|
@ -93,6 +93,10 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
|
|||
skipRepoConsistencyCheckReason = reason;
|
||||
}
|
||||
|
||||
protected RepositoryData getRepositoryData(String repository) {
|
||||
return getRepositoryData(internalCluster().getMasterNodeInstance(RepositoriesService.class).repository(repository));
|
||||
}
|
||||
|
||||
protected RepositoryData getRepositoryData(Repository repository) {
|
||||
ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, internalCluster().getMasterName());
|
||||
final PlainActionFuture<RepositoryData> repositoryData = PlainActionFuture.newFuture();
|
||||
|
|
Loading…
Reference in New Issue