Simplify searchable snapshot shard allocation (#61911)

Simplifies allocation for snapshot-backed shards by always making the recovery source "from snapshot" for those
snapshot-backed shards (instead of "recover from local or from empty store"). Also let's the balancer pick a node which
to allocate the snapshot-backed shard to (which takes number of shards on each node into account unlike the current
implementation which just picks whatever node we are allowed to allocate to, with no notion of "balancing" at all).
This commit is contained in:
Yannick Welsch 2020-09-04 15:43:55 +02:00
parent 7863df88e3
commit 6d08b55d4e
9 changed files with 105 additions and 55 deletions

View File

@ -214,6 +214,9 @@ public abstract class RecoverySource implements Writeable, ToXContentObject {
* recovery from a snapshot
*/
public static class SnapshotRecoverySource extends RecoverySource {
public static final String NO_API_RESTORE_UUID = "_no_api_";
private final String restoreUUID;
private final Snapshot snapshot;
private final IndexId index;

View File

@ -45,7 +45,11 @@ public class RestoreInProgressAllocationDecider extends AllocationDecider {
return allocation.decision(Decision.YES, NAME, "ignored as shard is not being recovered from a snapshot");
}
RecoverySource.SnapshotRecoverySource source = (RecoverySource.SnapshotRecoverySource) recoverySource;
final RecoverySource.SnapshotRecoverySource source = (RecoverySource.SnapshotRecoverySource) recoverySource;
if (source.restoreUUID().equals(RecoverySource.SnapshotRecoverySource.NO_API_RESTORE_UUID)) {
return allocation.decision(Decision.YES, NAME, "not an API-level restore");
}
final RestoreInProgress restoresInProgress = allocation.custom(RestoreInProgress.TYPE);
if (restoresInProgress != null) {

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.blobstore.cache.BlobStoreCacheService;
import org.elasticsearch.blobstore.cache.CachedBlob;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.blobstore.BlobContainer;
@ -84,6 +85,7 @@ import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SN
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_NAME_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_REPOSITORY_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING;
@ -195,6 +197,8 @@ public class SearchableSnapshotDirectory extends BaseDirectory {
public boolean loadSnapshot(RecoveryState recoveryState) {
assert recoveryState != null;
assert recoveryState instanceof SearchableSnapshotRecoveryState;
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT
|| recoveryState.getRecoverySource().getType() == RecoverySource.Type.PEER : recoveryState.getRecoverySource().getType();
assert assertCurrentThreadMayLoadSnapshot();
// noinspection ConstantConditions in case assertions are disabled
if (recoveryState instanceof SearchableSnapshotRecoveryState == false) {
@ -506,6 +510,7 @@ public class SearchableSnapshotDirectory extends BaseDirectory {
) throws IOException {
if (SNAPSHOT_REPOSITORY_SETTING.exists(indexSettings.getSettings()) == false
|| SNAPSHOT_INDEX_NAME_SETTING.exists(indexSettings.getSettings()) == false
|| SNAPSHOT_INDEX_ID_SETTING.exists(indexSettings.getSettings()) == false
|| SNAPSHOT_SNAPSHOT_NAME_SETTING.exists(indexSettings.getSettings()) == false
|| SNAPSHOT_SNAPSHOT_ID_SETTING.exists(indexSettings.getSettings()) == false) {
@ -537,7 +542,10 @@ public class SearchableSnapshotDirectory extends BaseDirectory {
}
final BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository;
final IndexId indexId = new IndexId(indexSettings.getIndex().getName(), SNAPSHOT_INDEX_ID_SETTING.get(indexSettings.getSettings()));
final IndexId indexId = new IndexId(
SNAPSHOT_INDEX_NAME_SETTING.get(indexSettings.getSettings()),
SNAPSHOT_INDEX_ID_SETTING.get(indexSettings.getSettings())
);
final SnapshotId snapshotId = new SnapshotId(
SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSettings.getSettings()),
SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings.getSettings())

View File

@ -5,21 +5,27 @@
*/
package org.elasticsearch.xpack.searchablesnapshots;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.elasticsearch.cluster.routing.allocation.AllocationDecision;
import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.FailedShard;
import org.elasticsearch.cluster.routing.allocation.NodeAllocationResult;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import java.util.ArrayList;
import java.util.List;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_NAME_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_REPOSITORY_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING;
public class SearchableSnapshotAllocator implements ExistingShardsAllocator {
public static final String ALLOCATOR_NAME = "searchable_snapshot_allocator";
@ -36,55 +42,51 @@ public class SearchableSnapshotAllocator implements ExistingShardsAllocator {
RoutingAllocation allocation,
UnassignedAllocationHandler unassignedAllocationHandler
) {
final AllocateUnassignedDecision allocateUnassignedDecision = decideAllocation(allocation, shardRouting);
assert allocateUnassignedDecision.isDecisionTaken();
if (shardRouting.primary()
&& (shardRouting.recoverySource().getType() == RecoverySource.Type.EXISTING_STORE
|| shardRouting.recoverySource().getType() == RecoverySource.Type.EMPTY_STORE)) {
// we always force snapshot recovery source to use the snapshot-based recovery process on the node
if (allocateUnassignedDecision.getAllocationDecision() == AllocationDecision.YES) {
if (shardRouting.primary() && shardRouting.recoverySource().getType() == RecoverySource.Type.EXISTING_STORE) {
// we don't care what the allocation ID is since we know that these shards cannot really be stale, so we can
// safely ignore the allocation ID with a forced-stale allocation and allow this shard to fall through to the balanced
// shards allocator
unassignedAllocationHandler.updateUnassigned(
final Settings indexSettings = allocation.metadata().index(shardRouting.index()).getSettings();
final IndexId indexId = new IndexId(
SNAPSHOT_INDEX_NAME_SETTING.get(indexSettings),
SNAPSHOT_INDEX_ID_SETTING.get(indexSettings)
);
final SnapshotId snapshotId = new SnapshotId(
SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSettings),
SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings)
);
final String repository = SNAPSHOT_REPOSITORY_SETTING.get(indexSettings);
final Snapshot snapshot = new Snapshot(repository, snapshotId);
shardRouting = unassignedAllocationHandler.updateUnassigned(
shardRouting.unassignedInfo(),
RecoverySource.ExistingStoreRecoverySource.FORCE_STALE_PRIMARY_INSTANCE,
new RecoverySource.SnapshotRecoverySource(
RecoverySource.SnapshotRecoverySource.NO_API_RESTORE_UUID,
snapshot,
Version.CURRENT,
indexId
),
allocation.changes()
);
}
} else {
final AllocateUnassignedDecision allocateUnassignedDecision = decideAllocation(allocation, shardRouting);
if (allocateUnassignedDecision.isDecisionTaken() && allocateUnassignedDecision.getAllocationDecision() != AllocationDecision.YES) {
unassignedAllocationHandler.removeAndIgnore(allocateUnassignedDecision.getAllocationStatus(), allocation.changes());
}
}
private static AllocateUnassignedDecision decideAllocation(RoutingAllocation allocation, ShardRouting shardRouting) {
private AllocateUnassignedDecision decideAllocation(RoutingAllocation allocation, ShardRouting shardRouting) {
assert shardRouting.unassigned();
assert ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_SETTING.get(
allocation.metadata().getIndexSafe(shardRouting.index()).getSettings()
).equals(ALLOCATOR_NAME);
Decision.Type bestDecision = Decision.Type.NO;
RoutingNode bestNode = null;
final List<NodeAllocationResult> nodeAllocationResults = allocation.debugDecision()
? new ArrayList<>(allocation.routingNodes().size())
: null;
for (final RoutingNode routingNode : allocation.routingNodes()) {
final Decision decision = allocation.deciders().canAllocate(shardRouting, routingNode, allocation);
if (decision.type() == Decision.Type.YES || (decision.type() == Decision.Type.THROTTLE && bestDecision != Decision.Type.YES)) {
bestDecision = decision.type();
bestNode = routingNode;
}
if (nodeAllocationResults != null) {
nodeAllocationResults.add(new NodeAllocationResult(routingNode.node(), null, decision));
}
}
if (bestDecision == Decision.Type.YES) {
return AllocateUnassignedDecision.yes(bestNode.node(), null, nodeAllocationResults, false);
} else if (bestDecision == Decision.Type.THROTTLE) {
return AllocateUnassignedDecision.throttle(nodeAllocationResults);
} else {
return AllocateUnassignedDecision.no(UnassignedInfo.AllocationStatus.DECIDERS_NO, nodeAllocationResults);
}
// let BalancedShardsAllocator take care of allocating this shard
// TODO: once we have persistent cache, choose a node that has existing data
return AllocateUnassignedDecision.NOT_TAKEN;
}
@Override

View File

@ -110,6 +110,12 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng
Setting.Property.PrivateIndex,
Setting.Property.NotCopyableOnResize
);
public static final Setting<String> SNAPSHOT_INDEX_NAME_SETTING = Setting.simpleString(
"index.store.snapshot.index_name",
Setting.Property.IndexScope,
Setting.Property.PrivateIndex,
Setting.Property.NotCopyableOnResize
);
public static final Setting<String> SNAPSHOT_INDEX_ID_SETTING = Setting.simpleString(
"index.store.snapshot.index_uuid",
Setting.Property.IndexScope,
@ -172,6 +178,7 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng
SNAPSHOT_REPOSITORY_SETTING,
SNAPSHOT_SNAPSHOT_NAME_SETTING,
SNAPSHOT_SNAPSHOT_ID_SETTING,
SNAPSHOT_INDEX_NAME_SETTING,
SNAPSHOT_INDEX_ID_SETTING,
SNAPSHOT_CACHE_ENABLED_SETTING,
SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING,

View File

@ -118,6 +118,7 @@ public class TransportMountSearchableSnapshotAction extends TransportMasterNodeA
.put(SearchableSnapshots.SNAPSHOT_REPOSITORY_SETTING.getKey(), repoName)
.put(SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING.getKey(), snapshotId.getName())
.put(SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING.getKey(), snapshotId.getUUID())
.put(SearchableSnapshots.SNAPSHOT_INDEX_NAME_SETTING.getKey(), indexId.getName())
.put(SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING.getKey(), indexId.getId())
.put(INDEX_STORE_TYPE_SETTING.getKey(), SearchableSnapshotsConstants.SNAPSHOT_DIRECTORY_FACTORY_KEY)
.put(IndexMetadata.SETTING_BLOCKS_WRITE, true)

View File

@ -10,10 +10,12 @@ import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.TriConsumer;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.lucene.store.ESIndexInputTestCase;
import org.elasticsearch.common.settings.Settings;
@ -30,6 +32,7 @@ import org.elasticsearch.index.store.cache.TestUtils.NoopBlobStoreCacheService;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.SearchableSnapshotRecoveryState;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
@ -645,11 +648,16 @@ public class SearchableSnapshotDirectoryStatsTests extends ESIndexInputTestCase
assertThat(directory.getStats(fileName), nullValue());
ShardRouting shardRouting = TestShardRouting.newShardRouting(
randomAlphaOfLength(10),
0,
new ShardId(randomAlphaOfLength(10), randomAlphaOfLength(10), 0),
randomAlphaOfLength(10),
true,
ShardRoutingState.INITIALIZING
ShardRoutingState.INITIALIZING,
new RecoverySource.SnapshotRecoverySource(
UUIDs.randomBase64UUID(),
new Snapshot("repo", new SnapshotId(randomAlphaOfLength(8), UUIDs.randomBase64UUID())),
Version.CURRENT,
new IndexId("some_index", UUIDs.randomBase64UUID(random()))
)
);
DiscoveryNode targetNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT);
RecoveryState recoveryState = new SearchableSnapshotRecoveryState(shardRouting, targetNode, null);

View File

@ -41,6 +41,7 @@ import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
@ -84,6 +85,7 @@ import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ESTestCase;
@ -125,6 +127,7 @@ import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SN
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_NAME_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_REPOSITORY_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING;
@ -749,6 +752,7 @@ public class SearchableSnapshotDirectoryTests extends ESTestCase {
public void testRequiresAdditionalSettings() {
final List<Setting<String>> requiredSettings = org.elasticsearch.common.collect.List.of(
SNAPSHOT_REPOSITORY_SETTING,
SNAPSHOT_INDEX_NAME_SETTING,
SNAPSHOT_INDEX_ID_SETTING,
SNAPSHOT_SNAPSHOT_NAME_SETTING,
SNAPSHOT_SNAPSHOT_ID_SETTING
@ -912,11 +916,16 @@ public class SearchableSnapshotDirectoryTests extends ESTestCase {
private SearchableSnapshotRecoveryState createRecoveryState() {
ShardRouting shardRouting = TestShardRouting.newShardRouting(
randomAlphaOfLength(10),
0,
new ShardId(randomAlphaOfLength(10), randomAlphaOfLength(10), 0),
randomAlphaOfLength(10),
true,
ShardRoutingState.INITIALIZING
ShardRoutingState.INITIALIZING,
new RecoverySource.SnapshotRecoverySource(
UUIDs.randomBase64UUID(),
new Snapshot("repo", new SnapshotId(randomAlphaOfLength(8), UUIDs.randomBase64UUID())),
Version.CURRENT,
new IndexId("some_index", UUIDs.randomBase64UUID(random()))
)
);
DiscoveryNode targetNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT);
SearchableSnapshotRecoveryState recoveryState = new SearchableSnapshotRecoveryState(shardRouting, targetNode, null);

View File

@ -8,9 +8,11 @@ package org.elasticsearch.index.store.cache;
import org.apache.lucene.store.IndexInput;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.support.FilterBlobContainer;
import org.elasticsearch.common.lucene.store.ESIndexInputTestCase;
@ -26,6 +28,7 @@ import org.elasticsearch.index.store.cache.TestUtils.NoopBlobStoreCacheService;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.SearchableSnapshotRecoveryState;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
@ -242,11 +245,16 @@ public class CachedBlobContainerIndexInputTests extends ESIndexInputTestCase {
private SearchableSnapshotRecoveryState createRecoveryState() {
ShardRouting shardRouting = TestShardRouting.newShardRouting(
randomAlphaOfLength(10),
0,
new ShardId(randomAlphaOfLength(10), randomAlphaOfLength(10), 0),
randomAlphaOfLength(10),
true,
ShardRoutingState.INITIALIZING
ShardRoutingState.INITIALIZING,
new RecoverySource.SnapshotRecoverySource(
UUIDs.randomBase64UUID(),
new Snapshot("repo", new SnapshotId(randomAlphaOfLength(8), UUIDs.randomBase64UUID())),
Version.CURRENT,
new IndexId("some_index", UUIDs.randomBase64UUID(random()))
)
);
DiscoveryNode targetNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT);
return new SearchableSnapshotRecoveryState(shardRouting, targetNode, null);