This is intended as a stop-gap solution/improvement to #38941 that prevents repo modifications without an intermittent master failover from causing inconsistent (outdated due to inconsistent listing of index-N blobs) `RepositoryData` to be written. Tracking the latest repository generation will move to the cluster state in a separate pull request. This is intended as a low-risk change to be backported as far as possible and motived by the recently increased chance of #38941 causing trouble via SLM (see https://github.com/elastic/elasticsearch/issues/47520). Closes #47834 Closes #49048
This commit is contained in:
parent
a370008856
commit
fc505aaa76
|
@ -113,6 +113,7 @@ import java.util.concurrent.BlockingQueue;
|
|||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
|
@ -372,7 +373,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
} else {
|
||||
try {
|
||||
final Map<String, BlobMetaData> rootBlobs = blobContainer().listBlobs();
|
||||
final RepositoryData repositoryData = getRepositoryData(latestGeneration(rootBlobs.keySet()));
|
||||
final RepositoryData repositoryData = safeRepositoryData(repositoryStateId, rootBlobs);
|
||||
// Cache the indices that were found before writing out the new index-N blob so that a stuck master will never
|
||||
// delete an index that was created by another master node after writing this index-N blob.
|
||||
final Map<String, BlobContainer> foundIndices = blobStore().blobContainer(indicesPath()).children();
|
||||
|
@ -383,6 +384,30 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Loads {@link RepositoryData} ensuring that it is consistent with the given {@code rootBlobs} as well of the assumed generation.
|
||||
*
|
||||
* @param repositoryStateId Expected repository generation
|
||||
* @param rootBlobs Blobs at the repository root
|
||||
* @return RepositoryData
|
||||
*/
|
||||
private RepositoryData safeRepositoryData(long repositoryStateId, Map<String, BlobMetaData> rootBlobs) {
|
||||
final long generation = latestGeneration(rootBlobs.keySet());
|
||||
final long genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, repositoryStateId));
|
||||
if (genToLoad > generation) {
|
||||
// It's always a possibility to not see the latest index-N in the listing here on an eventually consistent blob store, just
|
||||
// debug log it. Any blobs leaked as a result of an inconsistent listing here will be cleaned up in a subsequent cleanup or
|
||||
// snapshot delete run anyway.
|
||||
logger.debug("Determined repository's generation from its contents to [" + generation + "] but " +
|
||||
"current generation is at least [" + genToLoad + "]");
|
||||
}
|
||||
if (genToLoad != repositoryStateId) {
|
||||
throw new RepositoryException(metadata.name(), "concurrent modification of the index-N file, expected current generation [" +
|
||||
repositoryStateId + "], actual current generation [" + genToLoad + "]");
|
||||
}
|
||||
return getRepositoryData(genToLoad);
|
||||
}
|
||||
|
||||
/**
|
||||
* After updating the {@link RepositoryData} each of the shards directories is individually first moved to the next shard generation
|
||||
* and then has all now unreferenced blobs in it deleted.
|
||||
|
@ -610,14 +635,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
if (isReadOnly()) {
|
||||
throw new RepositoryException(metadata.name(), "cannot run cleanup on readonly repository");
|
||||
}
|
||||
final RepositoryData repositoryData = getRepositoryData();
|
||||
if (repositoryData.getGenId() != repositoryStateId) {
|
||||
// Check that we are working on the expected repository version before gathering the data to clean up
|
||||
throw new RepositoryException(metadata.name(), "concurrent modification of the repository before cleanup started, " +
|
||||
"expected current generation [" + repositoryStateId + "], actual current generation ["
|
||||
+ repositoryData.getGenId() + "]");
|
||||
}
|
||||
Map<String, BlobMetaData> rootBlobs = blobContainer().listBlobs();
|
||||
final RepositoryData repositoryData = safeRepositoryData(repositoryStateId, rootBlobs);
|
||||
final Map<String, BlobContainer> foundIndices = blobStore().blobContainer(indicesPath()).children();
|
||||
final Set<String> survivingIndexIds =
|
||||
repositoryData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toSet());
|
||||
|
@ -903,13 +922,37 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
}
|
||||
}
|
||||
|
||||
// Tracks the latest known repository generation in a best-effort way to detect inconsistent listing of root level index-N blobs
|
||||
// and concurrent modifications.
|
||||
// Protected for use in MockEventuallyConsistentRepository
|
||||
protected final AtomicLong latestKnownRepoGen = new AtomicLong(RepositoryData.EMPTY_REPO_GEN);
|
||||
|
||||
@Override
|
||||
public RepositoryData getRepositoryData() {
|
||||
// Retry loading RepositoryData in a loop in case we run into concurrent modifications of the repository.
|
||||
while (true) {
|
||||
final long generation;
|
||||
try {
|
||||
return getRepositoryData(latestIndexBlobId());
|
||||
generation = latestIndexBlobId();
|
||||
} catch (IOException ioe) {
|
||||
throw new RepositoryException(metadata.name(), "Could not determine repository generation from root blobs", ioe);
|
||||
}
|
||||
final long genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, generation));
|
||||
if (genToLoad > generation) {
|
||||
logger.info("Determined repository generation [" + generation
|
||||
+ "] from repository contents but correct generation must be at least [" + genToLoad + "]");
|
||||
}
|
||||
try {
|
||||
return getRepositoryData(genToLoad);
|
||||
} catch (RepositoryException e) {
|
||||
if (genToLoad != latestKnownRepoGen.get()) {
|
||||
logger.warn("Failed to load repository data generation [" + genToLoad +
|
||||
"] because a concurrent operation moved the current generation to [" + latestKnownRepoGen.get() + "]", e);
|
||||
continue;
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private RepositoryData getRepositoryData(long indexGen) {
|
||||
|
@ -926,6 +969,12 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
return RepositoryData.snapshotsFromXContent(parser, indexGen);
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
// If we fail to load the generation we tracked in latestKnownRepoGen we reset it.
|
||||
// This is done as a fail-safe in case a user manually deletes the contents of the repository in which case subsequent
|
||||
// operations must start from the EMPTY_REPO_GEN again
|
||||
if (latestKnownRepoGen.compareAndSet(indexGen, RepositoryData.EMPTY_REPO_GEN)) {
|
||||
logger.warn("Resetting repository generation tracker because we failed to read generation [" + indexGen + "]", ioe);
|
||||
}
|
||||
throw new RepositoryException(metadata.name(), "could not read repository data from index blob", ioe);
|
||||
}
|
||||
}
|
||||
|
@ -951,11 +1000,21 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
"] - possibly due to simultaneous snapshot deletion requests");
|
||||
}
|
||||
final long newGen = currentGen + 1;
|
||||
if (latestKnownRepoGen.get() >= newGen) {
|
||||
throw new IllegalArgumentException(
|
||||
"Tried writing generation [" + newGen + "] but repository is at least at generation [" + newGen + "] already");
|
||||
}
|
||||
// write the index file
|
||||
final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen);
|
||||
logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob);
|
||||
writeAtomic(indexBlob,
|
||||
BytesReference.bytes(repositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), writeShardGens)), true);
|
||||
final long latestKnownGen = latestKnownRepoGen.updateAndGet(known -> Math.max(known, newGen));
|
||||
if (newGen < latestKnownGen) {
|
||||
// Don't mess up the index.latest blob
|
||||
throw new IllegalStateException(
|
||||
"Wrote generation [" + newGen + "] but latest known repo gen concurrently changed to [" + latestKnownGen + "]");
|
||||
}
|
||||
// write the current generation to the index-latest file
|
||||
final BytesReference genBytes;
|
||||
try (BytesStreamOutput bStream = new BytesStreamOutput()) {
|
||||
|
|
|
@ -396,7 +396,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
|||
final StepListener<CreateSnapshotResponse> createAnotherSnapshotResponseStepListener = new StepListener<>();
|
||||
|
||||
continueOrDie(deleteSnapshotStepListener, acknowledgedResponse -> masterNode.client.admin().cluster()
|
||||
.prepareCreateSnapshot(repoName, snapshotName).execute(createAnotherSnapshotResponseStepListener));
|
||||
.prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(true).execute(createAnotherSnapshotResponseStepListener));
|
||||
continueOrDie(createAnotherSnapshotResponseStepListener, createSnapshotResponse ->
|
||||
assertEquals(createSnapshotResponse.getSnapshotInfo().state(), SnapshotState.SUCCESS));
|
||||
|
||||
|
@ -1146,7 +1146,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
|||
} else {
|
||||
return metaData -> {
|
||||
final Repository repository = new MockEventuallyConsistentRepository(
|
||||
metaData, xContentRegistry(), deterministicTaskQueue.getThreadPool(), blobStoreContext);
|
||||
metaData, xContentRegistry(), deterministicTaskQueue.getThreadPool(), blobStoreContext, random());
|
||||
repository.start();
|
||||
return repository;
|
||||
};
|
||||
|
|
|
@ -43,9 +43,11 @@ import java.io.IOException;
|
|||
import java.io.InputStream;
|
||||
import java.nio.file.NoSuchFileException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.Function;
|
||||
|
@ -63,18 +65,22 @@ import static org.hamcrest.Matchers.equalTo;
|
|||
*/
|
||||
public class MockEventuallyConsistentRepository extends BlobStoreRepository {
|
||||
|
||||
private final Random random;
|
||||
|
||||
private final Context context;
|
||||
|
||||
private final NamedXContentRegistry namedXContentRegistry;
|
||||
|
||||
public MockEventuallyConsistentRepository(
|
||||
RepositoryMetaData metadata,
|
||||
NamedXContentRegistry namedXContentRegistry,
|
||||
ThreadPool threadPool,
|
||||
Context context) {
|
||||
final RepositoryMetaData metadata,
|
||||
final NamedXContentRegistry namedXContentRegistry,
|
||||
final ThreadPool threadPool,
|
||||
final Context context,
|
||||
final Random random) {
|
||||
super(metadata, false, namedXContentRegistry, threadPool);
|
||||
this.context = context;
|
||||
this.namedXContentRegistry = namedXContentRegistry;
|
||||
this.random = random;
|
||||
}
|
||||
|
||||
// Filters out all actions that are super-seeded by subsequent actions
|
||||
|
@ -111,6 +117,9 @@ public class MockEventuallyConsistentRepository extends BlobStoreRepository {
|
|||
*/
|
||||
public static final class Context {
|
||||
|
||||
// Eventual consistency is only simulated as long as this flag is false
|
||||
private boolean consistent;
|
||||
|
||||
private final List<BlobStoreAction> actions = new ArrayList<>();
|
||||
|
||||
/**
|
||||
|
@ -121,6 +130,7 @@ public class MockEventuallyConsistentRepository extends BlobStoreRepository {
|
|||
final List<BlobStoreAction> consistentActions = consistentView(actions);
|
||||
actions.clear();
|
||||
actions.addAll(consistentActions);
|
||||
consistent = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -244,14 +254,14 @@ public class MockEventuallyConsistentRepository extends BlobStoreRepository {
|
|||
ensureNotClosed();
|
||||
final String thisPath = path.buildAsString();
|
||||
synchronized (context.actions) {
|
||||
return consistentView(context.actions).stream()
|
||||
return maybeMissLatestIndexN(consistentView(context.actions).stream()
|
||||
.filter(
|
||||
action -> action.path.startsWith(thisPath) && action.path.substring(thisPath.length()).indexOf('/') == -1
|
||||
&& action.operation == Operation.PUT)
|
||||
.collect(
|
||||
Collectors.toMap(
|
||||
action -> action.path.substring(thisPath.length()),
|
||||
action -> new PlainBlobMetaData(action.path.substring(thisPath.length()), action.data.length)));
|
||||
action -> new PlainBlobMetaData(action.path.substring(thisPath.length()), action.data.length))));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -272,9 +282,21 @@ public class MockEventuallyConsistentRepository extends BlobStoreRepository {
|
|||
|
||||
@Override
|
||||
public Map<String, BlobMetaData> listBlobsByPrefix(String blobNamePrefix) {
|
||||
return
|
||||
listBlobs().entrySet().stream().filter(entry -> entry.getKey().startsWith(blobNamePrefix)).collect(
|
||||
Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
return maybeMissLatestIndexN(
|
||||
listBlobs().entrySet().stream().filter(entry -> entry.getKey().startsWith(blobNamePrefix))
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
|
||||
}
|
||||
|
||||
// Randomly filter out the latest /index-N blob from a listing to test that tracking of it in latestKnownRepoGen
|
||||
// overrides an inconsistent listing
|
||||
private Map<String, BlobMetaData> maybeMissLatestIndexN(Map<String, BlobMetaData> listing) {
|
||||
// Only filter out latest index-N at the repo root and only as long as we're not in a forced consistent state
|
||||
if (path.parent() == null && context.consistent == false && random.nextBoolean()) {
|
||||
final Map<String, BlobMetaData> filtered = new HashMap<>(listing);
|
||||
filtered.remove(BlobStoreRepository.INDEX_FILE_PREFIX + latestKnownRepoGen.get());
|
||||
return Collections.unmodifiableMap(filtered);
|
||||
}
|
||||
return listing;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -50,7 +50,7 @@ public class MockEventuallyConsistentRepositoryTests extends ESTestCase {
|
|||
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
|
||||
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
|
||||
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
|
||||
xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) {
|
||||
xContentRegistry(), mock(ThreadPool.class), blobStoreContext, random())) {
|
||||
repository.start();
|
||||
final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath());
|
||||
final String blobName = randomAlphaOfLength(10);
|
||||
|
@ -70,7 +70,7 @@ public class MockEventuallyConsistentRepositoryTests extends ESTestCase {
|
|||
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
|
||||
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
|
||||
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
|
||||
xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) {
|
||||
xContentRegistry(), mock(ThreadPool.class), blobStoreContext, random())) {
|
||||
repository.start();
|
||||
final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath());
|
||||
final String blobName = randomAlphaOfLength(10);
|
||||
|
@ -86,7 +86,7 @@ public class MockEventuallyConsistentRepositoryTests extends ESTestCase {
|
|||
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
|
||||
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
|
||||
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
|
||||
xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) {
|
||||
xContentRegistry(), mock(ThreadPool.class), blobStoreContext, random())) {
|
||||
repository.start();
|
||||
final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath());
|
||||
final String blobName = randomAlphaOfLength(10);
|
||||
|
@ -104,7 +104,7 @@ public class MockEventuallyConsistentRepositoryTests extends ESTestCase {
|
|||
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
|
||||
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
|
||||
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
|
||||
xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) {
|
||||
xContentRegistry(), mock(ThreadPool.class), blobStoreContext, random())) {
|
||||
repository.start();
|
||||
final BlobContainer container = repository.blobStore().blobContainer(repository.basePath());
|
||||
final String blobName = randomAlphaOfLength(10);
|
||||
|
@ -121,7 +121,7 @@ public class MockEventuallyConsistentRepositoryTests extends ESTestCase {
|
|||
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
|
||||
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
|
||||
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
|
||||
xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) {
|
||||
xContentRegistry(), mock(ThreadPool.class), blobStoreContext, random())) {
|
||||
repository.start();
|
||||
final BlobContainer container =
|
||||
repository.blobStore().blobContainer(repository.basePath().add("indices").add("someindex").add("0"));
|
||||
|
@ -143,7 +143,7 @@ public class MockEventuallyConsistentRepositoryTests extends ESTestCase {
|
|||
new ThreadPool.Info(ThreadPool.Names.SNAPSHOT, ThreadPool.ThreadPoolType.FIXED, randomIntBetween(1, 10)));
|
||||
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
|
||||
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
|
||||
xContentRegistry(), threadPool, blobStoreContext)) {
|
||||
xContentRegistry(), threadPool, blobStoreContext, random())) {
|
||||
repository.start();
|
||||
|
||||
// We create a snap- blob for snapshot "foo" in the first generation
|
||||
|
|
Loading…
Reference in New Issue