Add Caching for RepositoryData in BlobStoreRepository (#52341) (#52566)

Cache the latest `RepositoryData` on heap when it is absolutely safe to do so (i.e. when the repository is in strictly consistent mode).

`RepositoryData` can safely be assumed not to grow to a problematic size: during repository operations we often hold at least two copies of it on heap at the same time anyway, and concurrent snapshot status API requests currently each load their own copy as well. Caching a single serialized copy on heap is therefore safe, and the data can reasonably be treated as "small".

The benefits of this move are:
* Much faster repository status API calls
   * listing all snapshot names becomes instant
* Other operations speed up massively as well, since they mostly work in two steps: load the repository data, then load multiple other blobs for the additional details
* Additional cloud cost savings from fewer reads of the repository metadata blob
* Better resiliency: one fewer read on which an I/O issue could fail a snapshot operation
* In follow-ups, we can simplify a number of spots that currently pass the repository data around in tricky ways to avoid loading it multiple times
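The cache is on by default; the commit also adds a `cache_repository_data` repository setting (registered as deprecated from the start, since it is only meant as an escape hatch) to switch it off. A minimal sketch of registering an `fs` repository with the cache disabled, mirroring the test changes below — the repository name and location variable are illustrative:

assertAcked(client().admin().cluster().preparePutRepository("my-repo")
    .setType("fs")
    .setSettings(Settings.builder()
        .put("location", repoPath) // illustrative path
        // opt out of the on-heap RepositoryData cache added by this commit
        .put(BlobStoreRepository.CACHE_REPOSITORY_DATA.getKey(), false)));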
Commit 0a09e15959 by Armin Braun, 2020-02-21 10:20:07 +01:00 (committed via GitHub), parent aff693bc9f
4 changed files with 102 additions and 7 deletions

S3BlobStoreRepositoryTests.java

@@ -75,6 +75,8 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase {
.put(super.repositorySettings())
.put(S3Repository.BUCKET_SETTING.getKey(), "bucket")
.put(S3Repository.CLIENT_NAME.getKey(), "test")
// Don't cache repository data because some tests manually modify the repository data
.put(BlobStoreRepository.CACHE_REPOSITORY_DATA.getKey(), false)
.build();
}

BlobStoreRepository.java

@@ -64,9 +64,11 @@ import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.compress.NotXContentException;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.metrics.CounterMetric;
@@ -77,6 +79,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
@@ -130,6 +133,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
@@ -208,8 +212,16 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent implements Repository {
public static final Setting<Boolean> ALLOW_CONCURRENT_MODIFICATION =
Setting.boolSetting("allow_concurrent_modifications", false, Setting.Property.Deprecated);
/**
* Setting to disable caching of the latest repository data.
*/
public static final Setting<Boolean> CACHE_REPOSITORY_DATA =
Setting.boolSetting("cache_repository_data", true, Setting.Property.Deprecated);
private final boolean compress;
private final boolean cacheRepositoryData;
private final RateLimiter snapshotRateLimiter;
private final RateLimiter restoreRateLimiter;
@@ -284,8 +296,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent implements Repository {
snapshotRateLimiter = getRateLimiter(metadata.settings(), "max_snapshot_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB));
restoreRateLimiter = getRateLimiter(metadata.settings(), "max_restore_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB));
readOnly = metadata.settings().getAsBoolean("readonly", false);
cacheRepositoryData = CACHE_REPOSITORY_DATA.get(metadata.settings());
indexShardSnapshotFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_CODEC, SNAPSHOT_NAME_FORMAT,
BlobStoreIndexShardSnapshot::fromXContent, namedXContentRegistry, compress);
indexShardSnapshotsFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_INDEX_CODEC, SNAPSHOT_INDEX_NAME_FORMAT,
@@ -522,13 +533,16 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent implements Repository {
* @param rootBlobs Blobs at the repository root
* @return RepositoryData
*/
private RepositoryData safeRepositoryData(long repositoryStateId, Map<String, BlobMetaData> rootBlobs) {
private RepositoryData safeRepositoryData(long repositoryStateId, Map<String, BlobMetaData> rootBlobs) throws IOException {
final long generation = latestGeneration(rootBlobs.keySet());
final long genToLoad;
final Tuple<Long, BytesReference> cached;
if (bestEffortConsistency) {
genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, repositoryStateId));
cached = null;
} else {
genToLoad = latestKnownRepoGen.get();
cached = latestKnownRepositoryData.get();
}
if (genToLoad > generation) {
// It's always a possibility to not see the latest index-N in the listing here on an eventually consistent blob store, just
@@ -541,6 +555,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent implements Repository {
throw new RepositoryException(metadata.name(), "concurrent modification of the index-N file, expected current generation [" +
repositoryStateId + "], actual current generation [" + genToLoad + "]");
}
if (cached != null && cached.v1() == genToLoad) {
return repositoryDataFromCachedEntry(cached);
}
return getRepositoryData(genToLoad);
}
@@ -1069,6 +1086,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent implements Repository {
// and concurrent modifications.
private final AtomicLong latestKnownRepoGen = new AtomicLong(RepositoryData.UNKNOWN_REPO_GEN);
// Best effort cache of the latest known repository data and its generation, cached serialized as compressed json
private final AtomicReference<Tuple<Long, BytesReference>> latestKnownRepositoryData = new AtomicReference<>();
@Override
public void getRepositoryData(ActionListener<RepositoryData> listener) {
if (latestKnownRepoGen.get() == RepositoryData.CORRUPTED_REPO_GEN) {
@@ -1102,7 +1122,16 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent implements Repository {
genToLoad = latestKnownRepoGen.get();
}
try {
listener.onResponse(getRepositoryData(genToLoad));
final Tuple<Long, BytesReference> cached = latestKnownRepositoryData.get();
final RepositoryData loaded;
// Caching is not used with #bestEffortConsistency, see docs on #cacheRepositoryData for details
if (bestEffortConsistency == false && cached != null && cached.v1() == genToLoad) {
loaded = repositoryDataFromCachedEntry(cached);
} else {
loaded = getRepositoryData(genToLoad);
cacheRepositoryData(loaded);
}
listener.onResponse(loaded);
return;
} catch (RepositoryException e) {
// If the generation to load changed concurrently and we didn't just try loading the same generation before we retry
@@ -1128,6 +1157,59 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent implements Repository {
}
}
/**
* Puts the given {@link RepositoryData} into the cache if it is of a newer generation and only if the repository is not using
* {@link #bestEffortConsistency}. When using {@link #bestEffortConsistency} the repository is using listing to find the latest
* {@code index-N} blob and there are no hard guarantees that a given repository generation won't be reused since an external
* modification can lead to moving from a higher {@code N} to a lower {@code N} value, which means we can't safely assume that a given
* generation will always contain the same {@link RepositoryData}.
*
* @param updated RepositoryData to cache if newer than the cache contents
*/
private void cacheRepositoryData(RepositoryData updated) {
if (cacheRepositoryData && bestEffortConsistency == false) {
final BytesReference serialized;
BytesStreamOutput out = new BytesStreamOutput();
try {
try (StreamOutput tmp = CompressorFactory.COMPRESSOR.streamOutput(out);
XContentBuilder builder = XContentFactory.jsonBuilder(tmp)) {
updated.snapshotsToXContent(builder, true);
}
serialized = out.bytes();
final int len = serialized.length();
if (len > ByteSizeUnit.KB.toBytes(500)) {
logger.debug("Not caching repository data of size [{}] for repository [{}] because it is larger than 500KB in" +
" serialized size", len, metadata.name());
if (len > ByteSizeUnit.MB.toBytes(5)) {
logger.warn("Your repository metadata blob for repository [{}] is larger than 5MB. Consider moving to a fresh" +
" repository for new snapshots or deleting unneeded snapshots from your repository to ensure stable" +
" repository behavior going forward.", metadata.name());
}
// Set empty repository data to not waste heap for an outdated cached value
latestKnownRepositoryData.set(null);
return;
}
} catch (IOException e) {
assert false : new AssertionError("Impossible, no IO happens here", e);
logger.warn("Failed to serialize repository data", e);
return;
}
latestKnownRepositoryData.updateAndGet(known -> {
if (known != null && known.v1() > updated.getGenId()) {
return known;
}
return new Tuple<>(updated.getGenId(), serialized);
});
}
}
private RepositoryData repositoryDataFromCachedEntry(Tuple<Long, BytesReference> cacheEntry) throws IOException {
return RepositoryData.snapshotsFromXContent(
XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY,
LoggingDeprecationHandler.INSTANCE,
CompressorFactory.COMPRESSOR.streamInput(cacheEntry.v2().streamInput())), cacheEntry.v1());
}
private RepositoryException corruptedStateException(@Nullable Exception cause) {
return new RepositoryException(metadata.name(),
"Could not read repository data because the contents of the repository do not match its " +
@@ -1374,6 +1456,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent implements Repository {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
cacheRepositoryData(filteredRepositoryData.withGenId(newGen));
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.run(listener, () -> {
// Delete all now outdated index files up to 1000 blobs back from the new generation.
// If there are more than 1000 dangling index-N cleanup functionality on repo delete will take care of them.
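Stepping back from the diff, the cache mechanics are simple: the entry is compressed JSON paired with its generation, a write only replaces the entry when it carries a newer generation, and a read decompresses and parses only on an exact generation match, falling back to loading from the blob store otherwise. A JDK-only sketch of that pattern — class and member names are mine, not Elasticsearch API:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

final class GenerationalBlobCache {

    // Mirrors Tuple<Long, BytesReference>: a generation plus the compressed payload.
    private static final class Entry {
        final long generation;
        final byte[] compressed;
        Entry(long generation, byte[] compressed) {
            this.generation = generation;
            this.compressed = compressed;
        }
    }

    private final AtomicReference<Entry> latest = new AtomicReference<>();

    /** Cache the serialized blob unless a newer generation is already cached. */
    void put(long generation, String json) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(bos)) {
            gzip.write(json.getBytes(StandardCharsets.UTF_8));
        }
        Entry candidate = new Entry(generation, bos.toByteArray());
        // Lock-free "newest generation wins", like latestKnownRepositoryData.updateAndGet(...) above.
        latest.updateAndGet(known -> known != null && known.generation > generation ? known : candidate);
    }

    /** Return the cached JSON for this exact generation, or null so the caller loads from the blob store. */
    String get(long generation) throws IOException {
        Entry cached = latest.get();
        if (cached == null || cached.generation != generation) {
            return null;
        }
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(cached.compressed))) {
            byte[] buf = new byte[4096];
            for (int n; (n = gzip.read(buf)) != -1; ) {
                bos.write(buf, 0, n);
            }
        }
        return new String(bos.toByteArray(), StandardCharsets.UTF_8);
    }
}

Keeping the entry serialized and compressed, rather than holding the parsed `RepositoryData` object, keeps the steady-state heap cost small at the price of a parse on every cache hit; it is also what makes the 500KB debug / 5MB warn size guards above meaningful.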

CorruptedBlobStoreRepositoryIT.java

@@ -66,6 +66,8 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCase {
.setType("fs").setSettings(Settings.builder()
.put("location", repo)
.put("compress", false)
// Don't cache repository data because the test manually modifies the repository data
.put(BlobStoreRepository.CACHE_REPOSITORY_DATA.getKey(), false)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
createIndex("test-idx-1", "test-idx-2");
@@ -250,6 +252,8 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCase {
.setType("fs").setSettings(Settings.builder()
.put("location", repo)
.put("compress", false)
// Don't cache repository data because the test manually modifies the repository data
.put(BlobStoreRepository.CACHE_REPOSITORY_DATA.getKey(), false)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
final String snapshotPrefix = "test-snap-";
@@ -315,6 +319,8 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCase {
assertAcked(client.admin().cluster().preparePutRepository(repoName)
.setType("fs").setSettings(Settings.builder()
.put("location", repo)
// Don't cache repository data because the test manually modifies the repository data
.put(BlobStoreRepository.CACHE_REPOSITORY_DATA.getKey(), false)
.put("compress", false)));
final String snapshot = "test-snap";

MockSecureSettings.java

@@ -36,7 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public class MockSecureSettings implements SecureSettings {
private Map<String, SecureString> secureStrings = new HashMap<>();
private Map<String, String> secureStrings = new HashMap<>();
private Map<String, byte[]> files = new HashMap<>();
private Map<String, byte[]> sha256Digests = new HashMap<>();
private Set<String> settingNames = new HashSet<>();
@@ -65,7 +65,11 @@ public class MockSecureSettings implements SecureSettings {
@Override
public SecureString getString(String setting) {
ensureOpen();
return secureStrings.get(setting);
final String s = secureStrings.get(setting);
if (s == null) {
return null;
}
return new SecureString(s.toCharArray());
}
@Override
@@ -81,7 +85,7 @@ public class MockSecureSettings implements SecureSettings {
public void setString(String setting, String value) {
ensureOpen();
secureStrings.put(setting, new SecureString(value.toCharArray()));
secureStrings.put(setting, value);
sha256Digests.put(setting, MessageDigests.sha256().digest(value.getBytes(StandardCharsets.UTF_8)));
settingNames.add(setting);
}