Just a few random things to optimize motivated by somewhat sub-standard performance for large snapshot cluster states with many concurrent snapshots observed in production.
This commit is contained in:
parent
fa8e76abb1
commit
395538f508
|
@ -205,8 +205,8 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
|
|||
// Add all new shards to start processing on
|
||||
final ShardId shardId = shard.key;
|
||||
final ShardSnapshotStatus shardSnapshotStatus = shard.value;
|
||||
if (localNodeId.equals(shardSnapshotStatus.nodeId())
|
||||
&& shardSnapshotStatus.state() == ShardState.INIT
|
||||
if (shardSnapshotStatus.state() == ShardState.INIT
|
||||
&& localNodeId.equals(shardSnapshotStatus.nodeId())
|
||||
&& snapshotShards.containsKey(shardId) == false) {
|
||||
logger.trace("[{}] - Adding shard to the queue", shardId);
|
||||
if (startedShards == null) {
|
||||
|
|
|
@ -2171,6 +2171,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
boolean changed = false;
|
||||
|
||||
final String repoName = deleteEntry.repository();
|
||||
// Computing the new assignments can be quite costly, only do it once below if actually needed
|
||||
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shardAssignments = null;
|
||||
for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) {
|
||||
if (entry.repository().equals(repoName)) {
|
||||
if (entry.state().completed() == false) {
|
||||
|
@ -2186,9 +2188,11 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
// No shards can be updated in this snapshot so we just add it as is again
|
||||
snapshotEntries.add(entry);
|
||||
} else {
|
||||
final ImmutableOpenMap<ShardId, ShardSnapshotStatus> shardAssignments = shards(snapshotsInProgress,
|
||||
updatedDeletions, currentState.metadata(), currentState.routingTable(), entry.indices(),
|
||||
entry.version().onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION), repositoryData, repoName);
|
||||
if (shardAssignments == null) {
|
||||
shardAssignments = shards(snapshotsInProgress,
|
||||
updatedDeletions, currentState.metadata(), currentState.routingTable(), entry.indices(),
|
||||
entry.version().onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION), repositoryData, repoName);
|
||||
}
|
||||
final ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> updatedAssignmentsBuilder =
|
||||
ImmutableOpenMap.builder(entry.shards());
|
||||
for (ShardId shardId : canBeUpdated) {
|
||||
|
@ -2296,7 +2300,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
} else {
|
||||
final IndexRoutingTable indexRoutingTable = routingTable.index(indexName);
|
||||
for (int i = 0; i < indexMetadata.getNumberOfShards(); i++) {
|
||||
ShardId shardId = new ShardId(indexMetadata.getIndex(), i);
|
||||
final ShardId shardId = indexRoutingTable.shard(i).shardId();
|
||||
final String shardRepoGeneration;
|
||||
if (useShardGenerations) {
|
||||
if (isNewIndex) {
|
||||
|
@ -2474,11 +2478,10 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
} else {
|
||||
final String updatedRepository = updateSnapshotState.snapshot().getRepository();
|
||||
final Set<ShardId> reusedShardIds = reusedShardIdsByRepo.computeIfAbsent(updatedRepository, k -> new HashSet<>());
|
||||
if (entry.repository().equals(updatedRepository) &&
|
||||
entry.state().completed() == false && reusedShardIds.contains(finishedShardId) == false
|
||||
&& entry.shards().keys().contains(finishedShardId)) {
|
||||
if (entry.state().completed() == false && entry.repository().equals(updatedRepository)
|
||||
&& reusedShardIds.contains(finishedShardId) == false) {
|
||||
final ShardSnapshotStatus existingStatus = entry.shards().get(finishedShardId);
|
||||
if (existingStatus.state() != ShardState.QUEUED) {
|
||||
if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) {
|
||||
continue;
|
||||
}
|
||||
if (updated == false) {
|
||||
|
|
|
@ -74,8 +74,10 @@ public class SnapshotsInProgressSerializationTests extends AbstractDiffableWireS
|
|||
ShardId shardId = new ShardId(idx, j);
|
||||
String nodeId = randomAlphaOfLength(10);
|
||||
ShardState shardState = randomFrom(ShardState.values());
|
||||
builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(nodeId, shardState,
|
||||
shardState.failed() ? randomAlphaOfLength(10) : null, "1"));
|
||||
builder.put(shardId,
|
||||
shardState == ShardState.QUEUED ? SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED :
|
||||
new SnapshotsInProgress.ShardSnapshotStatus(nodeId, shardState,
|
||||
shardState.failed() ? randomAlphaOfLength(10) : null, "1"));
|
||||
}
|
||||
}
|
||||
ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards = builder.build();
|
||||
|
|
Loading…
Reference in New Issue