diff --git a/server/src/main/java/org/elasticsearch/plugins/ClusterPlugin.java b/server/src/main/java/org/elasticsearch/plugins/ClusterPlugin.java index 5e58aa5a3a9..61145c7a1d7 100644 --- a/server/src/main/java/org/elasticsearch/plugins/ClusterPlugin.java +++ b/server/src/main/java/org/elasticsearch/plugins/ClusterPlugin.java @@ -70,6 +70,8 @@ public interface ClusterPlugin { * Returns a map of {@link ClusterState.Custom} supplier that should be invoked to initialize the initial clusterstate. * This allows custom clusterstate extensions to be always present and prevents invariants where clusterstates are published * but customs are not initialized. + * + * TODO: Remove this whole concept of InitialClusterStateCustomSupplier, it's not used anymore */ default Map> getInitialClusterStateCustomSupplier() { return Collections.emptyMap(); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicenseService.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicenseService.java index fa0c239aab1..99e6a10ad92 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicenseService.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicenseService.java @@ -223,6 +223,7 @@ public class LicenseService extends AbstractLifecycleComponent implements Cluste @Override public ClusterState execute(ClusterState currentState) throws Exception { + XPackPlugin.checkReadyForXPackCustomMetadata(currentState); MetaData currentMetadata = currentState.metaData(); LicensesMetaData licensesMetaData = currentMetadata.custom(LicensesMetaData.TYPE); Version trialVersion = null; @@ -341,7 +342,7 @@ public class LicenseService extends AbstractLifecycleComponent implements Cluste if (clusterService.lifecycleState() == Lifecycle.State.STARTED) { final ClusterState clusterState = clusterService.state(); if (clusterState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK) == false && - clusterState.nodes().getMasterNode() != null) { + clusterState.nodes().getMasterNode() != null && XPackPlugin.isReadyForXPackCustomMetadata(clusterState)) { final LicensesMetaData currentMetaData = clusterState.metaData().custom(LicensesMetaData.TYPE); boolean noLicense = currentMetaData == null || currentMetaData.getLicense() == null; if (clusterState.getNodes().isLocalNodeElectedMaster() && @@ -374,6 +375,12 @@ public class LicenseService extends AbstractLifecycleComponent implements Cluste final ClusterState previousClusterState = event.previousState(); final ClusterState currentClusterState = event.state(); if (!currentClusterState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { + if (XPackPlugin.isReadyForXPackCustomMetadata(currentClusterState) == false) { + logger.debug("cannot add license to cluster as the following nodes might not understand the license metadata: {}", + () -> XPackPlugin.nodesNotReadyForXPackCustomMetadata(currentClusterState)); + return; + } + final LicensesMetaData prevLicensesMetaData = previousClusterState.getMetaData().custom(LicensesMetaData.TYPE); final LicensesMetaData currentLicensesMetaData = currentClusterState.getMetaData().custom(LicensesMetaData.TYPE); if (logger.isDebugEnabled()) { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/StartBasicClusterTask.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/StartBasicClusterTask.java index 355482872d6..0cf949a6990 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/StartBasicClusterTask.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/StartBasicClusterTask.java @@ -13,6 +13,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.Nullable; +import org.elasticsearch.xpack.core.XPackPlugin; import java.time.Clock; import java.util.Collections; @@ -59,6 +60,7 @@ public class StartBasicClusterTask extends ClusterStateUpdateTask { @Override public ClusterState execute(ClusterState currentState) throws Exception { + XPackPlugin.checkReadyForXPackCustomMetadata(currentState); LicensesMetaData licensesMetaData = currentState.metaData().custom(LicensesMetaData.TYPE); License currentLicense = LicensesMetaData.extractLicense(licensesMetaData); if (currentLicense == null || currentLicense.type().equals("basic") == false) { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/StartTrialClusterTask.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/StartTrialClusterTask.java index 355672dedf7..5c5c03151ba 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/StartTrialClusterTask.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/StartTrialClusterTask.java @@ -13,6 +13,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.Nullable; +import org.elasticsearch.xpack.core.XPackPlugin; import java.time.Clock; import java.util.Collections; @@ -64,6 +65,7 @@ public class StartTrialClusterTask extends ClusterStateUpdateTask { @Override public ClusterState execute(ClusterState currentState) throws Exception { + XPackPlugin.checkReadyForXPackCustomMetadata(currentState); LicensesMetaData currentLicensesMetaData = currentState.metaData().custom(LicensesMetaData.TYPE); if (request.isAcknowledged() == false) { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/StartupSelfGeneratedLicenseTask.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/StartupSelfGeneratedLicenseTask.java index 77695f64538..823283ac5a8 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/StartupSelfGeneratedLicenseTask.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/StartupSelfGeneratedLicenseTask.java @@ -16,6 +16,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.xpack.core.XPackPlugin; import java.time.Clock; import java.util.UUID; @@ -49,6 +50,7 @@ public class StartupSelfGeneratedLicenseTask extends ClusterStateUpdateTask { @Override public ClusterState execute(ClusterState currentState) throws Exception { + XPackPlugin.checkReadyForXPackCustomMetadata(currentState); final MetaData metaData = currentState.metaData(); final LicensesMetaData currentLicensesMetaData = metaData.custom(LicensesMetaData.TYPE); // do not generate a license if any license is present diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java index 5ee46f3b3c9..77d521e2d43 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java @@ -9,15 +9,20 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.util.SetOnce; import org.bouncycastle.operator.OperatorCreationException; import org.elasticsearch.SpecialPermission; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.GenericAction; import org.elasticsearch.action.support.ActionFilter; import org.elasticsearch.client.Client; import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Booleans; import org.elasticsearch.common.inject.Binder; import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.inject.multibindings.Multibinder; @@ -33,6 +38,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.license.LicenseService; +import org.elasticsearch.license.LicensesMetaData; import org.elasticsearch.license.Licensing; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.plugins.ExtensiblePlugin; @@ -46,10 +52,13 @@ import org.elasticsearch.xpack.core.action.TransportXPackInfoAction; import org.elasticsearch.xpack.core.action.TransportXPackUsageAction; import org.elasticsearch.xpack.core.action.XPackInfoAction; import org.elasticsearch.xpack.core.action.XPackUsageAction; +import org.elasticsearch.xpack.core.ml.MLMetadataField; import org.elasticsearch.xpack.core.rest.action.RestXPackInfoAction; import org.elasticsearch.xpack.core.rest.action.RestXPackUsageAction; +import org.elasticsearch.xpack.core.security.authc.TokenMetaData; import org.elasticsearch.xpack.core.ssl.SSLConfigurationReloader; import org.elasticsearch.xpack.core.ssl.SSLService; +import org.elasticsearch.xpack.core.watcher.WatcherMetaData; import javax.security.auth.DestroyFailedException; @@ -62,14 +71,19 @@ import java.security.PrivilegedAction; import java.time.Clock; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; public class XPackPlugin extends XPackClientPlugin implements ScriptPlugin, ExtensiblePlugin { private static Logger logger = ESLoggerFactory.getLogger(XPackPlugin.class); private static DeprecationLogger deprecationLogger = new DeprecationLogger(logger); + public static final String XPACK_INSTALLED_NODE_ATTR = "xpack.installed"; + // TODO: clean up this library to not ask for write access to all system properties! static { // invoke this clinit in unbound with permissions to access all system properties @@ -138,6 +152,75 @@ public class XPackPlugin extends XPackClientPlugin implements ScriptPlugin, Exte public static LicenseService getSharedLicenseService() { return licenseService.get(); } public static XPackLicenseState getSharedLicenseState() { return licenseState.get(); } + /** + * Checks if the cluster state allows this node to add x-pack metadata to the cluster state, + * and throws an exception otherwise. + * This check should be called before installing any x-pack metadata to the cluster state, + * to ensure that the other nodes that are part of the cluster will be able to deserialize + * that metadata. Note that if the cluster state already contains x-pack metadata, this + * check assumes that the nodes are already ready to receive additional x-pack metadata. + * Having this check properly in place everywhere allows to install x-pack into a cluster + * using a rolling restart. + */ + public static void checkReadyForXPackCustomMetadata(ClusterState clusterState) { + if (alreadyContainsXPackCustomMetadata(clusterState)) { + return; + } + List notReadyNodes = nodesNotReadyForXPackCustomMetadata(clusterState); + if (notReadyNodes.isEmpty() == false) { + throw new IllegalStateException("The following nodes are not ready yet for enabling x-pack custom metadata: " + notReadyNodes); + } + } + + /** + * Checks if the cluster state allows this node to add x-pack metadata to the cluster state. + * See {@link #checkReadyForXPackCustomMetadata} for more details. + */ + public static boolean isReadyForXPackCustomMetadata(ClusterState clusterState) { + return alreadyContainsXPackCustomMetadata(clusterState) || nodesNotReadyForXPackCustomMetadata(clusterState).isEmpty(); + } + + /** + * Returns the list of nodes that won't allow this node from adding x-pack metadata to the cluster state. + * See {@link #checkReadyForXPackCustomMetadata} for more details. + */ + public static List nodesNotReadyForXPackCustomMetadata(ClusterState clusterState) { + // check that all nodes would be capable of deserializing newly added x-pack metadata + final List notReadyNodes = StreamSupport.stream(clusterState.nodes().spliterator(), false).filter(node -> { + final String xpackInstalledAttr = node.getAttributes().getOrDefault(XPACK_INSTALLED_NODE_ATTR, "false"); + + // The node attribute XPACK_INSTALLED_NODE_ATTR was only introduced in 6.3.0, so when + // we have an older node in this mixed-version cluster without any x-pack metadata, + // we want to prevent x-pack from adding custom metadata + return node.getVersion().before(Version.V_6_3_0) || Booleans.parseBoolean(xpackInstalledAttr) == false; + }).collect(Collectors.toList()); + + return notReadyNodes; + } + + private static boolean alreadyContainsXPackCustomMetadata(ClusterState clusterState) { + final MetaData metaData = clusterState.metaData(); + return metaData.custom(LicensesMetaData.TYPE) != null || + metaData.custom(MLMetadataField.TYPE) != null || + metaData.custom(WatcherMetaData.TYPE) != null || + clusterState.custom(TokenMetaData.TYPE) != null; + } + + @Override + public Settings additionalSettings() { + final String xpackInstalledNodeAttrSetting = "node.attr." + XPACK_INSTALLED_NODE_ATTR; + + if (settings.get(xpackInstalledNodeAttrSetting) != null) { + throw new IllegalArgumentException("Directly setting [" + xpackInstalledNodeAttrSetting + "] is not permitted"); + } + + if (transportClientMode) { + return super.additionalSettings(); + } else { + return Settings.builder().put(super.additionalSettings()).put(xpackInstalledNodeAttrSetting, "true").build(); + } + } + @Override public Collection createGuiceModules() { ArrayList modules = new ArrayList<>(); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/AbstractLicenseServiceTestCase.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/AbstractLicenseServiceTestCase.java index 2f110f4f8a9..5bc33ae330a 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/AbstractLicenseServiceTestCase.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/AbstractLicenseServiceTestCase.java @@ -18,14 +18,16 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.Environment; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.watcher.ResourceWatcherService; +import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.watcher.watch.ClockMock; import org.junit.After; import org.junit.Before; import java.nio.file.Path; +import java.util.Arrays; -import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; +import static java.util.Collections.singletonMap; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -66,6 +68,7 @@ public abstract class AbstractLicenseServiceTestCase extends ESTestCase { when(state.metaData()).thenReturn(metaData); final DiscoveryNode mockNode = getLocalNode(); when(discoveryNodes.getMasterNode()).thenReturn(mockNode); + when(discoveryNodes.spliterator()).thenReturn(Arrays.asList(mockNode).spliterator()); when(discoveryNodes.isLocalNodeElectedMaster()).thenReturn(false); when(state.nodes()).thenReturn(discoveryNodes); when(state.getNodes()).thenReturn(discoveryNodes); // it is really ridiculous we have nodes() and getNodes()... @@ -76,7 +79,8 @@ public abstract class AbstractLicenseServiceTestCase extends ESTestCase { } protected DiscoveryNode getLocalNode() { - return new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + return new DiscoveryNode("b", buildNewFakeTransportAddress(), singletonMap(XPackPlugin.XPACK_INSTALLED_NODE_ATTR, "true"), + emptySet(), Version.CURRENT); } @After diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/XPackPluginTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/XPackPluginTests.java new file mode 100644 index 00000000000..59731cab71d --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/XPackPluginTests.java @@ -0,0 +1,99 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core; + +import org.elasticsearch.Version; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; +import org.elasticsearch.xpack.core.security.authc.TokenMetaData; +import org.elasticsearch.xpack.core.ssl.SSLService; + +import java.util.Collections; +import java.util.Map; + +import static org.hamcrest.Matchers.containsString; + +public class XPackPluginTests extends ESTestCase { + + public void testXPackInstalledAttrClash() throws Exception { + Settings.Builder builder = Settings.builder(); + builder.put("node.attr." + XPackPlugin.XPACK_INSTALLED_NODE_ATTR, randomBoolean()); + if (randomBoolean()) { + builder.put(Client.CLIENT_TYPE_SETTING_S.getKey(), "transport"); + } + XPackPlugin xpackPlugin = createXPackPlugin(builder.put("path.home", createTempDir()).build()); + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, xpackPlugin::additionalSettings); + assertThat(e.getMessage(), + containsString("Directly setting [node.attr." + XPackPlugin.XPACK_INSTALLED_NODE_ATTR + "] is not permitted")); + } + + public void testXPackInstalledAttrExists() throws Exception { + XPackPlugin xpackPlugin = createXPackPlugin(Settings.builder().put("path.home", createTempDir()).build()); + assertEquals("true", xpackPlugin.additionalSettings().get("node.attr." + XPackPlugin.XPACK_INSTALLED_NODE_ATTR)); + } + + public void testNodesNotReadyForXPackCustomMetadata() { + boolean compatible; + boolean nodesCompatible = true; + DiscoveryNodes.Builder discoveryNodes = DiscoveryNodes.builder(); + + for (int i = 0; i < randomInt(3); i++) { + final Version version = VersionUtils.randomVersion(random()); + final Map attributes; + if (randomBoolean() && version.onOrAfter(Version.V_6_3_0)) { + attributes = Collections.singletonMap(XPackPlugin.XPACK_INSTALLED_NODE_ATTR, "true"); + } else { + nodesCompatible = false; + attributes = Collections.emptyMap(); + } + + discoveryNodes.add(new DiscoveryNode("node_" + i, buildNewFakeTransportAddress(), attributes, Collections.emptySet(), + Version.CURRENT)); + } + ClusterState.Builder clusterStateBuilder = ClusterState.builder(ClusterName.DEFAULT); + + if (randomBoolean()) { + clusterStateBuilder.putCustom(TokenMetaData.TYPE, new TokenMetaData(Collections.emptyList(), new byte[0])); + compatible = true; + } else { + compatible = nodesCompatible; + } + + ClusterState clusterState = clusterStateBuilder.nodes(discoveryNodes.build()).build(); + + assertEquals(XPackPlugin.nodesNotReadyForXPackCustomMetadata(clusterState).isEmpty(), nodesCompatible); + assertEquals(XPackPlugin.isReadyForXPackCustomMetadata(clusterState), compatible); + + if (compatible == false) { + IllegalStateException e = expectThrows(IllegalStateException.class, + () -> XPackPlugin.checkReadyForXPackCustomMetadata(clusterState)); + assertThat(e.getMessage(), containsString("The following nodes are not ready yet for enabling x-pack custom metadata:")); + } + } + + private XPackPlugin createXPackPlugin(Settings settings) throws Exception { + return new XPackPlugin(settings, null){ + + @Override + protected void setSslService(SSLService sslService) { + // disable + } + + @Override + protected void setLicenseState(XPackLicenseState licenseState) { + // disable + } + }; + } + +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java index 220d97e89ba..8c9eabe6de1 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java @@ -21,6 +21,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.ml.MLMetadataField; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction; @@ -120,6 +121,7 @@ public class TransportDeleteDatafeedAction extends TransportMasterNodeAction XPackPlugin.bindFeatureSet(b, SecurityFeatureSet.class)); - + if (enabled == false) { modules.add(b -> { b.bind(Realms.class).toProvider(Providers.of(null)); // for SecurityFeatureSet @@ -903,15 +903,6 @@ public class Security extends Plugin implements ActionPlugin, IngestPlugin, Netw }; } - @Override - public Map> getInitialClusterStateCustomSupplier() { - if (enabled) { - return Collections.singletonMap(TokenMetaData.TYPE, () -> tokenService.get().getTokenMetaData()); - } else { - return Collections.emptyMap(); - } - } - @Override public Function> getFieldFilter() { if (enabled) { diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/TokenService.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/TokenService.java index d23415f87df..2934fb8062d 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/TokenService.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/TokenService.java @@ -8,6 +8,9 @@ package org.elasticsearch.xpack.security.authc; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefBuilder; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.Priority; import org.elasticsearch.core.internal.io.IOUtils; import org.apache.lucene.util.UnicodeUtil; import org.elasticsearch.ElasticsearchSecurityException; @@ -63,6 +66,7 @@ import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.XPackField; import org.elasticsearch.xpack.core.security.ScrollHelper; @@ -107,6 +111,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; @@ -1327,6 +1332,8 @@ public final class TokenService extends AbstractComponent { @Override public ClusterState execute(ClusterState currentState) throws Exception { + XPackPlugin.checkReadyForXPackCustomMetadata(currentState); + if (tokenMetaData.equals(currentState.custom(TokenMetaData.TYPE))) { return currentState; } @@ -1347,6 +1354,15 @@ public final class TokenService extends AbstractComponent { return; } + if (state.nodes().isLocalNodeElectedMaster()) { + if (XPackPlugin.isReadyForXPackCustomMetadata(state)) { + installTokenMetadata(state.metaData()); + } else { + logger.debug("cannot add token metadata to cluster as the following nodes might not understand the metadata: {}", + () -> XPackPlugin.nodesNotReadyForXPackCustomMetadata(state)); + } + } + TokenMetaData custom = event.state().custom(TokenMetaData.TYPE); if (custom != null && custom.equals(getTokenMetaData()) == false) { logger.info("refresh keys"); @@ -1360,6 +1376,39 @@ public final class TokenService extends AbstractComponent { }); } + // to prevent too many cluster state update tasks to be queued for doing the same update + private final AtomicBoolean installTokenMetadataInProgress = new AtomicBoolean(false); + + private void installTokenMetadata(MetaData metaData) { + if (metaData.custom(TokenMetaData.TYPE) == null) { + if (installTokenMetadataInProgress.compareAndSet(false, true)) { + clusterService.submitStateUpdateTask("install-token-metadata", new ClusterStateUpdateTask(Priority.URGENT) { + @Override + public ClusterState execute(ClusterState currentState) { + XPackPlugin.checkReadyForXPackCustomMetadata(currentState); + + if (currentState.custom(TokenMetaData.TYPE) == null) { + return ClusterState.builder(currentState).putCustom(TokenMetaData.TYPE, getTokenMetaData()).build(); + } else { + return currentState; + } + } + + @Override + public void onFailure(String source, Exception e) { + installTokenMetadataInProgress.set(false); + logger.error("unable to install token metadata", e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + installTokenMetadataInProgress.set(false); + } + }); + } + } + } + /** * For testing */ diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/service/TransportWatcherServiceAction.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/service/TransportWatcherServiceAction.java index fa78208494f..6b2bb26ef45 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/service/TransportWatcherServiceAction.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/service/TransportWatcherServiceAction.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.watcher.WatcherMetaData; import org.elasticsearch.xpack.core.watcher.transport.actions.service.WatcherServiceAction; import org.elasticsearch.xpack.core.watcher.transport.actions.service.WatcherServiceRequest; @@ -86,6 +87,8 @@ public class TransportWatcherServiceAction extends TransportMasterNodeAction