With parallel snapshots incoming (but also in isolation) it makes sense to clean up `SnapshotsInProgress` construction. We don't need to pre-compute the waiting shards for every entry. We rarely use this information (only on routing changes) and in the one spot we did we now simply spent the extra cycles for looping over all shards instead of just the waiting ones once per routing change tops instead of on every change to `SnapshotsInProgress` (moreover, we would burn the cycles for looping on all nodes even though only the current master cares about the information). In addition to that change I removed some dead code constructors and slighly optimized deserialization.
This commit is contained in:
parent
35ae3d19db
commit
16a47e0d08
|
@ -39,10 +39,8 @@ import org.elasticsearch.snapshots.Snapshot;
|
|||
import org.elasticsearch.snapshots.SnapshotsService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -97,7 +95,6 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
|
|||
private final ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards;
|
||||
private final List<IndexId> indices;
|
||||
private final List<String> dataStreams;
|
||||
private final ImmutableOpenMap<String, List<ShardId>> waitingIndices;
|
||||
private final long startTime;
|
||||
private final long repositoryStateId;
|
||||
// see #useShardGenerations
|
||||
|
@ -116,14 +113,8 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
|
|||
this.indices = indices;
|
||||
this.dataStreams = dataStreams;
|
||||
this.startTime = startTime;
|
||||
if (shards == null) {
|
||||
this.shards = ImmutableOpenMap.of();
|
||||
this.waitingIndices = ImmutableOpenMap.of();
|
||||
} else {
|
||||
this.shards = shards;
|
||||
this.waitingIndices = findWaitingIndices(shards);
|
||||
assert assertShardsConsistent(state, indices, shards);
|
||||
}
|
||||
this.shards = shards;
|
||||
assert assertShardsConsistent(state, indices, shards);
|
||||
this.repositoryStateId = repositoryStateId;
|
||||
this.failure = failure;
|
||||
this.userMetadata = userMetadata;
|
||||
|
@ -198,10 +189,6 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
|
|||
return indices;
|
||||
}
|
||||
|
||||
public ImmutableOpenMap<String, List<ShardId>> waitingIndices() {
|
||||
return waitingIndices;
|
||||
}
|
||||
|
||||
public boolean includeGlobalState() {
|
||||
return includeGlobalState;
|
||||
}
|
||||
|
@ -320,23 +307,6 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
|
|||
public boolean isFragment() {
|
||||
return false;
|
||||
}
|
||||
|
||||
private ImmutableOpenMap<String, List<ShardId>> findWaitingIndices(ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
|
||||
Map<String, List<ShardId>> waitingIndicesMap = new HashMap<>();
|
||||
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> entry : shards) {
|
||||
if (entry.value.state() == ShardState.WAITING) {
|
||||
waitingIndicesMap.computeIfAbsent(entry.key.getIndexName(), k -> new ArrayList<>()).add(entry.key);
|
||||
}
|
||||
}
|
||||
if (waitingIndicesMap.isEmpty()) {
|
||||
return ImmutableOpenMap.of();
|
||||
}
|
||||
ImmutableOpenMap.Builder<String, List<ShardId>> waitingIndicesBuilder = ImmutableOpenMap.builder();
|
||||
for (Map.Entry<String, List<ShardId>> entry : waitingIndicesMap.entrySet()) {
|
||||
waitingIndicesBuilder.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
|
||||
}
|
||||
return waitingIndicesBuilder.build();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -531,14 +501,10 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
|
|||
boolean includeGlobalState = in.readBoolean();
|
||||
boolean partial = in.readBoolean();
|
||||
State state = State.fromValue(in.readByte());
|
||||
int indices = in.readVInt();
|
||||
List<IndexId> indexBuilder = new ArrayList<>();
|
||||
for (int j = 0; j < indices; j++) {
|
||||
indexBuilder.add(new IndexId(in.readString(), in.readString()));
|
||||
}
|
||||
List<IndexId> indexBuilder = in.readList(IndexId::new);
|
||||
long startTime = in.readLong();
|
||||
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> builder = ImmutableOpenMap.builder();
|
||||
int shards = in.readVInt();
|
||||
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> builder = ImmutableOpenMap.builder(shards);
|
||||
for (int j = 0; j < shards; j++) {
|
||||
ShardId shardId = new ShardId(in);
|
||||
if (in.getVersion().onOrAfter(Version.V_6_0_0_beta1)) {
|
||||
|
|
|
@ -961,18 +961,21 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
private static boolean waitingShardsStartedOrUnassigned(SnapshotsInProgress snapshotsInProgress, ClusterChangedEvent event) {
|
||||
for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) {
|
||||
if (entry.state() == State.STARTED) {
|
||||
for (ObjectCursor<String> index : entry.waitingIndices().keys()) {
|
||||
if (event.indexRoutingTableChanged(index.value)) {
|
||||
IndexRoutingTable indexShardRoutingTable = event.state().getRoutingTable().index(index.value);
|
||||
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardStatus : entry.shards()) {
|
||||
if (shardStatus.value.state() != ShardState.WAITING) {
|
||||
continue;
|
||||
}
|
||||
final ShardId shardId = shardStatus.key;
|
||||
if (event.indexRoutingTableChanged(shardId.getIndexName())) {
|
||||
IndexRoutingTable indexShardRoutingTable =
|
||||
event.state().getRoutingTable().index(shardId.getIndex());
|
||||
if (indexShardRoutingTable == null) {
|
||||
// index got removed concurrently and we have to fail WAITING state shards
|
||||
return true;
|
||||
}
|
||||
for (ShardId shardId : entry.waitingIndices().get(index.value)) {
|
||||
ShardRouting shardRouting = indexShardRoutingTable.shard(shardId.id()).primaryShard();
|
||||
if (shardRouting != null && (shardRouting.started() || shardRouting.unassigned())) {
|
||||
return true;
|
||||
}
|
||||
ShardRouting shardRouting = indexShardRoutingTable.shard(shardId.id()).primaryShard();
|
||||
if (shardRouting != null && (shardRouting.started() || shardRouting.unassigned())) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.elasticsearch.cluster.metadata.IndexMetadata;
|
|||
import org.elasticsearch.cluster.metadata.Metadata;
|
||||
import org.elasticsearch.cluster.metadata.MetadataDeleteIndexService;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -135,8 +136,8 @@ public class DeleteDataStreamRequestTests extends AbstractWireSerializingTestCas
|
|||
|
||||
private SnapshotsInProgress.Entry createEntry(String dataStreamName, String repo, boolean partial) {
|
||||
return new SnapshotsInProgress.Entry(new Snapshot(repo, new SnapshotId("", "")), false, partial,
|
||||
SnapshotsInProgress.State.STARTED, Collections.emptyList(), Collections.singletonList(dataStreamName), 0, 1, null, null, null
|
||||
, null);
|
||||
SnapshotsInProgress.State.STARTED, Collections.emptyList(), Collections.singletonList(dataStreamName), 0, 1,
|
||||
ImmutableOpenMap.of(), null, null, null);
|
||||
}
|
||||
|
||||
public void testDeleteNonexistentDataStream() {
|
||||
|
|
|
@ -1,82 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.cluster;
|
||||
|
||||
import org.elasticsearch.cluster.SnapshotsInProgress.Entry;
|
||||
import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus;
|
||||
import org.elasticsearch.cluster.SnapshotsInProgress.ShardState;
|
||||
import org.elasticsearch.cluster.SnapshotsInProgress.State;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.repositories.IndexId;
|
||||
import org.elasticsearch.snapshots.Snapshot;
|
||||
import org.elasticsearch.snapshots.SnapshotId;
|
||||
import org.elasticsearch.snapshots.SnapshotInfoTests;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.VersionUtils;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Unit tests for the {@link SnapshotsInProgress} class and its inner classes.
|
||||
*/
|
||||
public class SnapshotsInProgressTests extends ESTestCase {
|
||||
|
||||
/**
|
||||
* Makes sure that the indices being waited on before snapshotting commences
|
||||
* are populated with all shards in the relocating or initializing state.
|
||||
*/
|
||||
public void testWaitingIndices() {
|
||||
final Snapshot snapshot = new Snapshot("repo", new SnapshotId("snap", randomAlphaOfLength(5)));
|
||||
final String idx1Name = "idx1";
|
||||
final String idx2Name = "idx2";
|
||||
final String idx3Name = "idx3";
|
||||
final String idx1UUID = randomAlphaOfLength(5);
|
||||
final String idx2UUID = randomAlphaOfLength(5);
|
||||
final String idx3UUID = randomAlphaOfLength(5);
|
||||
final List<IndexId> indices = Arrays.asList(new IndexId(idx1Name, randomAlphaOfLength(5)),
|
||||
new IndexId(idx2Name, randomAlphaOfLength(5)), new IndexId(idx3Name, randomAlphaOfLength(5)));
|
||||
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shards = ImmutableOpenMap.builder();
|
||||
|
||||
// test more than one waiting shard in an index
|
||||
shards.put(new ShardId(idx1Name, idx1UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), ShardState.WAITING, "1"));
|
||||
shards.put(new ShardId(idx1Name, idx1UUID, 1), new ShardSnapshotStatus(randomAlphaOfLength(2), ShardState.WAITING, "1"));
|
||||
shards.put(new ShardId(idx1Name, idx1UUID, 2), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), "", "1"));
|
||||
// test exactly one waiting shard in an index
|
||||
shards.put(new ShardId(idx2Name, idx2UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), ShardState.WAITING, "1"));
|
||||
shards.put(new ShardId(idx2Name, idx2UUID, 1), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), "", "1"));
|
||||
// test no waiting shards in an index
|
||||
shards.put(new ShardId(idx3Name, idx3UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), "", "1"));
|
||||
Entry entry = new Entry(snapshot, randomBoolean(), randomBoolean(), State.INIT,
|
||||
indices, System.currentTimeMillis(), randomLong(), shards.build(), SnapshotInfoTests.randomUserMetadata(),
|
||||
VersionUtils.randomVersion(random()));
|
||||
|
||||
ImmutableOpenMap<String, List<ShardId>> waitingIndices = entry.waitingIndices();
|
||||
assertEquals(2, waitingIndices.get(idx1Name).size());
|
||||
assertEquals(1, waitingIndices.get(idx2Name).size());
|
||||
assertFalse(waitingIndices.containsKey(idx3Name));
|
||||
}
|
||||
|
||||
private ShardState randomNonWaitingState() {
|
||||
return randomFrom(Arrays.stream(ShardState.values()).filter(s -> s != ShardState.WAITING).collect(Collectors.toSet()));
|
||||
}
|
||||
}
|
|
@ -80,7 +80,7 @@ public class SnapshotsInProgressSerializationTests extends AbstractDiffableWireS
|
|||
}
|
||||
ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards = builder.build();
|
||||
return new Entry(snapshot, includeGlobalState, partial, state, indices, dataStreams, startTime, repositoryStateId, shards,
|
||||
SnapshotInfoTests.randomUserMetadata(), VersionUtils.randomVersion(random()));
|
||||
null, SnapshotInfoTests.randomUserMetadata(), VersionUtils.randomVersion(random()));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue