Stop Serializing RepositoryData Twice when Writing (#60107) (#60269)

We can save one round of serializing `RepositoryData` on the write path.
This also potentially leads to somewhat better compression, because the fully
serialized bytes can be compressed in larger chunks instead of interleaving
serialization and compression.
Also, fixed the double wrapping of collections when copying the repository
data instance via `withGenId`.
Armin Braun 2020-07-28 11:42:14 +02:00 committed by GitHub
parent a55c869aab
commit d39622e17e
2 changed files with 38 additions and 26 deletions
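
To make the shape of the change described above concrete, here is a minimal, self-contained sketch of the "serialize once, reuse the bytes" pattern this commit applies. `RepoData`, the JSON encoding, and the cache field are illustrative stand-ins, not the actual Elasticsearch types.

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for RepositoryData: one serialization produces the
// bytes used for both the blob write and the in-memory cache.
public class SerializeOnce {

    record RepoData(long genId, String snapshotsJson) {
        byte[] serialize() {
            // Previously this work effectively happened twice: once for the
            // index-N blob and once more inside the caching helper.
            return ("{\"gen\":" + genId + ",\"snapshots\":" + snapshotsJson + "}")
                .getBytes(StandardCharsets.UTF_8);
        }
    }

    private final AtomicReference<byte[]> cachedSerialized = new AtomicReference<>();

    void write(Path repoRoot, RepoData data) throws Exception {
        byte[] serialized = data.serialize();                               // serialize exactly once
        Files.write(repoRoot.resolve("index-" + data.genId()), serialized); // write the blob
        cachedSerialized.set(serialized);                                   // cache the same bytes
    }
}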

server/src/main/java/org/elasticsearch/repositories/RepositoryData.java

@@ -69,7 +69,8 @@ public final class RepositoryData {
      * An instance initialized for an empty repository.
      */
     public static final RepositoryData EMPTY = new RepositoryData(EMPTY_REPO_GEN, Collections.emptyMap(), Collections.emptyMap(),
-        Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY, IndexMetaDataGenerations.EMPTY);
+        Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY,
+        IndexMetaDataGenerations.EMPTY);
 
     /**
      * The generational id of the index file from which the repository data was read.
@@ -107,13 +108,23 @@ public final class RepositoryData {
     public RepositoryData(long genId, Map<String, SnapshotId> snapshotIds, Map<String, SnapshotState> snapshotStates,
                           Map<String, Version> snapshotVersions, Map<IndexId, List<SnapshotId>> indexSnapshots,
                           ShardGenerations shardGenerations, IndexMetaDataGenerations indexMetaDataGenerations) {
+        this(genId, Collections.unmodifiableMap(snapshotIds), Collections.unmodifiableMap(snapshotStates),
+            Collections.unmodifiableMap(snapshotVersions),
+            Collections.unmodifiableMap(
+                indexSnapshots.keySet().stream().collect(Collectors.toMap(IndexId::getName, Function.identity()))),
+            Collections.unmodifiableMap(indexSnapshots), shardGenerations, indexMetaDataGenerations);
+    }
+
+    private RepositoryData(long genId, Map<String, SnapshotId> snapshotIds, Map<String, SnapshotState> snapshotStates,
+                           Map<String, Version> snapshotVersions, Map<String, IndexId> indices,
+                           Map<IndexId, List<SnapshotId>> indexSnapshots, ShardGenerations shardGenerations,
+                           IndexMetaDataGenerations indexMetaDataGenerations) {
         this.genId = genId;
-        this.snapshotIds = Collections.unmodifiableMap(snapshotIds);
-        this.snapshotStates = Collections.unmodifiableMap(snapshotStates);
-        this.indices = Collections.unmodifiableMap(indexSnapshots.keySet().stream()
-            .collect(Collectors.toMap(IndexId::getName, Function.identity())));
-        this.indexSnapshots = Collections.unmodifiableMap(indexSnapshots);
-        this.shardGenerations = Objects.requireNonNull(shardGenerations);
+        this.snapshotIds = snapshotIds;
+        this.snapshotStates = snapshotStates;
+        this.indices = indices;
+        this.indexSnapshots = indexSnapshots;
+        this.shardGenerations = shardGenerations;
         this.indexMetaDataGenerations = indexMetaDataGenerations;
         this.snapshotVersions = snapshotVersions;
         assert indices.values().containsAll(shardGenerations.indices()) : "ShardGenerations contained indices "
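
The constructor split above is what fixes the double wrapping: the public constructor wraps its inputs in `Collections.unmodifiableMap` exactly once, while the new private constructor stores its arguments as-is, so copies made via `withGenId` no longer stack another unmodifiable wrapper on top of the existing one with every copy. A minimal sketch of the pattern, with simplified fields that are not the actual `RepositoryData` members:

import java.util.Collections;
import java.util.Map;
import java.util.stream.Collectors;

// Simplified illustration of the wrap-once constructor split.
public class WrapOnce {

    private final Map<String, Long> snapshots;
    private final Map<String, String> derived;

    // Public entry point: wrap caller-supplied maps exactly once,
    // including any map derived from them.
    public WrapOnce(Map<String, Long> snapshots) {
        this(Collections.unmodifiableMap(snapshots),
            Collections.unmodifiableMap(snapshots.keySet().stream()
                .collect(Collectors.toMap(k -> k, String::toUpperCase))));
    }

    // Private constructor trusts its (already unmodifiable) arguments.
    private WrapOnce(Map<String, Long> snapshots, Map<String, String> derived) {
        this.snapshots = snapshots;
        this.derived = derived;
    }

    // Copies reuse the existing wrappers instead of re-wrapping them.
    public WrapOnce copy() {
        return new WrapOnce(snapshots, derived);
    }
}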
@@ -304,8 +315,8 @@ public final class RepositoryData {
         if (newGeneration == genId) {
             return this;
         }
-        return new RepositoryData(
-            newGeneration, snapshotIds, snapshotStates, snapshotVersions, indexSnapshots, shardGenerations, indexMetaDataGenerations);
+        return new RepositoryData(newGeneration, snapshotIds, snapshotStates, snapshotVersions, indices, indexSnapshots, shardGenerations,
+            indexMetaDataGenerations);
     }
 
     /**

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

@@ -82,7 +82,6 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
-import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
@@ -1265,9 +1264,11 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                     loaded = repositoryDataFromCachedEntry(cached);
                 } else {
                     loaded = getRepositoryData(genToLoad);
-                    // We can cache in the most recent version here without regard to the actual repository metadata version since we're
-                    // only caching the information that we just wrote and thus won't accidentally cache any information that isn't safe
-                    cacheRepositoryData(loaded, Version.CURRENT);
+                    // We can cache serialized in the most recent version here without regard to the actual repository metadata version
+                    // since we're only caching the information that we just wrote and thus won't accidentally cache any information that
+                    // isn't safe
+                    cacheRepositoryData(
+                        BytesReference.bytes(loaded.snapshotsToXContent(XContentFactory.jsonBuilder(), Version.CURRENT)), genToLoad);
                 }
                 listener.onResponse(loaded);
                 return;
@@ -1302,17 +1303,16 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     * modification can lead to moving from a higher {@code N} to a lower {@code N} value which mean we can't safely assume that a given
     * generation will always contain the same {@link RepositoryData}.
     *
-     * @param updated RepositoryData to cache if newer than the cache contents
-     * @param version version of the repository metadata that was cached
+     * @param updated    serialized RepositoryData to cache if newer than the cache contents
+     * @param generation repository generation of the given repository data
      */
-    private void cacheRepositoryData(RepositoryData updated, Version version) {
+    private void cacheRepositoryData(BytesReference updated, long generation) {
         if (cacheRepositoryData && bestEffortConsistency == false) {
             final BytesReference serialized;
             BytesStreamOutput out = new BytesStreamOutput();
             try {
-                try (StreamOutput tmp = CompressorFactory.COMPRESSOR.streamOutput(out);
-                     XContentBuilder builder = XContentFactory.jsonBuilder(tmp)) {
-                    updated.snapshotsToXContent(builder, version);
+                try (StreamOutput tmp = CompressorFactory.COMPRESSOR.streamOutput(out)) {
+                    updated.writeTo(tmp);
                 }
                 serialized = out.bytes();
                 final int len = serialized.length();
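
With the `BytesReference` parameter, `cacheRepositoryData` no longer serializes anything itself; it only compresses bytes that were already produced for the blob write. A rough, self-contained equivalent of the new method body, using plain `java.util.zip` in place of the Elasticsearch-internal `CompressorFactory`:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.DeflaterOutputStream;

// Sketch of what the new method body boils down to: the input is already
// serialized, so caching is reduced to a single compression pass.
public final class CompressForCache {

    static byte[] compress(byte[] serialized) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (DeflaterOutputStream deflater = new DeflaterOutputStream(out)) {
            deflater.write(serialized); // one write over the whole buffer
        }
        return out.toByteArray();
    }
}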
@@ -1334,10 +1334,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                     return;
                 }
                 latestKnownRepositoryData.updateAndGet(known -> {
-                    if (known != null && known.v1() > updated.getGenId()) {
+                    if (known != null && known.v1() > generation) {
                         return known;
                     }
-                    return new Tuple<>(updated.getGenId(), serialized);
+                    return new Tuple<>(generation, serialized);
                 });
             }
         }
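
The `updateAndGet` guard above keeps the cache from moving backwards to an older generation. A self-contained sketch of that update rule, with a plain record standing in for `Tuple<Long, BytesReference>`:

import java.util.concurrent.atomic.AtomicReference;

// Only an entry that is at least as new as the cached one may replace it.
public class GenerationCache {

    record Entry(long generation, byte[] serialized) {}

    private final AtomicReference<Entry> latest = new AtomicReference<>();

    void cache(long generation, byte[] serialized) {
        latest.updateAndGet(known -> {
            if (known != null && known.generation() > generation) {
                return known; // never move the cache backwards
            }
            return new Entry(generation, serialized);
        });
    }
}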
@@ -1548,6 +1548,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             })), listener::onFailure);
         filterRepositoryDataStep.whenComplete(filteredRepositoryData -> {
             final long newGen = setPendingStep.result();
+            final RepositoryData newRepositoryData = filteredRepositoryData.withGenId(newGen);
             if (latestKnownRepoGen.get() >= newGen) {
                 throw new IllegalArgumentException(
                     "Tried writing generation [" + newGen + "] but repository is at least at generation [" + latestKnownRepoGen.get()
@@ -1559,8 +1560,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             }
             final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen);
             logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob);
-            writeAtomic(blobContainer(), indexBlob,
-                BytesReference.bytes(filteredRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), version)), true);
+            final BytesReference serializedRepoData =
+                BytesReference.bytes(newRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), version));
+            writeAtomic(blobContainer(), indexBlob, serializedRepoData, true);
             // write the current generation to the index-latest file
             final BytesReference genBytes;
             try (BytesStreamOutput bStream = new BytesStreamOutput()) {
@@ -1600,8 +1602,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             @Override
             public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
-                final RepositoryData writtenRepositoryData = filteredRepositoryData.withGenId(newGen);
-                cacheRepositoryData(writtenRepositoryData, version);
+                cacheRepositoryData(serializedRepoData, newGen);
                 threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(listener, () -> {
                     // Delete all now outdated index files up to 1000 blobs back from the new generation.
                     // If there are more than 1000 dangling index-N cleanup functionality on repo delete will take care of them.
@@ -1616,7 +1617,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                     } catch (IOException e) {
                         logger.warn(() -> new ParameterizedMessage("Failed to clean up old index blobs {}", oldIndexN), e);
                     }
-                    return writtenRepositoryData;
+                    return newRepositoryData;
                 }));
             }
         });