Speed up RepositoryData Serialization (#62684) (#62703)

Make serializing and parsing `RepositoryData` a little faster. Also split up and document the parsing
code, given how large that method had become.
This commit is contained in:
Armin Braun 2020-09-21 17:29:56 +02:00 committed by GitHub
parent a06339ffae
commit 13e28b85ff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 235 additions and 135 deletions

View File

@ -23,6 +23,7 @@ import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
@ -149,9 +150,11 @@ public class Version implements Comparable<Version>, ToXContentFragment {
public static final Version CURRENT = V_7_10_0;
private static final ImmutableOpenIntMap<Version> idToVersion;
private static final ImmutableOpenMap<String, Version> stringToVersion;
static {
final ImmutableOpenIntMap.Builder<Version> builder = ImmutableOpenIntMap.builder();
final ImmutableOpenMap.Builder<String, Version> builderByString = ImmutableOpenMap.builder();
for (final Field declaredField : Version.class.getFields()) {
if (declaredField.getType().equals(Version.class)) {
@ -178,6 +181,7 @@ public class Version implements Comparable<Version>, ToXContentFragment {
}
}
final Version maybePrevious = builder.put(version.id, version);
builderByString.put(version.toString(), version);
assert maybePrevious == null :
"expected [" + version.id + "] to be uniquely mapped but saw [" + maybePrevious + "] and [" + version + "]";
} catch (final IllegalAccessException e) {
@ -189,7 +193,9 @@ public class Version implements Comparable<Version>, ToXContentFragment {
+ org.apache.lucene.util.Version.LATEST + "] is still set to [" + CURRENT.luceneVersion + "]";
builder.put(V_EMPTY_ID, V_EMPTY);
builderByString.put(V_EMPTY.toString(), V_EMPTY);
idToVersion = builder.build();
stringToVersion = builderByString.build();
}
public static Version readVersion(StreamInput in) throws IOException {
@ -273,6 +279,14 @@ public class Version implements Comparable<Version>, ToXContentFragment {
if (!Strings.hasLength(version)) {
return Version.CURRENT;
}
final Version cached = stringToVersion.get(version);
if (cached != null) {
return cached;
}
return fromStringSlow(version);
}
private static Version fromStringSlow(String version) {
final boolean snapshot; // this is some BWC for 2.x and before indices
if (snapshot = version.endsWith("-SNAPSHOT")) {
version = version.substring(0, version.length() - 9);

View File

@ -168,7 +168,7 @@ public final class RepositoryData {
* Returns an unmodifiable collection of the snapshot ids.
*/
public Collection<SnapshotId> getSnapshotIds() {
return Collections.unmodifiableCollection(snapshotIds.values());
return snapshotIds.values();
}
/**
@ -463,16 +463,23 @@ public final class RepositoryData {
for (final SnapshotId snapshot : getSnapshotIds()) {
builder.startObject();
builder.field(NAME, snapshot.getName());
builder.field(UUID, snapshot.getUUID());
if (snapshotStates.containsKey(snapshot.getUUID())) {
builder.field(STATE, snapshotStates.get(snapshot.getUUID()).value());
final String snapshotUUID = snapshot.getUUID();
builder.field(UUID, snapshotUUID);
final SnapshotState state = snapshotStates.get(snapshotUUID);
if (state != null) {
builder.field(STATE, state.value());
}
if (shouldWriteIndexGens) {
builder.field(INDEX_METADATA_LOOKUP, indexMetaDataGenerations.lookup.getOrDefault(snapshot, Collections.emptyMap())
.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey().getId(), Map.Entry::getValue)));
builder.startObject(INDEX_METADATA_LOOKUP);
for (Map.Entry<IndexId, String> entry : indexMetaDataGenerations.lookup.getOrDefault(
snapshot, Collections.emptyMap()).entrySet()) {
builder.field(entry.getKey().getId(), entry.getValue());
}
builder.endObject();
}
if (snapshotVersions.containsKey(snapshot.getUUID())) {
builder.field(VERSION, snapshotVersions.get(snapshot.getUUID()).toString());
final Version version = snapshotVersions.get(snapshotUUID);
if (version != null) {
builder.field(VERSION, version.toString());
}
builder.endObject();
}
@ -522,150 +529,229 @@ public final class RepositoryData {
* from cached bytes that we trust to not contain broken generations.
*/
public static RepositoryData snapshotsFromXContent(XContentParser parser, long genId, boolean fixBrokenShardGens) throws IOException {
if (parser.nextToken() != XContentParser.Token.START_OBJECT) {
throw new ElasticsearchParseException("start object expected");
}
final Map<String, SnapshotId> snapshots = new HashMap<>();
final Map<String, SnapshotState> snapshotStates = new HashMap<>();
final Map<String, Version> snapshotVersions = new HashMap<>();
final Map<IndexId, List<SnapshotId>> indexSnapshots = new HashMap<>();
final Map<String, IndexId> indexLookup = new HashMap<>();
final ShardGenerations.Builder shardGenerations = ShardGenerations.builder();
final Map<String, String> indexMetaIdentifiers = new HashMap<>();
final Map<SnapshotId, Map<String, String>> indexMetaLookup = new HashMap<>();
if (parser.nextToken() == XContentParser.Token.START_OBJECT) {
while (parser.nextToken() == XContentParser.Token.FIELD_NAME) {
String field = parser.currentName();
if (SNAPSHOTS.equals(field)) {
if (parser.nextToken() == XContentParser.Token.START_ARRAY) {
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
String name = null;
String uuid = null;
SnapshotState state = null;
Map<String, String> metaGenerations = new HashMap<>();
Version version = null;
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String currentFieldName = parser.currentName();
parser.nextToken();
if (NAME.equals(currentFieldName)) {
name = parser.text();
} else if (UUID.equals(currentFieldName)) {
uuid = parser.text();
} else if (STATE.equals(currentFieldName)) {
state = SnapshotState.fromValue(parser.numberValue().byteValue());
} else if (INDEX_METADATA_LOOKUP.equals(currentFieldName)) {
metaGenerations.putAll(parser.mapStrings());
} else if (VERSION.equals(currentFieldName)) {
version = Version.fromString(parser.text());
}
}
final SnapshotId snapshotId = new SnapshotId(name, uuid);
if (state != null) {
snapshotStates.put(uuid, state);
}
if (version != null) {
snapshotVersions.put(uuid, version);
}
snapshots.put(snapshotId.getUUID(), snapshotId);
if (metaGenerations.isEmpty() == false) {
indexMetaLookup.put(snapshotId, metaGenerations);
}
}
} else {
throw new ElasticsearchParseException("expected array for [" + field + "]");
}
} else if (INDICES.equals(field)) {
if (parser.nextToken() != XContentParser.Token.START_OBJECT) {
throw new ElasticsearchParseException("start object expected [indices]");
}
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
final String indexName = parser.currentName();
final List<SnapshotId> snapshotIds = new ArrayList<>();
final List<String> gens = new ArrayList<>();
IndexId indexId = null;
if (parser.nextToken() != XContentParser.Token.START_OBJECT) {
throw new ElasticsearchParseException("start object expected index[" + indexName + "]");
}
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
final String indexMetaFieldName = parser.currentName();
parser.nextToken();
if (INDEX_ID.equals(indexMetaFieldName)) {
indexId = new IndexId(indexName, parser.text());
} else if (SNAPSHOTS.equals(indexMetaFieldName)) {
if (parser.currentToken() != XContentParser.Token.START_ARRAY) {
throw new ElasticsearchParseException("start array expected [snapshots]");
}
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
String uuid = null;
// the old format pre 5.4.1 which contains the snapshot name and uuid
if (parser.currentToken() == XContentParser.Token.START_OBJECT) {
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String currentFieldName = parser.currentName();
parser.nextToken();
if (UUID.equals(currentFieldName)) {
uuid = parser.text();
}
}
} else {
// the new format post 5.4.1 that only contains the snapshot uuid,
// since we already have the name/uuid combo in the snapshots array
uuid = parser.text();
}
SnapshotId snapshotId = snapshots.get(uuid);
if (snapshotId != null) {
snapshotIds.add(snapshotId);
} else {
// A snapshotted index references a snapshot which does not exist in
// the list of snapshots. This can happen when multiple clusters in
// different versions create or delete snapshot in the same repository.
throw new ElasticsearchParseException("Detected a corrupted repository, index " + indexId
+ " references an unknown snapshot uuid [" + uuid + "]");
}
}
} else if (SHARD_GENERATIONS.equals(indexMetaFieldName)) {
XContentParserUtils.ensureExpectedToken(
XContentParser.Token.START_ARRAY, parser.currentToken(), parser::getTokenLocation);
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
gens.add(parser.textOrNull());
}
}
}
assert indexId != null;
indexSnapshots.put(indexId, Collections.unmodifiableList(snapshotIds));
for (int i = 0; i < gens.size(); i++) {
String parsedGen = gens.get(i);
if (fixBrokenShardGens) {
parsedGen = ShardGenerations.fixShardGeneration(parsedGen);
}
if (parsedGen != null) {
shardGenerations.put(indexId, i, parsedGen);
}
}
}
} else if (INDEX_METADATA_IDENTIFIERS.equals(field)) {
Map<String, String> indexMetaIdentifiers = null;
while (parser.nextToken() == XContentParser.Token.FIELD_NAME) {
final String field = parser.currentName();
switch (field) {
case SNAPSHOTS:
parseSnapshots(parser, snapshots, snapshotStates, snapshotVersions, indexMetaLookup);
break;
case INDICES:
parseIndices(parser, fixBrokenShardGens, snapshots, indexSnapshots, indexLookup, shardGenerations);
break;
case INDEX_METADATA_IDENTIFIERS:
if (parser.nextToken() != XContentParser.Token.START_OBJECT) {
throw new ElasticsearchParseException("start object expected [" + INDEX_METADATA_IDENTIFIERS + "]");
}
indexMetaIdentifiers.putAll(parser.mapStrings());
} else if (MIN_VERSION.equals(field)) {
indexMetaIdentifiers = parser.mapStrings();
break;
case MIN_VERSION:
if (parser.nextToken() != XContentParser.Token.VALUE_STRING) {
throw new ElasticsearchParseException("version string expected [min_version]");
}
final Version version = Version.fromString(parser.text());
assert SnapshotsService.useShardGenerations(version);
} else {
throw new ElasticsearchParseException("unknown field name [" + field + "]");
}
break;
default:
XContentParserUtils.throwUnknownField(field, parser.getTokenLocation());
}
} else {
throw new ElasticsearchParseException("start object expected");
}
final Map<String, IndexId> indexLookup =
indexSnapshots.keySet().stream().collect(Collectors.toMap(IndexId::getId, Function.identity()));
return new RepositoryData(genId, snapshots, snapshotStates, snapshotVersions, indexSnapshots, shardGenerations.build(),
new IndexMetaDataGenerations(indexMetaLookup.entrySet().stream().collect(
Collectors.toMap(Map.Entry::getKey, e -> e.getValue().entrySet().stream()
.collect(Collectors.toMap(entry -> indexLookup.get(entry.getKey()), Map.Entry::getValue)))), indexMetaIdentifiers));
buildIndexMetaGenerations(indexMetaLookup, indexLookup, indexMetaIdentifiers));
}
/**
 * Assembles the {@link IndexMetaDataGenerations} for a parsed repository by re-keying the per-snapshot generation maps from
 * index id strings to {@link IndexId} instances.
 *
 * @param indexMetaLookup      per {@link SnapshotId}, a map keyed by index id string (as returned by {@link IndexId#getId}) holding
 *                             the index metadata generation values collected by {@link #parseSnapshots}
 * @param indexLookup          index id string (as returned by {@link IndexId#getId}) to {@link IndexId}, as collected by
 *                             {@link #parseIndices}
 * @param indexMetaIdentifiers map of index generation to index meta identifiers parsed by {@link #snapshotsFromXContent}; handed to
 *                             the resulting instance unchanged
 * @return {@link IndexMetaDataGenerations#EMPTY} if {@code indexMetaLookup} has no entries, otherwise a new instance built from the
 *         re-keyed maps
 */
private static IndexMetaDataGenerations buildIndexMetaGenerations(Map<SnapshotId, Map<String, String>> indexMetaLookup,
                                                                  Map<String, IndexId> indexLookup,
                                                                  Map<String, String> indexMetaIdentifiers) {
    if (indexMetaLookup.isEmpty()) {
        return IndexMetaDataGenerations.EMPTY;
    }
    final Map<SnapshotId, Map<IndexId, String>> generationsBySnapshot = new HashMap<>(indexMetaLookup.size());
    indexMetaLookup.forEach((snapshotId, generationsByIndexUuid) -> {
        // Swap every index id string key for the IndexId it resolves to; no check is made that the id is actually known
        final Map<IndexId, String> generationsByIndex = new HashMap<>(generationsByIndexUuid.size());
        generationsByIndexUuid.forEach((indexUuid, generation) -> generationsByIndex.put(indexLookup.get(indexUuid), generation));
        generationsBySnapshot.put(snapshotId, generationsByIndex);
    });
    return new IndexMetaDataGenerations(generationsBySnapshot, indexMetaIdentifiers);
}
/**
 * Reads the "snapshots" array and records the properties of every snapshot entry in the supplied maps. Must be invoked before
 * {@link #parseIndices}, which resolves snapshot uuids against the {@code snapshots} map filled here.
 *
 * @param parser           x-content parser, positioned so that its next token is the start of the snapshots array
 * @param snapshots        map of snapshot uuid to {@link SnapshotId}; receives one entry per parsed snapshot
 * @param snapshotStates   map of snapshot uuid to {@link SnapshotState}; only written for snapshots that carry a state
 * @param snapshotVersions map of snapshot uuid to the {@link Version} the snapshot was taken in; only written for snapshots that
 *                         carry a version
 * @param indexMetaLookup  map of {@link SnapshotId} to a map keyed by index id (as returned by {@link IndexId#getId}) defining the
 *                         index metadata generations of the snapshot; only written when that map was present and non-empty
 * @throws ElasticsearchParseException if the value of the snapshots field is not an array
 */
private static void parseSnapshots(XContentParser parser, Map<String, SnapshotId> snapshots, Map<String, SnapshotState> snapshotStates,
                                   Map<String, Version> snapshotVersions,
                                   Map<SnapshotId, Map<String, String>> indexMetaLookup) throws IOException {
    final XContentParser.Token arrayStart = parser.nextToken();
    if (arrayStart != XContentParser.Token.START_ARRAY) {
        throw new ElasticsearchParseException("expected array for [" + SNAPSHOTS + "]");
    }
    // Each iteration consumes one snapshot object; the token fetched by the loop header is that object's opening token
    for (XContentParser.Token token = parser.nextToken(); token != XContentParser.Token.END_ARRAY; token = parser.nextToken()) {
        String snapshotName = null;
        String snapshotUuid = null;
        SnapshotState snapshotState = null;
        Version snapshotVersion = null;
        Map<String, String> generationsByIndexUuid = null;
        while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
            final String fieldName = parser.currentName();
            // step from the field name onto its value
            parser.nextToken();
            switch (fieldName) {
                case NAME:
                    snapshotName = parser.text();
                    break;
                case UUID:
                    snapshotUuid = parser.text();
                    break;
                case STATE:
                    snapshotState = SnapshotState.fromValue((byte) parser.intValue());
                    break;
                case INDEX_METADATA_LOOKUP:
                    generationsByIndexUuid = parser.mapStrings();
                    break;
                case VERSION:
                    snapshotVersion = Version.fromString(parser.text());
                    break;
            }
        }
        final SnapshotId snapshotId = new SnapshotId(snapshotName, snapshotUuid);
        snapshots.put(snapshotUuid, snapshotId);
        if (snapshotState != null) {
            snapshotStates.put(snapshotUuid, snapshotState);
        }
        if (snapshotVersion != null) {
            snapshotVersions.put(snapshotUuid, snapshotVersion);
        }
        final boolean hasGenerations = generationsByIndexUuid != null && generationsByIndexUuid.isEmpty() == false;
        if (hasGenerations) {
            indexMetaLookup.put(snapshotId, generationsByIndexUuid);
        }
    }
}
/**
 * Parses information about all indices tracked in the repository and populates {@code indexSnapshots}, {@code indexLookup} and
 * {@code shardGenerations}.
 *
 * @param parser             x-content parser, positioned so that its next token is the start of the indices object
 * @param fixBrokenShardGens whether or not to fix broken shard generations (see {@link #snapshotsFromXContent} for details); when
 *                           set, every parsed generation is passed through {@link ShardGenerations#fixShardGeneration}
 * @param snapshots          map of snapshot uuid to {@link SnapshotId} that was populated by {@link #parseSnapshots}
 * @param indexSnapshots     map of {@link IndexId} to list of {@link SnapshotId} that contain the given index
 * @param indexLookup        map of index uuid (as returned by {@link IndexId#getId}) to {@link IndexId}
 * @param shardGenerations   shard generations builder that is populated, index by index, by this method
 * @throws ElasticsearchParseException if the content is not structured as expected, if an index references a snapshot uuid that
 *                                     is missing from {@code snapshots}, or if an index entry carries no index id
 */
private static void parseIndices(XContentParser parser, boolean fixBrokenShardGens, Map<String, SnapshotId> snapshots,
                                 Map<IndexId, List<SnapshotId>> indexSnapshots, Map<String, IndexId> indexLookup,
                                 ShardGenerations.Builder shardGenerations) throws IOException {
    if (parser.nextToken() != XContentParser.Token.START_OBJECT) {
        throw new ElasticsearchParseException("start object expected [indices]");
    }
    while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
        final String indexName = parser.currentName();
        final List<SnapshotId> snapshotIds = new ArrayList<>();
        final List<String> gens = new ArrayList<>();
        IndexId indexId = null;
        if (parser.nextToken() != XContentParser.Token.START_OBJECT) {
            throw new ElasticsearchParseException("start object expected index[" + indexName + "]");
        }
        while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
            final String indexMetaFieldName = parser.currentName();
            // advance from the field name to the first token of its value
            final XContentParser.Token currentToken = parser.nextToken();
            switch (indexMetaFieldName) {
                case INDEX_ID:
                    indexId = new IndexId(indexName, parser.text());
                    break;
                case SNAPSHOTS:
                    if (currentToken != XContentParser.Token.START_ARRAY) {
                        throw new ElasticsearchParseException("start array expected [snapshots]");
                    }
                    XContentParser.Token currToken;
                    while ((currToken = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
                        final String uuid;
                        // the old format pre 5.4.1 which contains the snapshot name and uuid
                        if (currToken == XContentParser.Token.START_OBJECT) {
                            uuid = parseLegacySnapshotUUID(parser);
                        } else {
                            // the new format post 5.4.1 that only contains the snapshot uuid,
                            // since we already have the name/uuid combo in the snapshots array
                            uuid = parser.text();
                        }
                        final SnapshotId snapshotId = snapshots.get(uuid);
                        if (snapshotId == null) {
                            // A snapshotted index references a snapshot which does not exist in
                            // the list of snapshots. This can happen when multiple clusters in
                            // different versions create or delete snapshot in the same repository.
                            throw new ElasticsearchParseException("Detected a corrupted repository, index " + indexId
                                + " references an unknown snapshot uuid [" + uuid + "]");
                        }
                        snapshotIds.add(snapshotId);
                    }
                    break;
                case SHARD_GENERATIONS:
                    XContentParserUtils.ensureExpectedToken(
                        XContentParser.Token.START_ARRAY, currentToken, parser::getTokenLocation);
                    while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
                        gens.add(parser.textOrNull());
                    }
                    break;
            }
        }
        // The repository content is external input, so a missing index id must be rejected explicitly. A bare assert is skipped
        // when assertions are disabled, which would put a null key into indexSnapshots and then fail with a
        // NullPointerException on indexId.getId() below.
        if (indexId == null) {
            throw new ElasticsearchParseException("index id expected for index [" + indexName + "]");
        }
        indexSnapshots.put(indexId, Collections.unmodifiableList(snapshotIds));
        indexLookup.put(indexId.getId(), indexId);
        // The position of a generation in the parsed array is what gets passed to the builder alongside the index
        for (int i = 0; i < gens.size(); i++) {
            String parsedGen = gens.get(i);
            if (fixBrokenShardGens) {
                parsedGen = ShardGenerations.fixShardGeneration(parsedGen);
            }
            // null generations (absent in the content or dropped by the fix-up) are not recorded
            if (parsedGen != null) {
                shardGenerations.put(indexId, i, parsedGen);
            }
        }
    }
}
/**
 * Reads a snapshot entry written in the old (pre 5.4.1) object format and extracts its uuid. The parser is expected to be
 * positioned on the entry's opening token; on return it is positioned on the entry's closing token.
 *
 * @param parser x-content parser
 * @return the value of the uuid field, or {@code null} if the entry has no such field
 */
private static String parseLegacySnapshotUUID(XContentParser parser) throws IOException {
    String legacyUuid = null;
    for (XContentParser.Token token = parser.nextToken(); token != XContentParser.Token.END_OBJECT; token = parser.nextToken()) {
        final String fieldName = parser.currentName();
        // move onto the field's value; every field other than the uuid is skipped over
        parser.nextToken();
        if (UUID.equals(fieldName)) {
            legacyUuid = parser.text();
        }
    }
    return legacyUuid;
}
}