From 395538f5083d2cb39e5ed94acc899391dbfe0071 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 7 Sep 2020 13:25:40 +0200 Subject: [PATCH] Improve Snapshot State Machine Performance (#62000) (#62049) Just a few random things to optimize motivated by somewhat sub-standard performance for large snapshot cluster states with many concurrent snapshots observed in production. --- .../snapshots/SnapshotShardsService.java | 4 ++-- .../snapshots/SnapshotsService.java | 19 +++++++++++-------- ...SnapshotsInProgressSerializationTests.java | 6 ++++-- 3 files changed, 17 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index a2c99f5f576..f0ffedf5141 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -205,8 +205,8 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements // Add all new shards to start processing on final ShardId shardId = shard.key; final ShardSnapshotStatus shardSnapshotStatus = shard.value; - if (localNodeId.equals(shardSnapshotStatus.nodeId()) - && shardSnapshotStatus.state() == ShardState.INIT + if (shardSnapshotStatus.state() == ShardState.INIT + && localNodeId.equals(shardSnapshotStatus.nodeId()) && snapshotShards.containsKey(shardId) == false) { logger.trace("[{}] - Adding shard to the queue", shardId); if (startedShards == null) { diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 9c482021a39..812bbafce1e 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -2171,6 +2171,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus boolean changed = false; final String repoName = deleteEntry.repository(); + // Computing the new assignments can be quite costly, only do it once below if actually needed + ImmutableOpenMap shardAssignments = null; for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) { if (entry.repository().equals(repoName)) { if (entry.state().completed() == false) { @@ -2186,9 +2188,11 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus // No shards can be updated in this snapshot so we just add it as is again snapshotEntries.add(entry); } else { - final ImmutableOpenMap shardAssignments = shards(snapshotsInProgress, - updatedDeletions, currentState.metadata(), currentState.routingTable(), entry.indices(), - entry.version().onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION), repositoryData, repoName); + if (shardAssignments == null) { + shardAssignments = shards(snapshotsInProgress, + updatedDeletions, currentState.metadata(), currentState.routingTable(), entry.indices(), + entry.version().onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION), repositoryData, repoName); + } final ImmutableOpenMap.Builder updatedAssignmentsBuilder = ImmutableOpenMap.builder(entry.shards()); for (ShardId shardId : canBeUpdated) { @@ -2296,7 +2300,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus } else { final IndexRoutingTable indexRoutingTable = routingTable.index(indexName); for (int i = 0; i < indexMetadata.getNumberOfShards(); i++) { - ShardId shardId = new ShardId(indexMetadata.getIndex(), i); + final ShardId shardId = indexRoutingTable.shard(i).shardId(); final String shardRepoGeneration; if (useShardGenerations) { if (isNewIndex) { @@ -2474,11 +2478,10 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus } else { final String updatedRepository = updateSnapshotState.snapshot().getRepository(); final Set reusedShardIds = reusedShardIdsByRepo.computeIfAbsent(updatedRepository, k -> new HashSet<>()); - if (entry.repository().equals(updatedRepository) && - entry.state().completed() == false && reusedShardIds.contains(finishedShardId) == false - && entry.shards().keys().contains(finishedShardId)) { + if (entry.state().completed() == false && entry.repository().equals(updatedRepository) + && reusedShardIds.contains(finishedShardId) == false) { final ShardSnapshotStatus existingStatus = entry.shards().get(finishedShardId); - if (existingStatus.state() != ShardState.QUEUED) { + if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) { continue; } if (updated == false) { diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java index 9de96b72407..9af30436f2e 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java @@ -74,8 +74,10 @@ public class SnapshotsInProgressSerializationTests extends AbstractDiffableWireS ShardId shardId = new ShardId(idx, j); String nodeId = randomAlphaOfLength(10); ShardState shardState = randomFrom(ShardState.values()); - builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(nodeId, shardState, - shardState.failed() ? randomAlphaOfLength(10) : null, "1")); + builder.put(shardId, + shardState == ShardState.QUEUED ? SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED : + new SnapshotsInProgress.ShardSnapshotStatus(nodeId, shardState, + shardState.failed() ? randomAlphaOfLength(10) : null, "1")); } } ImmutableOpenMap shards = builder.build();