Fix broken numeric shard generations when reading them from the wire or physically from the repository. This should be the cheapest way to clean up broken shard generations in a BwC and safe-to-backport manner for now. We could further optimize this by also skipping the generation checks based on the versions we see in the `RepositoryData`, but that hardly matters in practice since we will read `RepositoryData` from the cache in almost all cases.

Closes #57798
This commit is contained in:
parent 7a1300a09e
commit 0987c0a5f3
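At its core the change treats any shard generation that parses as a plain number as untrustworthy: such values can only come from the legacy, pre-shard-generation repository format and must not be used as blob-name suffixes, so they are filtered back to `null`, which makes the repository fall back to re-listing the shard directory. A minimal standalone sketch of that classification rule (illustrative class name only, not the Elasticsearch sources; the real logic lands in `ShardGenerations#fixShardGeneration` below):

```java
import java.util.regex.Pattern;

// Sketch only: mirrors the rule this commit adds as ShardGenerations#fixShardGeneration.
final class BrokenShardGenerationSketch {

    private static final Pattern IS_NUMBER = Pattern.compile("^\\d+$");

    /** Returns null for generations that must be recomputed, otherwise the input unchanged. */
    static String fix(String shardGeneration) {
        if (shardGeneration == null || IS_NUMBER.matcher(shardGeneration).matches()) {
            return null; // numeric => legacy index-N style value, treat as unknown
        }
        return shardGeneration; // UUID-style generation, safe to use as a blob suffix
    }

    public static void main(String[] args) {
        System.out.println(fix("0"));                      // null -> repository re-lists the shard folder
        System.out.println(fix("1734"));                   // null
        System.out.println(fix("M5a2dXkASnCswjV2DqIKZA")); // printed unchanged
    }
}
```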
CorruptedBlobStoreRepositoryIT.java

```diff
@@ -52,6 +52,7 @@ import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
 import java.util.Collections;
 import java.util.Locale;
 import java.util.Map;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -379,7 +380,7 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCase {
                 NamedXContentRegistry.EMPTY,
                 DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
                 Strings.toString(jsonBuilder).replace(Version.CURRENT.toString(), SnapshotsService.OLD_SNAPSHOT_FORMAT.toString())),
-            repositoryData.getGenId());
+            repositoryData.getGenId(), randomBoolean());
         Files.write(repoPath.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + repositoryData.getGenId()),
             BytesReference.toBytes(BytesReference.bytes(
                 downgradedRepoData.snapshotsToXContent(XContentFactory.jsonBuilder(), false))),
@@ -409,6 +410,75 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCase {
         assertCreateSnapshotSuccess(repoName, "snapshot-2");
     }
 
+    public void testRepairBrokenShardGenerations() throws IOException {
+        final String repoName = "test-repo";
+        final Path repoPath = randomRepoPath();
+        createRepository(repoName, "fs", repoPath);
+
+        // Workaround to simulate BwC situation: taking a snapshot without indices here so that we don't create any new version shard
+        // generations (the existence of which would short-circuit checks for the repo containing old version snapshots)
+        final String oldVersionSnapshot = "old-version-snapshot";
+        final CreateSnapshotResponse createSnapshotResponse = client().admin().cluster()
+            .prepareCreateSnapshot(repoName, oldVersionSnapshot).setIndices().setWaitForCompletion(true).get();
+        assertThat(createSnapshotResponse.getSnapshotInfo().totalShards(), is(0));
+
+        logger.info("--> writing downgraded RepositoryData");
+        final RepositoryData repositoryData = getRepositoryData(repoName);
+        final XContentBuilder jsonBuilder = JsonXContent.contentBuilder();
+        repositoryData.snapshotsToXContent(jsonBuilder, false);
+        final RepositoryData downgradedRepoData = RepositoryData.snapshotsFromXContent(JsonXContent.jsonXContent.createParser(
+            NamedXContentRegistry.EMPTY,
+            DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
+            Strings.toString(jsonBuilder).replace(Version.CURRENT.toString(), SnapshotsService.OLD_SNAPSHOT_FORMAT.toString())),
+            repositoryData.getGenId(), randomBoolean());
+        Files.write(repoPath.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + repositoryData.getGenId()),
+            BytesReference.toBytes(BytesReference.bytes(
+                downgradedRepoData.snapshotsToXContent(XContentFactory.jsonBuilder(), false))),
+            StandardOpenOption.TRUNCATE_EXISTING);
+
+        logger.info("--> recreating repository to clear caches");
+        client().admin().cluster().prepareDeleteRepository(repoName).get();
+        createRepository(repoName, "fs", repoPath);
+
+        final String indexName = "test-index";
+        createIndex(indexName);
+
+        assertCreateSnapshotSuccess(repoName, "snapshot-1");
+
+        logger.info("--> delete old version snapshot");
+        client().admin().cluster().prepareDeleteSnapshot(repoName, oldVersionSnapshot).get();
+
+        logger.info("--> move shard level metadata to new generation and make RepositoryData point at an older generation");
+        final IndexId indexId = getRepositoryData(repoName).resolveIndexId(indexName);
+        final Path shardPath = repoPath.resolve("indices").resolve(indexId.getId()).resolve("0");
+        final Path initialShardMetaPath = shardPath.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + "0");
+        assertFileExists(initialShardMetaPath);
+        Files.move(initialShardMetaPath, shardPath.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + randomIntBetween(1, 1000)));
+
+        final RepositoryData repositoryData1 = getRepositoryData(repoName);
+        final Map<String, SnapshotId> snapshotIds =
+            repositoryData1.getSnapshotIds().stream().collect(Collectors.toMap(SnapshotId::getUUID, Function.identity()));
+        final RepositoryData brokenRepoData = new RepositoryData(
+            repositoryData1.getGenId(), snapshotIds, snapshotIds.values().stream().collect(
+            Collectors.toMap(SnapshotId::getUUID, repositoryData1::getSnapshotState)),
+            snapshotIds.values().stream().collect(
+                Collectors.toMap(SnapshotId::getUUID, repositoryData1::getVersion)),
+            repositoryData1.getIndices().values().stream().collect(
+                Collectors.toMap(Function.identity(), repositoryData1::getSnapshots)
+            ), ShardGenerations.builder().putAll(repositoryData1.shardGenerations()).put(indexId, 0, "0").build()
+        );
+        Files.write(repoPath.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + repositoryData1.getGenId()),
+            BytesReference.toBytes(BytesReference.bytes(
+                brokenRepoData.snapshotsToXContent(XContentFactory.jsonBuilder(), true))),
+            StandardOpenOption.TRUNCATE_EXISTING);
+
+        logger.info("--> recreating repository to clear caches");
+        client().admin().cluster().prepareDeleteRepository(repoName).get();
+        createRepository(repoName, "fs", repoPath);
+
+        assertCreateSnapshotSuccess(repoName, "snapshot-2");
+    }
+
     private void assertCreateSnapshotSuccess(String repoName, String snapshotName) {
         logger.info("--> create another snapshot");
         final SnapshotInfo snapshotInfo = client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
```
RepositoryData.java

```diff
@@ -427,8 +427,12 @@ public final class RepositoryData {
 
     /**
      * Reads an instance of {@link RepositoryData} from x-content, loading the snapshots and indices metadata.
+     *
+     * @param fixBrokenShardGens set to {@code true} to filter out broken shard generations read from the {@code parser} via
+     *                           {@link ShardGenerations#fixShardGeneration}. Used to disable fixing broken generations when reading
+     *                           from cached bytes that we trust to not contain broken generations.
      */
-    public static RepositoryData snapshotsFromXContent(final XContentParser parser, long genId) throws IOException {
+    public static RepositoryData snapshotsFromXContent(XContentParser parser, long genId, boolean fixBrokenShardGens) throws IOException {
         final Map<String, SnapshotId> snapshots = new HashMap<>();
         final Map<String, SnapshotState> snapshotStates = new HashMap<>();
         final Map<String, Version> snapshotVersions = new HashMap<>();
@@ -532,7 +536,13 @@
                     assert indexId != null;
                     indexSnapshots.put(indexId, Collections.unmodifiableList(snapshotIds));
                     for (int i = 0; i < gens.size(); i++) {
-                        shardGenerations.put(indexId, i, gens.get(i));
+                        String parsedGen = gens.get(i);
+                        if (fixBrokenShardGens) {
+                            parsedGen = ShardGenerations.fixShardGeneration(parsedGen);
+                        }
+                        if (parsedGen != null) {
+                            shardGenerations.put(indexId, i, parsedGen);
+                        }
                     }
                 }
             } else if (MIN_VERSION.equals(field)) {
```
ShardGenerations.java

```diff
@@ -20,6 +20,7 @@
 package org.elasticsearch.repositories;
 
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
 
 import java.util.Arrays;
 import java.util.Collection;
@@ -30,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 public final class ShardGenerations {
@@ -54,6 +56,24 @@ public final class ShardGenerations {
         this.shardGenerations = shardGenerations;
     }
 
+    private static final Pattern IS_NUMBER = Pattern.compile("^\\d+$");
+
+    /**
+     * Filters out unreliable numeric shard generations read from {@link RepositoryData} or {@link IndexShardSnapshotStatus}, returning
+     * {@code null} in their place.
+     * @see <a href="https://github.com/elastic/elasticsearch/issues/57798">Issue #57798</a>
+     *
+     * @param shardGeneration shard generation to fix
+     * @return given shard generation or {@code null} if it was filtered out or {@code null} was passed
+     */
+    @Nullable
+    public static String fixShardGeneration(@Nullable String shardGeneration) {
+        if (shardGeneration == null) {
+            return null;
+        }
+        return IS_NUMBER.matcher(shardGeneration).matches() ? null : shardGeneration;
+    }
+
     /**
      * Returns the total number of shards tracked by this instance.
      */
```
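One detail of the contract above is worth spelling out: `fixShardGeneration` returns either `null` or the very same `String` reference it was handed, never a copy. That is why the assertion added to `BlobStoreRepository#buildBlobStoreIndexShardSnapshots` further down can compare with `==`. A small usage sketch (values are illustrative; assumes the `ShardGenerations` class from this change is on the classpath and assertions are enabled):

```java
import org.elasticsearch.repositories.ShardGenerations;

// Usage sketch for the new ShardGenerations#fixShardGeneration contract.
class FixShardGenerationUsage {
    public static void main(String[] args) {
        String uuidLike = "M5a2dXkASnCswjV2DqIKZA";
        assert ShardGenerations.fixShardGeneration(uuidLike) == uuidLike; // same reference: nothing to fix
        assert ShardGenerations.fixShardGeneration(null) == null;         // null stays null
        assert ShardGenerations.fixShardGeneration("42") == null;         // purely numeric: filtered out
    }
}
```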
BlobStoreRepository.java

```diff
@@ -1307,7 +1307,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent implements Repository {
             return RepositoryData.snapshotsFromXContent(
                 XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY,
                     LoggingDeprecationHandler.INSTANCE,
-                    CompressorFactory.COMPRESSOR.streamInput(cacheEntry.v2().streamInput())), cacheEntry.v1());
+                    CompressorFactory.COMPRESSOR.streamInput(cacheEntry.v2().streamInput())), cacheEntry.v1(), false);
         }
 
     private RepositoryException corruptedStateException(@Nullable Exception cause) {
@@ -1372,7 +1372,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent implements Repository {
             try (InputStream blob = blobContainer().readBlob(snapshotsIndexBlobName);
                  XContentParser parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY,
                      LoggingDeprecationHandler.INSTANCE, blob)) {
-                return RepositoryData.snapshotsFromXContent(parser, indexGen);
+                return RepositoryData.snapshotsFromXContent(parser, indexGen, true);
             }
         } catch (IOException ioe) {
             if (bestEffortConsistency) {
@@ -1663,7 +1663,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent implements Repository {
         final ShardId shardId = store.shardId();
         final long startTime = threadPool.absoluteTimeInMillis();
         try {
-            final String generation = snapshotStatus.generation();
+            final String generation = ShardGenerations.fixShardGeneration(snapshotStatus.generation());
             logger.debug("[{}] [{}] snapshot to [{}] [{}] ...", shardId, snapshotId, metadata.name(), generation);
             final BlobContainer shardContainer = shardContainer(indexId, shardId);
             final Set<String> blobs;
@@ -2137,6 +2137,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent implements Repository {
     private Tuple<BlobStoreIndexShardSnapshots, String> buildBlobStoreIndexShardSnapshots(Set<String> blobs,
                                                                                           BlobContainer shardContainer,
                                                                                           @Nullable String generation) throws IOException {
+        assert ShardGenerations.fixShardGeneration(generation) == generation
+            : "Generation must not be numeric but received [" + generation + "]";
         if (generation != null) {
             if (generation.equals(ShardGenerations.NEW_SHARD_GEN)) {
                 return new Tuple<>(BlobStoreIndexShardSnapshots.EMPTY, ShardGenerations.NEW_SHARD_GEN);
```
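Returning `null` for a suspect generation is safe because the shard-level reader already has a fallback for a missing generation hint: it lists the shard container and picks the highest numeric `index-N` blob. That fallback is not part of this diff; the following is a simplified, self-contained sketch of the idea (hypothetical helper, not the actual Elasticsearch implementation):

```java
import java.util.Set;

// Simplified sketch of the "no generation hint" fallback: scan blob names for index-N files.
final class LatestIndexGenerationSketch {

    private static final String INDEX_FILE_PREFIX = "index-";

    /** Highest N among blobs named "index-N", or -1 if the shard has no such blob yet. */
    static long latestGeneration(Set<String> blobNames) {
        long latest = -1;
        for (String name : blobNames) {
            if (name.startsWith(INDEX_FILE_PREFIX)) {
                try {
                    latest = Math.max(latest, Long.parseLong(name.substring(INDEX_FILE_PREFIX.length())));
                } catch (NumberFormatException e) {
                    // "index-" + UUID blobs belong to the new format and are not part of the numeric sequence
                }
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        Set<String> blobs = Set.of("index-0", "index-3", "index-M5a2dXkASnCswjV2DqIKZA", "snap-abc.dat");
        System.out.println(latestGeneration(blobs)); // prints 3
    }
}
```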
RepositoryDataTests.java

```diff
@@ -77,7 +77,7 @@ public class RepositoryDataTests extends ESTestCase {
         repositoryData.snapshotsToXContent(builder, true);
         try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) {
             long gen = (long) randomIntBetween(0, 500);
-            RepositoryData fromXContent = RepositoryData.snapshotsFromXContent(parser, gen);
+            RepositoryData fromXContent = RepositoryData.snapshotsFromXContent(parser, gen, randomBoolean());
             assertEquals(repositoryData, fromXContent);
             assertEquals(gen, fromXContent.getGenId());
         }
@@ -97,14 +97,14 @@ public class RepositoryDataTests extends ESTestCase {
             IndexId indexId = new IndexId(randomAlphaOfLength(7), UUIDs.randomBase64UUID());
             newIndices.add(indexId);
             indices.add(indexId);
-            builder.put(indexId, 0, "1");
+            builder.put(indexId, 0, UUIDs.randomBase64UUID(random()));
         }
         int numOld = randomIntBetween(1, indexIdMap.size());
         List<String> indexNames = new ArrayList<>(indexIdMap.keySet());
         for (int i = 0; i < numOld; i++) {
             final IndexId indexId = indexIdMap.get(indexNames.get(i));
             indices.add(indexId);
-            builder.put(indexId, 0, "2");
+            builder.put(indexId, 0, UUIDs.randomBase64UUID(random()));
         }
         RepositoryData newRepoData = repositoryData.addSnapshot(newSnapshot,
             randomFrom(SnapshotState.SUCCESS, SnapshotState.PARTIAL, SnapshotState.FAILED),
@@ -187,7 +187,7 @@ public class RepositoryDataTests extends ESTestCase {
         repositoryData.snapshotsToXContent(builder, true);
         RepositoryData parsedRepositoryData;
         try (XContentParser xParser = createParser(builder)) {
-            parsedRepositoryData = RepositoryData.snapshotsFromXContent(xParser, repositoryData.getGenId());
+            parsedRepositoryData = RepositoryData.snapshotsFromXContent(xParser, repositoryData.getGenId(), randomBoolean());
         }
         assertEquals(repositoryData, parsedRepositoryData);
 
@@ -226,7 +226,7 @@ public class RepositoryDataTests extends ESTestCase {
 
         try (XContentParser xParser = createParser(corruptedBuilder)) {
             ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () ->
-                RepositoryData.snapshotsFromXContent(xParser, corruptedRepositoryData.getGenId()));
+                RepositoryData.snapshotsFromXContent(xParser, corruptedRepositoryData.getGenId(), randomBoolean()));
             assertThat(e.getMessage(), equalTo("Detected a corrupted repository, index " + corruptedIndexId + " references an unknown " +
                 "snapshot uuid [_does_not_exist]"));
         }
@@ -263,7 +263,7 @@ public class RepositoryDataTests extends ESTestCase {
 
         try (XContentParser xParser = createParser(builder)) {
             ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () ->
-                RepositoryData.snapshotsFromXContent(xParser, randomNonNegativeLong()));
+                RepositoryData.snapshotsFromXContent(xParser, randomNonNegativeLong(), randomBoolean()));
             assertThat(e.getMessage(), equalTo("Detected a corrupted repository, " +
                 "index [docs/_id] references an unknown snapshot uuid [null]"));
         }
```
BlobStoreTestUtil.java

```diff
@@ -132,7 +132,7 @@ public final class BlobStoreTestUtil {
         try (InputStream blob = blobContainer.readBlob("index-" + latestGen);
              XContentParser parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY,
                  LoggingDeprecationHandler.INSTANCE, blob)) {
-            repositoryData = RepositoryData.snapshotsFromXContent(parser, latestGen);
+            repositoryData = RepositoryData.snapshotsFromXContent(parser, latestGen, false);
         }
         assertIndexUUIDs(blobContainer, repositoryData);
         assertSnapshotUUIDs(repository, repositoryData);
```
AbstractRepository.java

```diff
@@ -75,7 +75,7 @@ public abstract class AbstractRepository {
             // EMPTY is safe here because RepositoryData#fromXContent calls namedObject
             try (XContentParser parser = XContentHelper.createParser(NamedXContentRegistry.EMPTY,
                 LoggingDeprecationHandler.INSTANCE, out.bytes(), XContentType.JSON)) {
-                return RepositoryData.snapshotsFromXContent(parser, indexFileGeneration);
+                return RepositoryData.snapshotsFromXContent(parser, indexFileGeneration, true);
             }
         } catch (IOException e) {
             terminal.println("Failed to read " + snapshotsIndexBlobName + " file");
```