Fail searchable snapshot shards on invalid license (#60722)

Implements license degradation behavior for searchable snapshots. Snapshot-backed shards are failed when the license becomes invalid, and shards won't be reallocated. After valid license is put in place again, shards are allocated again.
This commit is contained in:
Yannick Welsch 2020-08-05 13:06:21 +02:00
parent 67f6f34c23
commit 9f6f66f156
14 changed files with 302 additions and 17 deletions

View File

@ -435,6 +435,7 @@ public class ShardStateAction {
final String allocationId;
final long primaryTerm;
final String message;
@Nullable
final Exception failure;
final boolean markAsStale;
@ -453,7 +454,7 @@ public class ShardStateAction {
}
public FailedShardEntry(ShardId shardId, String allocationId, long primaryTerm,
String message, Exception failure, boolean markAsStale) {
String message, @Nullable Exception failure, boolean markAsStale) {
this.shardId = shardId;
this.allocationId = allocationId;
this.primaryTerm = primaryTerm;

View File

@ -32,7 +32,7 @@ public class FailedShard {
private final Exception failure;
private final boolean markAsStale;
public FailedShard(ShardRouting routingEntry, String message, Exception failure, boolean markAsStale) {
public FailedShard(ShardRouting routingEntry, String message, @Nullable Exception failure, boolean markAsStale) {
assert routingEntry.assignedToNode() : "only assigned shards can be failed " + routingEntry;
this.routingEntry = routingEntry;
this.message = message;
@ -43,7 +43,7 @@ public class FailedShard {
@Override
public String toString() {
return "failed shard, shard " + routingEntry + ", message [" + message + "], failure [" +
ExceptionsHelper.detailedMessage(failure) + "], markAsStale [" + markAsStale + "]";
failure == null ? "null" : ExceptionsHelper.detailedMessage(failure) + "], markAsStale [" + markAsStale + "]";
}
/**

View File

@ -30,6 +30,7 @@ import org.elasticsearch.cluster.LocalNodeMasterListener;
import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.OperationRouting;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
@ -62,6 +63,8 @@ public class ClusterService extends AbstractLifecycleComponent {
private final String nodeName;
private RerouteService rerouteService;
public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
this(settings, clusterSettings, new MasterService(settings, clusterSettings, threadPool),
new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), settings, clusterSettings, threadPool));
@ -84,6 +87,15 @@ public class ClusterService extends AbstractLifecycleComponent {
clusterApplierService.setNodeConnectionsService(nodeConnectionsService);
}
public void setRerouteService(RerouteService rerouteService) {
assert this.rerouteService == null : "RerouteService is already set";
this.rerouteService = rerouteService;
}
public RerouteService getRerouteService() {
assert this.rerouteService != null : "RerouteService not set";
return rerouteService;
}
@Override
protected synchronized void doStart() {
clusterApplierService.start();

View File

@ -1022,6 +1022,8 @@ public abstract class StreamInput extends InputStream {
}
}
@Nullable
@SuppressWarnings("unchecked")
public <T extends Exception> T readException() throws IOException {
if (readBoolean()) {
int key = readVInt();

View File

@ -489,6 +489,10 @@ public class Node implements Closeable {
.flatMap(Collection::stream)
.collect(Collectors.toList());
final RerouteService rerouteService
= new BatchedRerouteService(clusterService, clusterModule.getAllocationService()::reroute);
clusterService.setRerouteService(rerouteService);
final IndicesService indicesService =
new IndicesService(settings, pluginsService, nodeEnvironment, xContentRegistry, analysisModule.getAnalysisRegistry(),
clusterModule.getIndexNameExpressionResolver(), indicesModule.getMapperRegistry(), namedWriteableRegistry,
@ -571,8 +575,6 @@ public class Node implements Closeable {
RestoreService restoreService = new RestoreService(clusterService, repositoryService, clusterModule.getAllocationService(),
metadataCreateIndexService, metadataIndexUpgradeService, clusterService.getClusterSettings(), shardLimitValidator);
final RerouteService rerouteService
= new BatchedRerouteService(clusterService, clusterModule.getAllocationService()::reroute);
final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor(settings, clusterService::state,
clusterService.getClusterSettings(), client, threadPool::relativeTimeInMillis, rerouteService);
clusterInfoService.addListener(diskThresholdMonitor::onNewInfo);

View File

@ -16,14 +16,14 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
class PostStartTrialResponse extends ActionResponse {
public class PostStartTrialResponse extends ActionResponse {
// Nodes Prior to 6.3 did not have NEED_ACKNOWLEDGEMENT as part of status
enum Pre63Status {
UPGRADED_TO_TRIAL,
TRIAL_ALREADY_ACTIVATED;
}
enum Status {
public enum Status {
UPGRADED_TO_TRIAL(true, null, RestStatus.OK),
TRIAL_ALREADY_ACTIVATED(false, "Operation failed: Trial was already activated.", RestStatus.FORBIDDEN),
NEED_ACKNOWLEDGEMENT(false,"Operation failed: Needs acknowledgement.", RestStatus.OK);

View File

@ -20,6 +20,7 @@ import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Module;
@ -55,6 +56,7 @@ import org.elasticsearch.plugins.AnalysisPlugin;
import org.elasticsearch.plugins.ClusterPlugin;
import org.elasticsearch.plugins.DiscoveryPlugin;
import org.elasticsearch.plugins.EnginePlugin;
import org.elasticsearch.plugins.IndexStorePlugin;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.plugins.NetworkPlugin;
@ -99,7 +101,7 @@ import java.util.stream.Collectors;
import static java.util.stream.Collectors.toList;
public class LocalStateCompositeXPackPlugin extends XPackPlugin implements ScriptPlugin, ActionPlugin, IngestPlugin, NetworkPlugin,
ClusterPlugin, DiscoveryPlugin, MapperPlugin, AnalysisPlugin, PersistentTaskPlugin, EnginePlugin {
ClusterPlugin, DiscoveryPlugin, MapperPlugin, AnalysisPlugin, PersistentTaskPlugin, EnginePlugin, IndexStorePlugin {
private XPackLicenseState licenseState;
private SSLService sslService;
@ -174,7 +176,8 @@ public class LocalStateCompositeXPackPlugin extends XPackPlugin implements Scrip
filterPlugins(Plugin.class).stream().forEach(p ->
components.addAll(p.createComponents(client, clusterService, threadPool, resourceWatcherService, scriptService,
xContentRegistry, environment, nodeEnvironment, namedWriteableRegistry, expressionResolver, null))
xContentRegistry, environment, nodeEnvironment, namedWriteableRegistry, expressionResolver,
repositoriesServiceSupplier))
);
return components;
}
@ -496,6 +499,20 @@ public class LocalStateCompositeXPackPlugin extends XPackPlugin implements Scrip
.collect(Collectors.toList());
}
@Override
public Map<String, ExistingShardsAllocator> getExistingShardsAllocators() {
final Map<String, ExistingShardsAllocator> allocators = new HashMap<>();
filterPlugins(ClusterPlugin.class).stream().forEach(p -> allocators.putAll(p.getExistingShardsAllocators()));
return allocators;
}
@Override
public Map<String, IndexStorePlugin.DirectoryFactory> getDirectoryFactories() {
final Map<String, IndexStorePlugin.DirectoryFactory> factories = new HashMap<>();
filterPlugins(IndexStorePlugin.class).stream().forEach(p -> factories.putAll(p.getDirectoryFactories()));
return factories;
}
private <T> List<T> filterPlugins(Class<T> type) {
return plugins.stream().filter(x -> type.isAssignableFrom(x.getClass())).map(p -> ((T)p))
.collect(Collectors.toList());

View File

@ -0,0 +1,95 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.searchablesnapshots;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.license.LicenseStateListener;
import org.elasticsearch.license.XPackLicenseState;
import java.util.HashSet;
import java.util.Set;
public class FailShardsOnInvalidLicenseClusterListener implements LicenseStateListener, IndexEventListener {
private static final Logger logger = LogManager.getLogger(FailShardsOnInvalidLicenseClusterListener.class);
private final XPackLicenseState xPackLicenseState;
private final RerouteService rerouteService;
final Set<IndexShard> shardsToFail = new HashSet<>();
private boolean allowed;
public FailShardsOnInvalidLicenseClusterListener(XPackLicenseState xPackLicenseState, RerouteService rerouteService) {
this.xPackLicenseState = xPackLicenseState;
this.rerouteService = rerouteService;
this.allowed = xPackLicenseState.isAllowed(XPackLicenseState.Feature.SEARCHABLE_SNAPSHOTS);
xPackLicenseState.addListener(this);
}
@Override
public synchronized void afterIndexShardStarted(IndexShard indexShard) {
shardsToFail.add(indexShard);
failActiveShardsIfNecessary();
}
@Override
public synchronized void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) {
if (indexShard != null) {
shardsToFail.remove(indexShard);
}
}
@Override
public synchronized void licenseStateChanged() {
final boolean allowed = xPackLicenseState.isAllowed(XPackLicenseState.Feature.SEARCHABLE_SNAPSHOTS);
if (allowed && this.allowed == false) {
rerouteService.reroute("reroute after license activation", Priority.NORMAL, new ActionListener<ClusterState>() {
@Override
public void onResponse(ClusterState clusterState) {
logger.trace("successful reroute after license activation");
}
@Override
public void onFailure(Exception e) {
logger.debug("unsuccessful reroute after license activation");
}
});
}
this.allowed = allowed;
failActiveShardsIfNecessary();
}
private void failActiveShardsIfNecessary() {
assert Thread.holdsLock(this);
if (allowed == false) {
for (IndexShard indexShard : shardsToFail) {
try {
indexShard.failShard("invalid license", null);
} catch (AlreadyClosedException ignored) {
// ignore
} catch (Exception e) {
logger.warn(new ParameterizedMessage("Could not close shard {} due to invalid license", indexShard.shardId()), e);
}
}
shardsToFail.clear();
}
}
}

View File

@ -0,0 +1,59 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.searchablesnapshots;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import java.util.function.BooleanSupplier;
public class SearchableSnapshotAllocationDecider extends AllocationDecider {
static final String NAME = "searchable_snapshots";
private final BooleanSupplier hasValidLicenseSupplier;
public SearchableSnapshotAllocationDecider(BooleanSupplier hasValidLicenseSupplier) {
this.hasValidLicenseSupplier = hasValidLicenseSupplier;
}
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return allowAllocation(allocation.metadata().getIndexSafe(shardRouting.index()), allocation);
}
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocation) {
return allowAllocation(allocation.metadata().getIndexSafe(shardRouting.index()), allocation);
}
@Override
public Decision canAllocate(IndexMetadata indexMetadata, RoutingNode node, RoutingAllocation allocation) {
return allowAllocation(indexMetadata, allocation);
}
@Override
public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return allowAllocation(allocation.metadata().getIndexSafe(shardRouting.index()), allocation);
}
private Decision allowAllocation(IndexMetadata indexMetadata, RoutingAllocation allocation) {
if (SearchableSnapshotsConstants.isSearchableSnapshotStore(indexMetadata.getSettings())) {
if (hasValidLicenseSupplier.getAsBoolean()) {
return allocation.decision(Decision.YES, NAME, "valid license for searchable snapshots");
} else {
return allocation.decision(Decision.NO, NAME, "invalid license for searchable snapshots");
}
} else {
return allocation.decision(Decision.YES, NAME, "decider only applicable for indices backed by searchable snapshots");
}
}
}

View File

@ -12,6 +12,7 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@ -141,6 +142,7 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng
private volatile Supplier<RepositoriesService> repositoriesServiceSupplier;
private final SetOnce<CacheService> cacheService = new SetOnce<>();
private final SetOnce<ThreadPool> threadPool = new SetOnce<>();
private final SetOnce<FailShardsOnInvalidLicenseClusterListener> failShardsListener = new SetOnce<>();
private final Settings settings;
private final boolean transportClientMode;
@ -195,6 +197,9 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng
this.cacheService.set(cacheService);
this.repositoriesServiceSupplier = repositoriesServiceSupplier;
this.threadPool.set(threadPool);
this.failShardsListener.set(
new FailShardsOnInvalidLicenseClusterListener(getLicenseState(), clusterService.getRerouteService())
);
return org.elasticsearch.common.collect.List.of(cacheService);
} else {
this.repositoriesServiceSupplier = () -> {
@ -222,6 +227,7 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng
public void onIndexModule(IndexModule indexModule) {
if (SearchableSnapshotsConstants.isSearchableSnapshotStore(indexModule.getSettings())) {
indexModule.addIndexEventListener(new SearchableSnapshotIndexEventListener());
indexModule.addIndexEventListener(failShardsListener.get());
}
}
@ -297,6 +303,22 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng
}
}
// overridable by tests
protected XPackLicenseState getLicenseState() {
return XPackPlugin.getSharedLicenseState();
}
@Override
public Collection<AllocationDecider> createAllocationDeciders(Settings settings, ClusterSettings clusterSettings) {
if (SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED) {
return org.elasticsearch.common.collect.List.of(
new SearchableSnapshotAllocationDecider(() -> getLicenseState().isAllowed(XPackLicenseState.Feature.SEARCHABLE_SNAPSHOTS))
);
} else {
return Collections.emptyList();
}
}
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
if (SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED) {
return org.elasticsearch.common.collect.List.of(executorBuilders());

View File

@ -31,7 +31,6 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.license.LicenseService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;
@ -51,7 +50,7 @@ public abstract class BaseSearchableSnapshotsIntegTestCase extends ESIntegTestCa
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return org.elasticsearch.common.collect.List.of(SearchableSnapshots.class, LocalStateCompositeXPackPlugin.class);
return org.elasticsearch.common.collect.List.of(LocalStateSearchableSnapshots.class);
}
@Override

View File

@ -0,0 +1,30 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.searchablesnapshots;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import java.nio.file.Path;
public class LocalStateSearchableSnapshots extends LocalStateCompositeXPackPlugin {
public LocalStateSearchableSnapshots(final Settings settings, final Path configPath) {
super(settings, configPath);
LocalStateSearchableSnapshots thisVar = this;
plugins.add(new SearchableSnapshots(settings) {
@Override
protected XPackLicenseState getLicenseState() {
return thisVar.getLicenseState();
}
});
}
}

View File

@ -30,12 +30,22 @@ import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.license.DeleteLicenseAction;
import org.elasticsearch.license.License;
import org.elasticsearch.license.LicensesMetadata;
import org.elasticsearch.license.PostStartBasicAction;
import org.elasticsearch.license.PostStartBasicRequest;
import org.elasticsearch.license.PostStartTrialAction;
import org.elasticsearch.license.PostStartTrialRequest;
import org.elasticsearch.license.PostStartTrialResponse;
import org.elasticsearch.protocol.xpack.license.DeleteLicenseRequest;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.test.ESIntegTestCase;
@ -49,6 +59,7 @@ import org.elasticsearch.xpack.searchablesnapshots.action.SearchableSnapshotsSta
import org.elasticsearch.xpack.searchablesnapshots.action.SearchableSnapshotsStatsResponse;
import org.junit.Before;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@ -150,4 +161,44 @@ public class SearchableSnapshotsLicenseIntegTests extends BaseSearchableSnapshot
assertThat(cause.getMessage(), containsString("current license is non-compliant for [searchable-snapshots]"));
}
}
public void testShardAllocationOnInvalidLicense() throws Exception {
// check that shards have been failed as part of invalid license
assertBusy(
() -> assertEquals(
ClusterHealthStatus.RED,
client().admin().cluster().prepareHealth(indexName).get().getIndices().get(indexName).getStatus()
)
);
// add a valid license again
// This is a bit of a hack in tests, as we can't readd a trial license
// We force this by clearing the existing basic license first
CountDownLatch latch = new CountDownLatch(1);
internalCluster().getCurrentMasterNodeInstance(ClusterService.class)
.submitStateUpdateTask("remove license", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return ClusterState.builder(currentState)
.metadata(Metadata.builder(currentState.metadata()).removeCustom(LicensesMetadata.TYPE).build())
.build();
}
@Override
public void onFailure(String source, Exception e) {
throw new AssertionError(e);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}
});
latch.await();
PostStartTrialRequest startTrialRequest = new PostStartTrialRequest().setType(License.LicenseType.TRIAL.getTypeName())
.acknowledge(true);
PostStartTrialResponse resp = client().execute(PostStartTrialAction.INSTANCE, startTrialRequest).get();
assertEquals(PostStartTrialResponse.Status.UPGRADED_TO_TRIAL, resp.getStatus());
// check if cluster goes green again after valid license has been put in place
ensureGreen(indexName);
}
}

View File

@ -45,7 +45,6 @@ import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction;
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest;
@ -83,11 +82,7 @@ public class SearchableSnapshotsPrewarmingIntegTests extends ESSingleNodeTestCas
@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return org.elasticsearch.common.collect.List.of(
SearchableSnapshots.class,
LocalStateCompositeXPackPlugin.class,
TrackingRepositoryPlugin.class
);
return org.elasticsearch.common.collect.List.of(LocalStateSearchableSnapshots.class, TrackingRepositoryPlugin.class);
}
@Override