Only allow x-pack metadata if all nodes are ready (#30743)
Enables a rolling restart from the OSS distribution to the x-pack based distribution by preventing x-pack code from installing custom metadata into the cluster state until all nodes are capable of deserializing this metadata.
This commit is contained in:
parent
ca999ad569
commit
8145a820c2
|
@ -70,6 +70,8 @@ public interface ClusterPlugin {
|
|||
* Returns a map of {@link ClusterState.Custom} supplier that should be invoked to initialize the initial clusterstate.
|
||||
* This allows custom clusterstate extensions to be always present and prevents invariants where clusterstates are published
|
||||
* but customs are not initialized.
|
||||
*
|
||||
* TODO: Remove this whole concept of InitialClusterStateCustomSupplier, it's not used anymore
|
||||
*/
|
||||
default Map<String, Supplier<ClusterState.Custom>> getInitialClusterStateCustomSupplier() { return Collections.emptyMap(); }
|
||||
}
|
||||
|
|
|
@ -223,6 +223,7 @@ public class LicenseService extends AbstractLifecycleComponent implements Cluste
|
|||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
XPackPlugin.checkReadyForXPackCustomMetadata(currentState);
|
||||
MetaData currentMetadata = currentState.metaData();
|
||||
LicensesMetaData licensesMetaData = currentMetadata.custom(LicensesMetaData.TYPE);
|
||||
Version trialVersion = null;
|
||||
|
@ -341,7 +342,7 @@ public class LicenseService extends AbstractLifecycleComponent implements Cluste
|
|||
if (clusterService.lifecycleState() == Lifecycle.State.STARTED) {
|
||||
final ClusterState clusterState = clusterService.state();
|
||||
if (clusterState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK) == false &&
|
||||
clusterState.nodes().getMasterNode() != null) {
|
||||
clusterState.nodes().getMasterNode() != null && XPackPlugin.isReadyForXPackCustomMetadata(clusterState)) {
|
||||
final LicensesMetaData currentMetaData = clusterState.metaData().custom(LicensesMetaData.TYPE);
|
||||
boolean noLicense = currentMetaData == null || currentMetaData.getLicense() == null;
|
||||
if (clusterState.getNodes().isLocalNodeElectedMaster() &&
|
||||
|
@ -374,6 +375,12 @@ public class LicenseService extends AbstractLifecycleComponent implements Cluste
|
|||
final ClusterState previousClusterState = event.previousState();
|
||||
final ClusterState currentClusterState = event.state();
|
||||
if (!currentClusterState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
|
||||
if (XPackPlugin.isReadyForXPackCustomMetadata(currentClusterState) == false) {
|
||||
logger.debug("cannot add license to cluster as the following nodes might not understand the license metadata: {}",
|
||||
() -> XPackPlugin.nodesNotReadyForXPackCustomMetadata(currentClusterState));
|
||||
return;
|
||||
}
|
||||
|
||||
final LicensesMetaData prevLicensesMetaData = previousClusterState.getMetaData().custom(LicensesMetaData.TYPE);
|
||||
final LicensesMetaData currentLicensesMetaData = currentClusterState.getMetaData().custom(LicensesMetaData.TYPE);
|
||||
if (logger.isDebugEnabled()) {
|
||||
|
|
|
@ -13,6 +13,7 @@ import org.elasticsearch.cluster.ClusterState;
|
|||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.xpack.core.XPackPlugin;
|
||||
|
||||
import java.time.Clock;
|
||||
import java.util.Collections;
|
||||
|
@ -59,6 +60,7 @@ public class StartBasicClusterTask extends ClusterStateUpdateTask {
|
|||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
XPackPlugin.checkReadyForXPackCustomMetadata(currentState);
|
||||
LicensesMetaData licensesMetaData = currentState.metaData().custom(LicensesMetaData.TYPE);
|
||||
License currentLicense = LicensesMetaData.extractLicense(licensesMetaData);
|
||||
if (currentLicense == null || currentLicense.type().equals("basic") == false) {
|
||||
|
|
|
@ -13,6 +13,7 @@ import org.elasticsearch.cluster.ClusterState;
|
|||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.xpack.core.XPackPlugin;
|
||||
|
||||
import java.time.Clock;
|
||||
import java.util.Collections;
|
||||
|
@ -64,6 +65,7 @@ public class StartTrialClusterTask extends ClusterStateUpdateTask {
|
|||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
XPackPlugin.checkReadyForXPackCustomMetadata(currentState);
|
||||
LicensesMetaData currentLicensesMetaData = currentState.metaData().custom(LicensesMetaData.TYPE);
|
||||
|
||||
if (request.isAcknowledged() == false) {
|
||||
|
|
|
@ -16,6 +16,7 @@ import org.elasticsearch.cluster.service.ClusterService;
|
|||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.xpack.core.XPackPlugin;
|
||||
|
||||
import java.time.Clock;
|
||||
import java.util.UUID;
|
||||
|
@ -49,6 +50,7 @@ public class StartupSelfGeneratedLicenseTask extends ClusterStateUpdateTask {
|
|||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
XPackPlugin.checkReadyForXPackCustomMetadata(currentState);
|
||||
final MetaData metaData = currentState.metaData();
|
||||
final LicensesMetaData currentLicensesMetaData = metaData.custom(LicensesMetaData.TYPE);
|
||||
// do not generate a license if any license is present
|
||||
|
|
|
@ -9,15 +9,20 @@ import org.apache.logging.log4j.Logger;
|
|||
import org.apache.lucene.util.SetOnce;
|
||||
import org.bouncycastle.operator.OperatorCreationException;
|
||||
import org.elasticsearch.SpecialPermission;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.action.GenericAction;
|
||||
import org.elasticsearch.action.support.ActionFilter;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.transport.TransportClient;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Booleans;
|
||||
import org.elasticsearch.common.inject.Binder;
|
||||
import org.elasticsearch.common.inject.Module;
|
||||
import org.elasticsearch.common.inject.multibindings.Multibinder;
|
||||
|
@ -33,6 +38,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
|||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.license.LicenseService;
|
||||
import org.elasticsearch.license.LicensesMetaData;
|
||||
import org.elasticsearch.license.Licensing;
|
||||
import org.elasticsearch.license.XPackLicenseState;
|
||||
import org.elasticsearch.plugins.ExtensiblePlugin;
|
||||
|
@ -46,10 +52,13 @@ import org.elasticsearch.xpack.core.action.TransportXPackInfoAction;
|
|||
import org.elasticsearch.xpack.core.action.TransportXPackUsageAction;
|
||||
import org.elasticsearch.xpack.core.action.XPackInfoAction;
|
||||
import org.elasticsearch.xpack.core.action.XPackUsageAction;
|
||||
import org.elasticsearch.xpack.core.ml.MLMetadataField;
|
||||
import org.elasticsearch.xpack.core.rest.action.RestXPackInfoAction;
|
||||
import org.elasticsearch.xpack.core.rest.action.RestXPackUsageAction;
|
||||
import org.elasticsearch.xpack.core.security.authc.TokenMetaData;
|
||||
import org.elasticsearch.xpack.core.ssl.SSLConfigurationReloader;
|
||||
import org.elasticsearch.xpack.core.ssl.SSLService;
|
||||
import org.elasticsearch.xpack.core.watcher.WatcherMetaData;
|
||||
|
||||
import javax.security.auth.DestroyFailedException;
|
||||
|
||||
|
@ -62,14 +71,19 @@ import java.security.PrivilegedAction;
|
|||
import java.time.Clock;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
||||
public class XPackPlugin extends XPackClientPlugin implements ScriptPlugin, ExtensiblePlugin {
|
||||
|
||||
private static Logger logger = ESLoggerFactory.getLogger(XPackPlugin.class);
|
||||
private static DeprecationLogger deprecationLogger = new DeprecationLogger(logger);
|
||||
|
||||
public static final String XPACK_INSTALLED_NODE_ATTR = "xpack.installed";
|
||||
|
||||
// TODO: clean up this library to not ask for write access to all system properties!
|
||||
static {
|
||||
// invoke this clinit in unbound with permissions to access all system properties
|
||||
|
@ -138,6 +152,75 @@ public class XPackPlugin extends XPackClientPlugin implements ScriptPlugin, Exte
|
|||
public static LicenseService getSharedLicenseService() { return licenseService.get(); }
|
||||
public static XPackLicenseState getSharedLicenseState() { return licenseState.get(); }
|
||||
|
||||
/**
|
||||
* Checks if the cluster state allows this node to add x-pack metadata to the cluster state,
|
||||
* and throws an exception otherwise.
|
||||
* This check should be called before installing any x-pack metadata to the cluster state,
|
||||
* to ensure that the other nodes that are part of the cluster will be able to deserialize
|
||||
* that metadata. Note that if the cluster state already contains x-pack metadata, this
|
||||
* check assumes that the nodes are already ready to receive additional x-pack metadata.
|
||||
* Having this check properly in place everywhere allows to install x-pack into a cluster
|
||||
* using a rolling restart.
|
||||
*/
|
||||
public static void checkReadyForXPackCustomMetadata(ClusterState clusterState) {
|
||||
if (alreadyContainsXPackCustomMetadata(clusterState)) {
|
||||
return;
|
||||
}
|
||||
List<DiscoveryNode> notReadyNodes = nodesNotReadyForXPackCustomMetadata(clusterState);
|
||||
if (notReadyNodes.isEmpty() == false) {
|
||||
throw new IllegalStateException("The following nodes are not ready yet for enabling x-pack custom metadata: " + notReadyNodes);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the cluster state allows this node to add x-pack metadata to the cluster state.
|
||||
* See {@link #checkReadyForXPackCustomMetadata} for more details.
|
||||
*/
|
||||
public static boolean isReadyForXPackCustomMetadata(ClusterState clusterState) {
|
||||
return alreadyContainsXPackCustomMetadata(clusterState) || nodesNotReadyForXPackCustomMetadata(clusterState).isEmpty();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the list of nodes that won't allow this node from adding x-pack metadata to the cluster state.
|
||||
* See {@link #checkReadyForXPackCustomMetadata} for more details.
|
||||
*/
|
||||
public static List<DiscoveryNode> nodesNotReadyForXPackCustomMetadata(ClusterState clusterState) {
|
||||
// check that all nodes would be capable of deserializing newly added x-pack metadata
|
||||
final List<DiscoveryNode> notReadyNodes = StreamSupport.stream(clusterState.nodes().spliterator(), false).filter(node -> {
|
||||
final String xpackInstalledAttr = node.getAttributes().getOrDefault(XPACK_INSTALLED_NODE_ATTR, "false");
|
||||
|
||||
// The node attribute XPACK_INSTALLED_NODE_ATTR was only introduced in 6.3.0, so when
|
||||
// we have an older node in this mixed-version cluster without any x-pack metadata,
|
||||
// we want to prevent x-pack from adding custom metadata
|
||||
return node.getVersion().before(Version.V_6_3_0) || Booleans.parseBoolean(xpackInstalledAttr) == false;
|
||||
}).collect(Collectors.toList());
|
||||
|
||||
return notReadyNodes;
|
||||
}
|
||||
|
||||
private static boolean alreadyContainsXPackCustomMetadata(ClusterState clusterState) {
|
||||
final MetaData metaData = clusterState.metaData();
|
||||
return metaData.custom(LicensesMetaData.TYPE) != null ||
|
||||
metaData.custom(MLMetadataField.TYPE) != null ||
|
||||
metaData.custom(WatcherMetaData.TYPE) != null ||
|
||||
clusterState.custom(TokenMetaData.TYPE) != null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Settings additionalSettings() {
|
||||
final String xpackInstalledNodeAttrSetting = "node.attr." + XPACK_INSTALLED_NODE_ATTR;
|
||||
|
||||
if (settings.get(xpackInstalledNodeAttrSetting) != null) {
|
||||
throw new IllegalArgumentException("Directly setting [" + xpackInstalledNodeAttrSetting + "] is not permitted");
|
||||
}
|
||||
|
||||
if (transportClientMode) {
|
||||
return super.additionalSettings();
|
||||
} else {
|
||||
return Settings.builder().put(super.additionalSettings()).put(xpackInstalledNodeAttrSetting, "true").build();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<Module> createGuiceModules() {
|
||||
ArrayList<Module> modules = new ArrayList<>();
|
||||
|
|
|
@ -18,14 +18,16 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.watcher.ResourceWatcherService;
|
||||
import org.elasticsearch.xpack.core.XPackPlugin;
|
||||
import org.elasticsearch.xpack.core.watcher.watch.ClockMock;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.nio.file.Path;
|
||||
import java.util.Arrays;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.emptySet;
|
||||
import static java.util.Collections.singletonMap;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
|
@ -66,6 +68,7 @@ public abstract class AbstractLicenseServiceTestCase extends ESTestCase {
|
|||
when(state.metaData()).thenReturn(metaData);
|
||||
final DiscoveryNode mockNode = getLocalNode();
|
||||
when(discoveryNodes.getMasterNode()).thenReturn(mockNode);
|
||||
when(discoveryNodes.spliterator()).thenReturn(Arrays.asList(mockNode).spliterator());
|
||||
when(discoveryNodes.isLocalNodeElectedMaster()).thenReturn(false);
|
||||
when(state.nodes()).thenReturn(discoveryNodes);
|
||||
when(state.getNodes()).thenReturn(discoveryNodes); // it is really ridiculous we have nodes() and getNodes()...
|
||||
|
@ -76,7 +79,8 @@ public abstract class AbstractLicenseServiceTestCase extends ESTestCase {
|
|||
}
|
||||
|
||||
protected DiscoveryNode getLocalNode() {
|
||||
return new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
|
||||
return new DiscoveryNode("b", buildNewFakeTransportAddress(), singletonMap(XPackPlugin.XPACK_INSTALLED_NODE_ATTR, "true"),
|
||||
emptySet(), Version.CURRENT);
|
||||
}
|
||||
|
||||
@After
|
||||
|
|
|
@ -0,0 +1,99 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.license.XPackLicenseState;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.VersionUtils;
|
||||
import org.elasticsearch.xpack.core.security.authc.TokenMetaData;
|
||||
import org.elasticsearch.xpack.core.ssl.SSLService;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
|
||||
public class XPackPluginTests extends ESTestCase {
|
||||
|
||||
public void testXPackInstalledAttrClash() throws Exception {
|
||||
Settings.Builder builder = Settings.builder();
|
||||
builder.put("node.attr." + XPackPlugin.XPACK_INSTALLED_NODE_ATTR, randomBoolean());
|
||||
if (randomBoolean()) {
|
||||
builder.put(Client.CLIENT_TYPE_SETTING_S.getKey(), "transport");
|
||||
}
|
||||
XPackPlugin xpackPlugin = createXPackPlugin(builder.put("path.home", createTempDir()).build());
|
||||
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, xpackPlugin::additionalSettings);
|
||||
assertThat(e.getMessage(),
|
||||
containsString("Directly setting [node.attr." + XPackPlugin.XPACK_INSTALLED_NODE_ATTR + "] is not permitted"));
|
||||
}
|
||||
|
||||
public void testXPackInstalledAttrExists() throws Exception {
|
||||
XPackPlugin xpackPlugin = createXPackPlugin(Settings.builder().put("path.home", createTempDir()).build());
|
||||
assertEquals("true", xpackPlugin.additionalSettings().get("node.attr." + XPackPlugin.XPACK_INSTALLED_NODE_ATTR));
|
||||
}
|
||||
|
||||
public void testNodesNotReadyForXPackCustomMetadata() {
|
||||
boolean compatible;
|
||||
boolean nodesCompatible = true;
|
||||
DiscoveryNodes.Builder discoveryNodes = DiscoveryNodes.builder();
|
||||
|
||||
for (int i = 0; i < randomInt(3); i++) {
|
||||
final Version version = VersionUtils.randomVersion(random());
|
||||
final Map<String, String> attributes;
|
||||
if (randomBoolean() && version.onOrAfter(Version.V_6_3_0)) {
|
||||
attributes = Collections.singletonMap(XPackPlugin.XPACK_INSTALLED_NODE_ATTR, "true");
|
||||
} else {
|
||||
nodesCompatible = false;
|
||||
attributes = Collections.emptyMap();
|
||||
}
|
||||
|
||||
discoveryNodes.add(new DiscoveryNode("node_" + i, buildNewFakeTransportAddress(), attributes, Collections.emptySet(),
|
||||
Version.CURRENT));
|
||||
}
|
||||
ClusterState.Builder clusterStateBuilder = ClusterState.builder(ClusterName.DEFAULT);
|
||||
|
||||
if (randomBoolean()) {
|
||||
clusterStateBuilder.putCustom(TokenMetaData.TYPE, new TokenMetaData(Collections.emptyList(), new byte[0]));
|
||||
compatible = true;
|
||||
} else {
|
||||
compatible = nodesCompatible;
|
||||
}
|
||||
|
||||
ClusterState clusterState = clusterStateBuilder.nodes(discoveryNodes.build()).build();
|
||||
|
||||
assertEquals(XPackPlugin.nodesNotReadyForXPackCustomMetadata(clusterState).isEmpty(), nodesCompatible);
|
||||
assertEquals(XPackPlugin.isReadyForXPackCustomMetadata(clusterState), compatible);
|
||||
|
||||
if (compatible == false) {
|
||||
IllegalStateException e = expectThrows(IllegalStateException.class,
|
||||
() -> XPackPlugin.checkReadyForXPackCustomMetadata(clusterState));
|
||||
assertThat(e.getMessage(), containsString("The following nodes are not ready yet for enabling x-pack custom metadata:"));
|
||||
}
|
||||
}
|
||||
|
||||
private XPackPlugin createXPackPlugin(Settings settings) throws Exception {
|
||||
return new XPackPlugin(settings, null){
|
||||
|
||||
@Override
|
||||
protected void setSslService(SSLService sslService) {
|
||||
// disable
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setLicenseState(XPackLicenseState licenseState) {
|
||||
// disable
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
}
|
|
@ -21,6 +21,7 @@ import org.elasticsearch.common.inject.Inject;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.core.XPackPlugin;
|
||||
import org.elasticsearch.xpack.core.ml.MLMetadataField;
|
||||
import org.elasticsearch.xpack.core.ml.MlMetadata;
|
||||
import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction;
|
||||
|
@ -120,6 +121,7 @@ public class TransportDeleteDatafeedAction extends TransportMasterNodeAction<Del
|
|||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
XPackPlugin.checkReadyForXPackCustomMetadata(currentState);
|
||||
MlMetadata currentMetadata = MlMetadata.getMlMetadata(currentState);
|
||||
PersistentTasksCustomMetaData persistentTasks =
|
||||
currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
|
||||
|
|
|
@ -19,6 +19,7 @@ import org.elasticsearch.common.inject.Inject;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.core.XPackPlugin;
|
||||
import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction;
|
||||
import org.elasticsearch.xpack.core.ml.MLMetadataField;
|
||||
import org.elasticsearch.xpack.core.ml.MlMetadata;
|
||||
|
@ -57,6 +58,7 @@ public class TransportFinalizeJobExecutionAction extends TransportMasterNodeActi
|
|||
clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask() {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
XPackPlugin.checkReadyForXPackCustomMetadata(currentState);
|
||||
MlMetadata mlMetadata = MlMetadata.getMlMetadata(currentState);
|
||||
MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(mlMetadata);
|
||||
Date finishedTime = new Date();
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.elasticsearch.tasks.Task;
|
|||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.core.XPackField;
|
||||
import org.elasticsearch.xpack.core.XPackPlugin;
|
||||
import org.elasticsearch.xpack.core.XPackSettings;
|
||||
import org.elasticsearch.xpack.core.ml.MLMetadataField;
|
||||
import org.elasticsearch.xpack.core.ml.MlMetadata;
|
||||
|
@ -141,6 +142,7 @@ public class TransportPutDatafeedAction extends TransportMasterNodeAction<PutDat
|
|||
}
|
||||
|
||||
private ClusterState putDatafeed(PutDatafeedAction.Request request, ClusterState clusterState) {
|
||||
XPackPlugin.checkReadyForXPackCustomMetadata(clusterState);
|
||||
MlMetadata currentMetadata = MlMetadata.getMlMetadata(clusterState);
|
||||
MlMetadata newMetadata = new MlMetadata.Builder(currentMetadata)
|
||||
.putDatafeed(request.getDatafeed(), threadPool.getThreadContext()).build();
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.elasticsearch.common.xcontent.XContentFactory;
|
|||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.index.analysis.AnalysisRegistry;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
||||
import org.elasticsearch.xpack.core.XPackPlugin;
|
||||
import org.elasticsearch.xpack.core.ml.MLMetadataField;
|
||||
import org.elasticsearch.xpack.core.ml.MachineLearningField;
|
||||
import org.elasticsearch.xpack.core.ml.MlMetadata;
|
||||
|
@ -184,6 +185,9 @@ public class JobManager extends AbstractComponent {
|
|||
DEPRECATION_LOGGER.deprecated("Creating jobs with delimited data format is deprecated. Please use xcontent instead.");
|
||||
}
|
||||
|
||||
// pre-flight check, not necessarily required, but avoids figuring this out while on the CS update thread
|
||||
XPackPlugin.checkReadyForXPackCustomMetadata(state);
|
||||
|
||||
MlMetadata currentMlMetadata = MlMetadata.getMlMetadata(state);
|
||||
if (currentMlMetadata.getJobs().containsKey(job.getId())) {
|
||||
actionListener.onFailure(ExceptionsHelper.jobAlreadyExists(job.getId()));
|
||||
|
@ -565,6 +569,7 @@ public class JobManager extends AbstractComponent {
|
|||
}
|
||||
|
||||
private static ClusterState buildNewClusterState(ClusterState currentState, MlMetadata.Builder builder) {
|
||||
XPackPlugin.checkReadyForXPackCustomMetadata(currentState);
|
||||
ClusterState.Builder newState = ClusterState.builder(currentState);
|
||||
newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(MLMetadataField.TYPE, builder.build()).build());
|
||||
return newState.build();
|
||||
|
|
|
@ -37,13 +37,11 @@ public class MachineLearningTests extends ESTestCase {
|
|||
public void testNoAttributes_givenSameAndMlEnabled() {
|
||||
Settings.Builder builder = Settings.builder();
|
||||
if (randomBoolean()) {
|
||||
builder.put("xpack.ml.enabled", true);
|
||||
builder.put("node.attr.ml.enabled", true);
|
||||
builder.put("xpack.ml.enabled", randomBoolean());
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
int maxOpenJobs = randomIntBetween(5, 15);
|
||||
builder.put("xpack.ml.max_open_jobs", maxOpenJobs);
|
||||
builder.put("node.attr.ml.max_open_jobs", maxOpenJobs);
|
||||
}
|
||||
MachineLearning machineLearning = createMachineLearning(builder.put("path.home", createTempDir()).build());
|
||||
assertNotNull(machineLearning.additionalSettings());
|
||||
|
@ -51,16 +49,8 @@ public class MachineLearningTests extends ESTestCase {
|
|||
|
||||
public void testNoAttributes_givenClash() {
|
||||
Settings.Builder builder = Settings.builder();
|
||||
boolean enabled = true;
|
||||
if (randomBoolean()) {
|
||||
enabled = randomBoolean();
|
||||
builder.put("xpack.ml.enabled", enabled);
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
builder.put("xpack.ml.max_open_jobs", randomIntBetween(9, 12));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
builder.put("node.attr.ml.enabled", !enabled);
|
||||
builder.put("node.attr.ml.enabled", randomBoolean());
|
||||
} else {
|
||||
builder.put("node.attr.ml.max_open_jobs", randomIntBetween(13, 15));
|
||||
}
|
||||
|
|
|
@ -317,7 +317,7 @@ public class Security extends Plugin implements ActionPlugin, IngestPlugin, Netw
|
|||
}
|
||||
modules.add(b -> XPackPlugin.bindFeatureSet(b, SecurityFeatureSet.class));
|
||||
|
||||
|
||||
|
||||
if (enabled == false) {
|
||||
modules.add(b -> {
|
||||
b.bind(Realms.class).toProvider(Providers.of(null)); // for SecurityFeatureSet
|
||||
|
@ -903,15 +903,6 @@ public class Security extends Plugin implements ActionPlugin, IngestPlugin, Netw
|
|||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Supplier<ClusterState.Custom>> getInitialClusterStateCustomSupplier() {
|
||||
if (enabled) {
|
||||
return Collections.singletonMap(TokenMetaData.TYPE, () -> tokenService.get().getTokenMetaData());
|
||||
} else {
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Function<String, Predicate<String>> getFieldFilter() {
|
||||
if (enabled) {
|
||||
|
|
|
@ -8,6 +8,9 @@ package org.elasticsearch.xpack.security.authc;
|
|||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.BytesRefBuilder;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.apache.lucene.util.UnicodeUtil;
|
||||
import org.elasticsearch.ElasticsearchSecurityException;
|
||||
|
@ -63,6 +66,7 @@ import org.elasticsearch.index.query.BoolQueryBuilder;
|
|||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.xpack.core.XPackPlugin;
|
||||
import org.elasticsearch.xpack.core.XPackSettings;
|
||||
import org.elasticsearch.xpack.core.XPackField;
|
||||
import org.elasticsearch.xpack.core.security.ScrollHelper;
|
||||
|
@ -107,6 +111,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.Supplier;
|
||||
|
@ -1327,6 +1332,8 @@ public final class TokenService extends AbstractComponent {
|
|||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
XPackPlugin.checkReadyForXPackCustomMetadata(currentState);
|
||||
|
||||
if (tokenMetaData.equals(currentState.custom(TokenMetaData.TYPE))) {
|
||||
return currentState;
|
||||
}
|
||||
|
@ -1347,6 +1354,15 @@ public final class TokenService extends AbstractComponent {
|
|||
return;
|
||||
}
|
||||
|
||||
if (state.nodes().isLocalNodeElectedMaster()) {
|
||||
if (XPackPlugin.isReadyForXPackCustomMetadata(state)) {
|
||||
installTokenMetadata(state.metaData());
|
||||
} else {
|
||||
logger.debug("cannot add token metadata to cluster as the following nodes might not understand the metadata: {}",
|
||||
() -> XPackPlugin.nodesNotReadyForXPackCustomMetadata(state));
|
||||
}
|
||||
}
|
||||
|
||||
TokenMetaData custom = event.state().custom(TokenMetaData.TYPE);
|
||||
if (custom != null && custom.equals(getTokenMetaData()) == false) {
|
||||
logger.info("refresh keys");
|
||||
|
@ -1360,6 +1376,39 @@ public final class TokenService extends AbstractComponent {
|
|||
});
|
||||
}
|
||||
|
||||
// to prevent too many cluster state update tasks to be queued for doing the same update
|
||||
private final AtomicBoolean installTokenMetadataInProgress = new AtomicBoolean(false);
|
||||
|
||||
private void installTokenMetadata(MetaData metaData) {
|
||||
if (metaData.custom(TokenMetaData.TYPE) == null) {
|
||||
if (installTokenMetadataInProgress.compareAndSet(false, true)) {
|
||||
clusterService.submitStateUpdateTask("install-token-metadata", new ClusterStateUpdateTask(Priority.URGENT) {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
XPackPlugin.checkReadyForXPackCustomMetadata(currentState);
|
||||
|
||||
if (currentState.custom(TokenMetaData.TYPE) == null) {
|
||||
return ClusterState.builder(currentState).putCustom(TokenMetaData.TYPE, getTokenMetaData()).build();
|
||||
} else {
|
||||
return currentState;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
installTokenMetadataInProgress.set(false);
|
||||
logger.error("unable to install token metadata", e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
installTokenMetadataInProgress.set(false);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* For testing
|
||||
*/
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.core.XPackPlugin;
|
||||
import org.elasticsearch.xpack.core.watcher.WatcherMetaData;
|
||||
import org.elasticsearch.xpack.core.watcher.transport.actions.service.WatcherServiceAction;
|
||||
import org.elasticsearch.xpack.core.watcher.transport.actions.service.WatcherServiceRequest;
|
||||
|
@ -86,6 +87,8 @@ public class TransportWatcherServiceAction extends TransportMasterNodeAction<Wat
|
|||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState clusterState) {
|
||||
XPackPlugin.checkReadyForXPackCustomMetadata(clusterState);
|
||||
|
||||
WatcherMetaData newWatcherMetaData = new WatcherMetaData(manuallyStopped);
|
||||
WatcherMetaData currentMetaData = clusterState.metaData().custom(WatcherMetaData.TYPE);
|
||||
|
||||
|
|
Loading…
Reference in New Issue