diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ClusterClientIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ClusterClientIT.java index 2ae6f9dc186..2a870ec65ee 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ClusterClientIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ClusterClientIT.java @@ -129,7 +129,6 @@ public class ClusterClientIT extends ESRestHighLevelClientTestCase { createIndex("index2", Settings.EMPTY); ClusterHealthRequest request = new ClusterHealthRequest(); request.timeout("5s"); - request.level(ClusterHealthRequest.Level.CLUSTER); ClusterHealthResponse response = execute(request, highLevelClient().cluster()::health, highLevelClient().cluster()::healthAsync); assertYellowShards(response); @@ -170,6 +169,7 @@ public class ClusterClientIT extends ESRestHighLevelClientTestCase { createIndex("index", Settings.EMPTY); createIndex("index2", Settings.EMPTY); ClusterHealthRequest request = new ClusterHealthRequest("index"); + request.level(ClusterHealthRequest.Level.SHARDS); request.timeout("5s"); ClusterHealthResponse response = execute(request, highLevelClient().cluster()::health, highLevelClient().cluster()::healthAsync); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java index 5bc9bac96dc..aa8221f3099 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java @@ -1568,7 +1568,7 @@ public class RequestConvertersTests extends ESTestCase { healthRequest.level(level); expectedParams.put("level", level.name().toLowerCase(Locale.ROOT)); } else { - expectedParams.put("level", "shards"); + expectedParams.put("level", "cluster"); } if (randomBoolean()) { Priority priority = randomFrom(Priority.values()); diff --git a/docs/java-rest/high-level/cluster/health.asciidoc b/docs/java-rest/high-level/cluster/health.asciidoc index 6c0f926f15f..192880849e2 100644 --- a/docs/java-rest/high-level/cluster/health.asciidoc +++ b/docs/java-rest/high-level/cluster/health.asciidoc @@ -67,6 +67,7 @@ include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-request-wai include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-request-level] -------------------------------------------------- <1> The level of detail of the returned health information. Accepts a `ClusterHealthRequest.Level` value. +Default value is `cluster`. ["source","java",subs="attributes,callouts,macros"] -------------------------------------------------- diff --git a/docs/reference/migration/migrate_7_0/restclient.asciidoc b/docs/reference/migration/migrate_7_0/restclient.asciidoc index 146b0d73386..470996cfeff 100644 --- a/docs/reference/migration/migrate_7_0/restclient.asciidoc +++ b/docs/reference/migration/migrate_7_0/restclient.asciidoc @@ -10,4 +10,11 @@ header, e.g. `client.index(indexRequest)` becomes `client.index(indexRequest, RequestOptions.DEFAULT)`. In case you are specifying headers e.g. 
`client.index(indexRequest, new Header("name" "value"))` becomes -`client.index(indexRequest, RequestOptions.DEFAULT.toBuilder().addHeader("name", "value").build());` \ No newline at end of file +`client.index(indexRequest, RequestOptions.DEFAULT.toBuilder().addHeader("name", "value").build());` + +==== Cluster Health API default to `cluster` level + +The Cluster Health API used to default to `shards` level to ease migration +from transport client that doesn't support the `level` parameter and always +returns information including indices and shards details. The level default +value has been aligned with the Elasticsearch default level: `cluster`. \ No newline at end of file diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthRequest.java index 59a291888d0..0b9bcbf11b9 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthRequest.java @@ -48,9 +48,9 @@ public class ClusterHealthRequest extends MasterNodeReadRequest skippedSettings = new HashSet<>(); - indexScopedSettings.validate(normalizedSettings.filter(s -> Regex.isSimpleMatchPattern(s) == false /* don't validate wildcards */), - false); //don't validate dependencies here we check it below never allow to change the number of shards + indexScopedSettings.validate( + normalizedSettings.filter(s -> Regex.isSimpleMatchPattern(s) == false), // don't validate wildcards + false, // don't validate dependencies here we check it below never allow to change the number of shards + true); // validate internal index settings for (String key : normalizedSettings.keySet()) { Setting setting = indexScopedSettings.get(key); boolean isWildcard = setting == null && Regex.isSimpleMatchPattern(key); diff --git a/server/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java b/server/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java index e8bb946c8a7..eb4e2946424 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java @@ -282,6 +282,18 @@ public abstract class AbstractScopedSettings extends AbstractComponent { validate(settings, validateDependencies, false, false); } + /** + * Validates that all settings are registered and valid. + * + * @param settings the settings to validate + * @param validateDependencies true if dependent settings should be validated + * @param validateInternalIndex true if internal index settings should be validated + * @see Setting#getSettingsDependencies(String) + */ + public final void validate(final Settings settings, final boolean validateDependencies, final boolean validateInternalIndex) { + validate(settings, validateDependencies, false, false, validateInternalIndex); + } + /** * Validates that all settings are registered and valid. * @@ -296,6 +308,25 @@ public abstract class AbstractScopedSettings extends AbstractComponent { final boolean validateDependencies, final boolean ignorePrivateSettings, final boolean ignoreArchivedSettings) { + validate(settings, validateDependencies, ignorePrivateSettings, ignoreArchivedSettings, false); + } + + /** + * Validates that all settings are registered and valid. 
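A minimal sketch of what the new three-argument overload does for a setting marked `Setting.Property.InternalIndex` (the setting name mirrors the tests further down in this change; the usual `Settings`/`Collections` imports are assumed):

    final Setting<String> internalSetting =
            Setting.simpleString("index.internal", Setting.Property.IndexScope, Setting.Property.InternalIndex);
    final IndexScopedSettings indexScopedSettings =
            new IndexScopedSettings(Settings.EMPTY, Collections.singleton(internalSetting));
    final Settings update = Settings.builder().put("index.internal", "internal").build();

    // validateInternalIndex == false: the internal setting is accepted
    indexScopedSettings.validate(update, false, false);

    // validateInternalIndex == true (the update-settings API path): throws IllegalArgumentException
    // "can not update internal setting [index.internal]; this setting is managed via a dedicated API"
    indexScopedSettings.validate(update, false, true);
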
+ * + * @param settings the settings + * @param validateDependencies true if dependent settings should be validated + * @param ignorePrivateSettings true if private settings should be ignored during validation + * @param ignoreArchivedSettings true if archived settings should be ignored during validation + * @param validateInternalIndex true if index internal settings should be validated + * @see Setting#getSettingsDependencies(String) + */ + public final void validate( + final Settings settings, + final boolean validateDependencies, + final boolean ignorePrivateSettings, + final boolean ignoreArchivedSettings, + final boolean validateInternalIndex) { final List exceptions = new ArrayList<>(); for (final String key : settings.keySet()) { // settings iterate in deterministic fashion if (isPrivateSetting(key) && ignorePrivateSettings) { @@ -305,7 +336,7 @@ public abstract class AbstractScopedSettings extends AbstractComponent { continue; } try { - validate(key, settings, validateDependencies); + validate(key, settings, validateDependencies, validateInternalIndex); } catch (final RuntimeException ex) { exceptions.add(ex); } @@ -314,9 +345,27 @@ public abstract class AbstractScopedSettings extends AbstractComponent { } /** - * Validates that the setting is valid + * Validates that the settings is valid. + * + * @param key the key of the setting to validate + * @param settings the settings + * @param validateDependencies true if dependent settings should be validated + * @throws IllegalArgumentException if the setting is invalid */ - void validate(String key, Settings settings, boolean validateDependencies) { + void validate(final String key, final Settings settings, final boolean validateDependencies) { + validate(key, settings, validateDependencies, false); + } + + /** + * Validates that the settings is valid. + * + * @param key the key of the setting to validate + * @param settings the settings + * @param validateDependencies true if dependent settings should be validated + * @param validateInternalIndex true if internal index settings should be validated + * @throws IllegalArgumentException if the setting is invalid + */ + void validate(final String key, final Settings settings, final boolean validateDependencies, final boolean validateInternalIndex) { Setting setting = getRaw(key); if (setting == null) { LevensteinDistance ld = new LevensteinDistance(); @@ -356,6 +405,11 @@ public abstract class AbstractScopedSettings extends AbstractComponent { } } } + // the only time that validateInternalIndex should be true is if this call is coming via the update settings API + if (validateInternalIndex && setting.getProperties().contains(Setting.Property.InternalIndex)) { + throw new IllegalArgumentException( + "can not update internal setting [" + setting.getKey() + "]; this setting is managed via a dedicated API"); + } } setting.get(settings); } diff --git a/server/src/main/java/org/elasticsearch/common/settings/Setting.java b/server/src/main/java/org/elasticsearch/common/settings/Setting.java index f45f4bda9c9..7f3906ff5a2 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/Setting.java +++ b/server/src/main/java/org/elasticsearch/common/settings/Setting.java @@ -120,7 +120,13 @@ public class Setting implements ToXContentObject { * Mark this setting as not copyable during an index resize (shrink or split). This property can only be applied to settings that * also have {@link Property#IndexScope}. 
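The same index-scope requirement applies to the new `InternalIndex` property introduced just below; a quick sketch of the constraint enforced by `checkPropertyRequiresIndexScope` (the error text is taken from the SettingTests added in this change):

    // Valid: InternalIndex is combined with IndexScope.
    Setting.simpleString("index.internal", Setting.Property.IndexScope, Setting.Property.InternalIndex);

    // Invalid: throws IllegalArgumentException
    // "non-index-scoped setting [foo.bar] can not have property [InternalIndex]"
    Setting.simpleString("foo.bar", Setting.Property.InternalIndex);
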
*/ - NotCopyableOnResize + NotCopyableOnResize, + + /** + * Indicates an index-level setting that is managed internally. Such a setting can only be added to an index on index creation but + * can not be updated via the update API. + */ + InternalIndex } private final Key key; @@ -152,14 +158,18 @@ public class Setting implements ToXContentObject { if (propertiesAsSet.contains(Property.Dynamic) && propertiesAsSet.contains(Property.Final)) { throw new IllegalArgumentException("final setting [" + key + "] cannot be dynamic"); } - if (propertiesAsSet.contains(Property.NotCopyableOnResize) && propertiesAsSet.contains(Property.IndexScope) == false) { - throw new IllegalArgumentException( - "non-index-scoped setting [" + key + "] can not have property [" + Property.NotCopyableOnResize + "]"); - } + checkPropertyRequiresIndexScope(propertiesAsSet, Property.NotCopyableOnResize); + checkPropertyRequiresIndexScope(propertiesAsSet, Property.InternalIndex); this.properties = propertiesAsSet; } } + private void checkPropertyRequiresIndexScope(final EnumSet properties, final Property property) { + if (properties.contains(property) && properties.contains(Property.IndexScope) == false) { + throw new IllegalArgumentException("non-index-scoped setting [" + key + "] can not have property [" + property + "]"); + } + } + /** * Creates a new Setting instance * @param key the settings key for this setting. diff --git a/server/src/test/java/org/elasticsearch/common/settings/ScopedSettingsTests.java b/server/src/test/java/org/elasticsearch/common/settings/ScopedSettingsTests.java index f00768651f9..2376d566340 100644 --- a/server/src/test/java/org/elasticsearch/common/settings/ScopedSettingsTests.java +++ b/server/src/test/java/org/elasticsearch/common/settings/ScopedSettingsTests.java @@ -876,4 +876,28 @@ public class ScopedSettingsTests extends ESTestCase { Settings.builder().put(currentSettings), Settings.builder(), "node")); assertThat(exc.getMessage(), containsString("final node setting [some.final.group.foo]")); } + + public void testInternalIndexSettingsFailsValidation() { + final Setting indexInternalSetting = Setting.simpleString("index.internal", Property.InternalIndex, Property.IndexScope); + final IndexScopedSettings indexScopedSettings = + new IndexScopedSettings(Settings.EMPTY, Collections.singleton(indexInternalSetting)); + final IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> { + final Settings settings = Settings.builder().put("index.internal", "internal").build(); + indexScopedSettings.validate(settings, false, /* validateInternalIndex */ true); + }); + final String message = "can not update internal setting [index.internal]; this setting is managed via a dedicated API"; + assertThat(e, hasToString(containsString(message))); + } + + public void testInternalIndexSettingsSkipValidation() { + final Setting internalIndexSetting = Setting.simpleString("index.internal", Property.InternalIndex, Property.IndexScope); + final IndexScopedSettings indexScopedSettings = + new IndexScopedSettings(Settings.EMPTY, Collections.singleton(internalIndexSetting)); + // nothing should happen, validation should not throw an exception + final Settings settings = Settings.builder().put("index.internal", "internal").build(); + indexScopedSettings.validate(settings, false, /* validateInternalIndex */ false); + } + } diff --git a/server/src/test/java/org/elasticsearch/common/settings/SettingTests.java b/server/src/test/java/org/elasticsearch/common/settings/SettingTests.java index 
1ab92526e31..c32037f4452 100644 --- a/server/src/test/java/org/elasticsearch/common/settings/SettingTests.java +++ b/server/src/test/java/org/elasticsearch/common/settings/SettingTests.java @@ -735,6 +735,13 @@ public class SettingTests extends ESTestCase { assertThat(e, hasToString(containsString("non-index-scoped setting [foo.bar] can not have property [NotCopyableOnResize]"))); } + public void testRejectNonIndexScopedIndexInternalSetting() { + final IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> Setting.simpleString("foo.bar", Property.InternalIndex)); + assertThat(e, hasToString(containsString("non-index-scoped setting [foo.bar] can not have property [InternalIndex]"))); + } + public void testTimeValue() { final TimeValue random = TimeValue.parseTimeValue(randomTimeValue(), "test"); diff --git a/server/src/test/java/org/elasticsearch/indices/settings/UpdateSettingsIT.java b/server/src/test/java/org/elasticsearch/indices/settings/UpdateSettingsIT.java index 51c073c607e..8093e7d38a1 100644 --- a/server/src/test/java/org/elasticsearch/indices/settings/UpdateSettingsIT.java +++ b/server/src/test/java/org/elasticsearch/indices/settings/UpdateSettingsIT.java @@ -19,19 +19,40 @@ package org.elasticsearch.indices.settings; +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.MasterNodeRequest; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -46,6 +67,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBloc import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.nullValue; public class 
UpdateSettingsIT extends ESIntegTestCase { @@ -79,7 +101,12 @@ public class UpdateSettingsIT extends ESIntegTestCase { @Override protected Collection> nodePlugins() { - return Arrays.asList(DummySettingPlugin.class, FinalSettingPlugin.class); + return Arrays.asList(DummySettingPlugin.class, FinalSettingPlugin.class, InternalIndexSettingsPlugin.class); + } + + @Override + protected Collection> transportClientPlugins() { + return Collections.singletonList(InternalIndexSettingsPlugin.class); } public static class DummySettingPlugin extends Plugin { @@ -124,6 +151,151 @@ public class UpdateSettingsIT extends ESIntegTestCase { } } + public static class InternalIndexSettingsPlugin extends Plugin implements ActionPlugin { + + public static final Setting INDEX_INTERNAL_SETTING = + Setting.simpleString("index.internal", Setting.Property.IndexScope, Setting.Property.InternalIndex); + + @Override + public List> getSettings() { + return Collections.singletonList(INDEX_INTERNAL_SETTING); + } + + public static class UpdateInternalIndexAction + extends Action { + + private static final UpdateInternalIndexAction INSTANCE = new UpdateInternalIndexAction(); + private static final String NAME = "indices:admin/settings/update-internal-index"; + + public UpdateInternalIndexAction() { + super(NAME); + } + + static class Request extends MasterNodeRequest { + + private String index; + private String value; + + Request() { + + } + + Request(final String index, final String value) { + this.index = index; + this.value = value; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void readFrom(final StreamInput in) throws IOException { + super.readFrom(in); + index = in.readString(); + value = in.readString(); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(index); + out.writeString(value); + } + + } + + static class Response extends ActionResponse { + + } + + @Override + public Response newResponse() { + return new Response(); + } + + } + + public static class TransportUpdateInternalIndexAction + extends TransportMasterNodeAction { + + @Inject + public TransportUpdateInternalIndexAction( + final Settings settings, + final TransportService transportService, + final ClusterService clusterService, + final ThreadPool threadPool, + final ActionFilters actionFilters, + final IndexNameExpressionResolver indexNameExpressionResolver) { + super( + settings, + UpdateInternalIndexAction.NAME, + transportService, + clusterService, + threadPool, + actionFilters, + indexNameExpressionResolver, + UpdateInternalIndexAction.Request::new); + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected UpdateInternalIndexAction.Response newResponse() { + return new UpdateInternalIndexAction.Response(); + } + + @Override + protected void masterOperation( + final UpdateInternalIndexAction.Request request, + final ClusterState state, + final ActionListener listener) throws Exception { + clusterService.submitStateUpdateTask("update-index-internal", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(final ClusterState currentState) throws Exception { + final MetaData.Builder builder = MetaData.builder(currentState.metaData()); + final IndexMetaData.Builder imdBuilder = IndexMetaData.builder(currentState.metaData().index(request.index)); + final Settings.Builder settingsBuilder = + Settings.builder() + 
.put(currentState.metaData().index(request.index).getSettings()) + .put("index.internal", request.value); + imdBuilder.settings(settingsBuilder); + builder.put(imdBuilder.build(), true); + return ClusterState.builder(currentState).metaData(builder).build(); + } + + @Override + public void clusterStateProcessed(final String source, final ClusterState oldState, final ClusterState newState) { + listener.onResponse(new UpdateInternalIndexAction.Response()); + } + + @Override + public void onFailure(final String source, final Exception e) { + listener.onFailure(e); + } + + }); + } + + @Override + protected ClusterBlockException checkBlock(UpdateInternalIndexAction.Request request, ClusterState state) { + return null; + } + + } + + @Override + public List> getActions() { + return Collections.singletonList( + new ActionHandler<>(UpdateInternalIndexAction.INSTANCE, TransportUpdateInternalIndexAction.class)); + } + + } + public void testUpdateDependentClusterSettings() { IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () -> client().admin().cluster().prepareUpdateSettings().setPersistentSettings(Settings.builder() @@ -474,4 +646,35 @@ public class UpdateSettingsIT extends ESIntegTestCase { } } + public void testUpdateInternalIndexSettingViaSettingsAPI() { + final Settings settings = Settings.builder().put("index.internal", "internal").build(); + createIndex("test", settings); + final GetSettingsResponse response = client().admin().indices().prepareGetSettings("test").get(); + assertThat(response.getSetting("test", "index.internal"), equalTo("internal")); + // we can not update the setting via the update settings API + final IllegalArgumentException e = expectThrows(IllegalArgumentException.class, + () -> client().admin() + .indices() + .prepareUpdateSettings("test") + .setSettings(Settings.builder().put("index.internal", "internal-update")) + .get()); + final String message = "can not update internal setting [index.internal]; this setting is managed via a dedicated API"; + assertThat(e, hasToString(containsString(message))); + final GetSettingsResponse responseAfterAttemptedUpdate = client().admin().indices().prepareGetSettings("test").get(); + assertThat(responseAfterAttemptedUpdate.getSetting("test", "index.internal"), equalTo("internal")); + } + + public void testUpdateInternalIndexSettingViaDedicatedAPI() { + final Settings settings = Settings.builder().put("index.internal", "internal").build(); + createIndex("test", settings); + final GetSettingsResponse response = client().admin().indices().prepareGetSettings("test").get(); + assertThat(response.getSetting("test", "index.internal"), equalTo("internal")); + client().execute( + InternalIndexSettingsPlugin.UpdateInternalIndexAction.INSTANCE, + new InternalIndexSettingsPlugin.UpdateInternalIndexAction.Request("test", "internal-update")) + .actionGet(); + final GetSettingsResponse responseAfterUpdate = client().admin().indices().prepareGetSettings("test").get(); + assertThat(responseAfterUpdate.getSetting("test", "index.internal"), equalTo("internal-update")); + } + } diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/Features.java b/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/Features.java index bb885d3f181..757fc2218d5 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/Features.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/Features.java @@ -19,8 +19,6 @@ package org.elasticsearch.test.rest.yaml; -import 
org.elasticsearch.test.ESIntegTestCase; - import java.util.Arrays; import java.util.List; @@ -56,9 +54,6 @@ public final class Features { */ public static boolean areAllSupported(List features) { for (String feature : features) { - if ("requires_replica".equals(feature) && ESIntegTestCase.cluster().numDataNodes() >= 2) { - continue; - } if (!SUPPORTED.contains(feature)) { return false; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/MetricConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/MetricConfig.java index f26c67935ed..67b83646c42 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/MetricConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/MetricConfig.java @@ -12,7 +12,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.mapper.NumberFieldMapper; @@ -75,12 +75,11 @@ public class MetricConfig implements Writeable, ToXContentFragment { MAPPER_TYPES = types; } - public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( - NAME, a -> new MetricConfig((String)a[0], (List) a[1])); + public static final ObjectParser PARSER = new ObjectParser<>(NAME, MetricConfig.Builder::new); static { - PARSER.declareString(ConstructingObjectParser.constructorArg(), FIELD); - PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), METRICS); + PARSER.declareString(MetricConfig.Builder::setField, FIELD); + PARSER.declareStringArray(MetricConfig.Builder::setMetrics, METRICS); } MetricConfig(String name, List metrics) { @@ -257,4 +256,4 @@ public class MetricConfig implements Writeable, ToXContentFragment { return new MetricConfig(field, metrics); } } -} \ No newline at end of file +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/RollupJobConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/RollupJobConfig.java index 3818ebcf447..422ecdd5fd9 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/RollupJobConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/RollupJobConfig.java @@ -63,7 +63,7 @@ public class RollupJobConfig implements NamedWriteable, ToXContentObject { static { PARSER.declareString(RollupJobConfig.Builder::setId, RollupField.ID); PARSER.declareObject(RollupJobConfig.Builder::setGroupConfig, (p, c) -> GroupConfig.PARSER.apply(p,c).build(), GROUPS); - PARSER.declareObjectArray(RollupJobConfig.Builder::setMetricsConfig, MetricConfig.PARSER, METRICS); + PARSER.declareObjectArray(RollupJobConfig.Builder::setMetricsConfig, (p, c) -> MetricConfig.PARSER.apply(p, c).build(), METRICS); PARSER.declareString((params, val) -> params.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT); PARSER.declareString(RollupJobConfig.Builder::setIndexPattern, INDEX_PATTERN); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java 
b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java index 6aa987fc0e9..d59ef16dfdf 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java @@ -40,7 +40,6 @@ import org.joda.time.DateTimeZone; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.TimeZone; @@ -193,11 +192,11 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase conf.setIndices(null)); } - public void testCheckValid_GivenEmptyIndices() throws IOException { + public void testCheckValid_GivenEmptyIndices() { DatafeedConfig.Builder conf = new DatafeedConfig.Builder("datafeed1", "job1"); conf.setIndices(Collections.emptyList()); ElasticsearchException e = ESTestCase.expectThrows(ElasticsearchException.class, conf::build); assertEquals(Messages.getMessage(Messages.DATAFEED_CONFIG_INVALID_OPTION_VALUE, "indices", "[]"), e.getMessage()); } - public void testCheckValid_GivenIndicesContainsOnlyNulls() throws IOException { + public void testCheckValid_GivenIndicesContainsOnlyNulls() { List indices = new ArrayList<>(); indices.add(null); indices.add(null); @@ -230,7 +229,7 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase indices = new ArrayList<>(); indices.add(""); indices.add(""); @@ -240,27 +239,27 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase conf.setQueryDelay(TimeValue.timeValueMillis(-10))); assertEquals("query_delay cannot be less than 0. Value = -10", e.getMessage()); } - public void testCheckValid_GivenZeroFrequency() throws IOException { + public void testCheckValid_GivenZeroFrequency() { DatafeedConfig.Builder conf = new DatafeedConfig.Builder("datafeed1", "job1"); IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class, () -> conf.setFrequency(TimeValue.ZERO)); assertEquals("frequency cannot be less or equal than 0. Value = 0s", e.getMessage()); } - public void testCheckValid_GivenNegativeFrequency() throws IOException { + public void testCheckValid_GivenNegativeFrequency() { DatafeedConfig.Builder conf = new DatafeedConfig.Builder("datafeed1", "job1"); IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class, () -> conf.setFrequency(TimeValue.timeValueMinutes(-1))); assertEquals("frequency cannot be less or equal than 0. 
Value = -1", e.getMessage()); } - public void testCheckValid_GivenNegativeScrollSize() throws IOException { + public void testCheckValid_GivenNegativeScrollSize() { DatafeedConfig.Builder conf = new DatafeedConfig.Builder("datafeed1", "job1"); ElasticsearchException e = ESTestCase.expectThrows(ElasticsearchException.class, () -> conf.setScrollSize(-1000)); assertEquals(Messages.getMessage(Messages.DATAFEED_CONFIG_INVALID_OPTION_VALUE, "scroll_size", -1000L), e.getMessage()); @@ -414,7 +413,7 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase { @Override protected MetricConfig doParseInstance(XContentParser parser) throws IOException { - return MetricConfig.PARSER.apply(parser, null); + return MetricConfig.PARSER.apply(parser, null).build(); } @Override @@ -36,7 +36,7 @@ public class MetricsConfigSerializingTests extends AbstractSerializingTestCase> responseMap = new HashMap<>(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index bed83ed82c1..3d261864ab4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -43,10 +43,12 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksExecutor; import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.datafeed.MlRemoteLicenseChecker; import org.elasticsearch.xpack.ml.datafeed.DatafeedManager; import org.elasticsearch.xpack.ml.datafeed.DatafeedNodeSelector; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; +import java.util.List; import java.util.Map; import java.util.function.Predicate; @@ -111,23 +113,25 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction listener) { StartDatafeedAction.DatafeedParams params = request.getParams(); if (licenseState.isMachineLearningAllowed()) { - ActionListener> finalListener = - new ActionListener>() { - @Override - public void onResponse(PersistentTasksCustomMetaData.PersistentTask persistentTask) { - waitForDatafeedStarted(persistentTask.getId(), params, listener); - } - @Override - public void onFailure(Exception e) { - if (e instanceof ResourceAlreadyExistsException) { - logger.debug("datafeed already started", e); - e = new ElasticsearchStatusException("cannot start datafeed [" + params.getDatafeedId() + - "] because it has already been started", RestStatus.CONFLICT); - } - listener.onFailure(e); - } - }; + ActionListener> waitForTaskListener = + new ActionListener>() { + @Override + public void onResponse(PersistentTasksCustomMetaData.PersistentTask + persistentTask) { + waitForDatafeedStarted(persistentTask.getId(), params, listener); + } + + @Override + public void onFailure(Exception e) { + if (e instanceof ResourceAlreadyExistsException) { + logger.debug("datafeed already started", e); + e = new ElasticsearchStatusException("cannot start datafeed [" + params.getDatafeedId() + + "] because it has already been started", RestStatus.CONFLICT); + } + listener.onFailure(e); + } + }; // Verify data extractor factory can be created, then start persistent task MlMetadata mlMetadata = MlMetadata.getMlMetadata(state); @@ -135,16 +139,39 @@ public class 
TransportStartDatafeedAction extends TransportMasterNodeAction - persistentTasksService.sendStartRequest(MLMetadataField.datafeedTaskId(params.getDatafeedId()), - StartDatafeedAction.TASK_NAME, params, finalListener) - , listener::onFailure)); + + if (MlRemoteLicenseChecker.containsRemoteIndex(datafeed.getIndices())) { + MlRemoteLicenseChecker remoteLicenseChecker = new MlRemoteLicenseChecker(client); + remoteLicenseChecker.checkRemoteClusterLicenses(MlRemoteLicenseChecker.remoteClusterNames(datafeed.getIndices()), + ActionListener.wrap( + response -> { + if (response.isViolated()) { + listener.onFailure(createUnlicensedError(datafeed.getId(), response)); + } else { + createDataExtractor(job, datafeed, params, waitForTaskListener); + } + }, + e -> listener.onFailure(createUnknownLicenseError(datafeed.getId(), + MlRemoteLicenseChecker.remoteIndices(datafeed.getIndices()), e)) + )); + } else { + createDataExtractor(job, datafeed, params, waitForTaskListener); + } } else { listener.onFailure(LicenseUtils.newComplianceException(XPackField.MACHINE_LEARNING)); } } + private void createDataExtractor(Job job, DatafeedConfig datafeed, StartDatafeedAction.DatafeedParams params, + ActionListener> + listener) { + DataExtractorFactory.create(client, datafeed, job, ActionListener.wrap( + dataExtractorFactory -> + persistentTasksService.sendStartRequest(MLMetadataField.datafeedTaskId(params.getDatafeedId()), + StartDatafeedAction.TASK_NAME, params, listener) + , listener::onFailure)); + } + @Override protected ClusterBlockException checkBlock(StartDatafeedAction.Request request, ClusterState state) { // We only delegate here to PersistentTasksService, but if there is a metadata writeblock, @@ -158,28 +185,29 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction() { - @Override - public void onResponse(PersistentTasksCustomMetaData.PersistentTask persistentTask) { - if (predicate.exception != null) { - // We want to return to the caller without leaving an unassigned persistent task, to match - // what would have happened if the error had been detected in the "fast fail" validation - cancelDatafeedStart(persistentTask, predicate.exception, listener); - } else { - listener.onResponse(new StartDatafeedAction.Response(true)); - } - } + @Override + public void onResponse(PersistentTasksCustomMetaData.PersistentTask + persistentTask) { + if (predicate.exception != null) { + // We want to return to the caller without leaving an unassigned persistent task, to match + // what would have happened if the error had been detected in the "fast fail" validation + cancelDatafeedStart(persistentTask, predicate.exception, listener); + } else { + listener.onResponse(new StartDatafeedAction.Response(true)); + } + } - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } - @Override - public void onTimeout(TimeValue timeout) { - listener.onFailure(new ElasticsearchException("Starting datafeed [" - + params.getDatafeedId() + "] timed out after [" + timeout + "]")); - } - }); + @Override + public void onTimeout(TimeValue timeout) { + listener.onFailure(new ElasticsearchException("Starting datafeed [" + + params.getDatafeedId() + "] timed out after [" + timeout + "]")); + } + }); } private void cancelDatafeedStart(PersistentTasksCustomMetaData.PersistentTask persistentTask, @@ -203,6 +231,25 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction remoteIndices, + Exception cause) 
{ + String message = "Cannot start datafeed [" + datafeedId + "] as it is configured to use" + + " indices on a remote cluster " + remoteIndices + + " but the license type could not be verified"; + + return new ElasticsearchStatusException(message, RestStatus.BAD_REQUEST, new Exception(cause.getMessage())); + } + public static class StartDatafeedPersistentTasksExecutor extends PersistentTasksExecutor { private final DatafeedManager datafeedManager; private final IndexNameExpressionResolver resolver; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java index 37f9715d094..0eb57ab79be 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java @@ -91,7 +91,7 @@ public class DatafeedNodeSelector { List indices = datafeed.getIndices(); for (String index : indices) { - if (isRemoteIndex(index)) { + if (MlRemoteLicenseChecker.isRemoteIndex(index)) { // We cannot verify remote indices continue; } @@ -122,10 +122,6 @@ public class DatafeedNodeSelector { return null; } - private boolean isRemoteIndex(String index) { - return index.indexOf(':') != -1; - } - private static class AssignmentFailure { private final String reason; private final boolean isCriticalForTaskCreation; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseChecker.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseChecker.java new file mode 100644 index 00000000000..b55713f6d0a --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseChecker.java @@ -0,0 +1,192 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.datafeed; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.license.License; +import org.elasticsearch.license.XPackInfoResponse; +import org.elasticsearch.transport.ActionNotFoundTransportException; +import org.elasticsearch.transport.RemoteClusterAware; +import org.elasticsearch.xpack.core.action.XPackInfoAction; +import org.elasticsearch.xpack.core.action.XPackInfoRequest; + +import java.util.EnumSet; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +/** + * ML datafeeds can use cross cluster search to access data in a remote cluster. + * The remote cluster should be licenced for ML this class performs that check + * using the _xpack (info) endpoint. 
+ */ +public class MlRemoteLicenseChecker { + + private final Client client; + + public static class RemoteClusterLicenseInfo { + private final String clusterName; + private final XPackInfoResponse.LicenseInfo licenseInfo; + + RemoteClusterLicenseInfo(String clusterName, XPackInfoResponse.LicenseInfo licenseInfo) { + this.clusterName = clusterName; + this.licenseInfo = licenseInfo; + } + + public String getClusterName() { + return clusterName; + } + + public XPackInfoResponse.LicenseInfo getLicenseInfo() { + return licenseInfo; + } + } + + public class LicenseViolation { + private final RemoteClusterLicenseInfo licenseInfo; + + private LicenseViolation(@Nullable RemoteClusterLicenseInfo licenseInfo) { + this.licenseInfo = licenseInfo; + } + + public boolean isViolated() { + return licenseInfo != null; + } + + public RemoteClusterLicenseInfo get() { + return licenseInfo; + } + } + + public MlRemoteLicenseChecker(Client client) { + this.client = client; + } + + /** + * Check each cluster is licensed for ML. + * This function evaluates lazily and will terminate when the first cluster + * that is not licensed is found or an error occurs. + * + * @param clusterNames List of remote cluster names + * @param listener Response listener + */ + public void checkRemoteClusterLicenses(List clusterNames, ActionListener listener) { + final Iterator itr = clusterNames.iterator(); + if (itr.hasNext() == false) { + listener.onResponse(new LicenseViolation(null)); + return; + } + + final AtomicReference clusterName = new AtomicReference<>(itr.next()); + + ActionListener infoListener = new ActionListener() { + @Override + public void onResponse(XPackInfoResponse xPackInfoResponse) { + if (licenseSupportsML(xPackInfoResponse.getLicenseInfo()) == false) { + listener.onResponse(new LicenseViolation( + new RemoteClusterLicenseInfo(clusterName.get(), xPackInfoResponse.getLicenseInfo()))); + return; + } + + if (itr.hasNext()) { + clusterName.set(itr.next()); + remoteClusterLicense(clusterName.get(), this); + } else { + listener.onResponse(new LicenseViolation(null)); + } + } + + @Override + public void onFailure(Exception e) { + String message = "Could not determine the X-Pack licence type for cluster [" + clusterName.get() + "]"; + if (e instanceof ActionNotFoundTransportException) { + // This is likely to be because x-pack is not installed in the target cluster + message += ". Is X-Pack installed on the target cluster?"; + } + listener.onFailure(new ElasticsearchException(message, e)); + } + }; + + remoteClusterLicense(clusterName.get(), infoListener); + } + + private void remoteClusterLicense(String clusterName, ActionListener listener) { + Client remoteClusterClient = client.getRemoteClusterClient(clusterName); + ThreadContext threadContext = remoteClusterClient.threadPool().getThreadContext(); + try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { + // we stash any context here since this is an internal execution and should not leak any + // existing context information. 
+ threadContext.markAsSystemContext(); + + XPackInfoRequest request = new XPackInfoRequest(); + request.setCategories(EnumSet.of(XPackInfoRequest.Category.LICENSE)); + remoteClusterClient.execute(XPackInfoAction.INSTANCE, request, listener); + } + } + + static boolean licenseSupportsML(XPackInfoResponse.LicenseInfo licenseInfo) { + License.OperationMode mode = License.OperationMode.resolve(licenseInfo.getMode()); + return licenseInfo.getStatus() == License.Status.ACTIVE && + (mode == License.OperationMode.PLATINUM || mode == License.OperationMode.TRIAL); + } + + public static boolean isRemoteIndex(String index) { + return index.indexOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR) != -1; + } + + public static boolean containsRemoteIndex(List indices) { + return indices.stream().anyMatch(MlRemoteLicenseChecker::isRemoteIndex); + } + + /** + * Get any remote indices used in cross cluster search. + * Remote indices are of the form {@code cluster_name:index_name} + * @return List of remote cluster indices + */ + public static List remoteIndices(List indices) { + return indices.stream().filter(MlRemoteLicenseChecker::isRemoteIndex).collect(Collectors.toList()); + } + + /** + * Extract the list of remote cluster names from the list of indices. + * @param indices List of indices. Remote cluster indices are prefixed + * with {@code cluster-name:} + * @return Every cluster name found in {@code indices} + */ + public static List remoteClusterNames(List indices) { + return indices.stream() + .filter(MlRemoteLicenseChecker::isRemoteIndex) + .map(index -> index.substring(0, index.indexOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR))) + .distinct() + .collect(Collectors.toList()); + } + + public static String buildErrorMessage(RemoteClusterLicenseInfo clusterLicenseInfo) { + StringBuilder error = new StringBuilder(); + if (clusterLicenseInfo.licenseInfo.getStatus() != License.Status.ACTIVE) { + error.append("The license on cluster [").append(clusterLicenseInfo.clusterName) + .append("] is not active. "); + } else { + License.OperationMode mode = License.OperationMode.resolve(clusterLicenseInfo.licenseInfo.getMode()); + if (mode != License.OperationMode.PLATINUM && mode != License.OperationMode.TRIAL) { + error.append("The license mode [").append(mode) + .append("] on cluster [") + .append(clusterLicenseInfo.clusterName) + .append("] does not enable Machine Learning. 
"); + } + } + + error.append(Strings.toString(clusterLicenseInfo.licenseInfo)); + return error.toString(); + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java index 049880b1ac2..21be815d561 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java @@ -117,7 +117,7 @@ public interface AutodetectProcess extends Closeable { /** * Ask the job to start persisting model state in the background - * @throws IOException + * @throws IOException If writing the request fails */ void persistJob() throws IOException; diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseCheckerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseCheckerTests.java new file mode 100644 index 00000000000..47d4d30a7c6 --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseCheckerTests.java @@ -0,0 +1,200 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.datafeed; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.license.License; +import org.elasticsearch.license.XPackInfoResponse; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.action.XPackInfoAction; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.is; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class MlRemoteLicenseCheckerTests extends ESTestCase { + + public void testIsRemoteIndex() { + List indices = Arrays.asList("local-index1", "local-index2"); + assertFalse(MlRemoteLicenseChecker.containsRemoteIndex(indices)); + indices = Arrays.asList("local-index1", "remote-cluster:remote-index2"); + assertTrue(MlRemoteLicenseChecker.containsRemoteIndex(indices)); + } + + public void testRemoteIndices() { + List indices = Collections.singletonList("local-index"); + assertThat(MlRemoteLicenseChecker.remoteIndices(indices), is(empty())); + indices = Arrays.asList("local-index", "remote-cluster:index1", "local-index2", "remote-cluster2:index1"); + 
assertThat(MlRemoteLicenseChecker.remoteIndices(indices), containsInAnyOrder("remote-cluster:index1", "remote-cluster2:index1")); + } + + public void testRemoteClusterNames() { + List indices = Arrays.asList("local-index1", "local-index2"); + assertThat(MlRemoteLicenseChecker.remoteClusterNames(indices), empty()); + indices = Arrays.asList("local-index1", "remote-cluster1:remote-index2"); + assertThat(MlRemoteLicenseChecker.remoteClusterNames(indices), contains("remote-cluster1")); + indices = Arrays.asList("remote-cluster1:index2", "index1", "remote-cluster2:index1"); + assertThat(MlRemoteLicenseChecker.remoteClusterNames(indices), contains("remote-cluster1", "remote-cluster2")); + indices = Arrays.asList("remote-cluster1:index2", "index1", "remote-cluster2:index1", "remote-cluster2:index2"); + assertThat(MlRemoteLicenseChecker.remoteClusterNames(indices), contains("remote-cluster1", "remote-cluster2")); + } + + public void testLicenseSupportsML() { + XPackInfoResponse.LicenseInfo licenseInfo = new XPackInfoResponse.LicenseInfo("uid", "trial", "trial", + License.Status.ACTIVE, randomNonNegativeLong()); + assertTrue(MlRemoteLicenseChecker.licenseSupportsML(licenseInfo)); + + licenseInfo = new XPackInfoResponse.LicenseInfo("uid", "trial", "trial", License.Status.EXPIRED, randomNonNegativeLong()); + assertFalse(MlRemoteLicenseChecker.licenseSupportsML(licenseInfo)); + + licenseInfo = new XPackInfoResponse.LicenseInfo("uid", "GOLD", "GOLD", License.Status.ACTIVE, randomNonNegativeLong()); + assertFalse(MlRemoteLicenseChecker.licenseSupportsML(licenseInfo)); + + licenseInfo = new XPackInfoResponse.LicenseInfo("uid", "PLATINUM", "PLATINUM", License.Status.ACTIVE, randomNonNegativeLong()); + assertTrue(MlRemoteLicenseChecker.licenseSupportsML(licenseInfo)); + } + + public void testCheckRemoteClusterLicenses_givenValidLicenses() { + final AtomicInteger index = new AtomicInteger(0); + final List responses = new ArrayList<>(); + + Client client = createMockClient(); + doAnswer(invocationMock -> { + @SuppressWarnings("raw_types") + ActionListener listener = (ActionListener) invocationMock.getArguments()[2]; + listener.onResponse(responses.get(index.getAndIncrement())); + return null; + }).when(client).execute(same(XPackInfoAction.INSTANCE), any(), any()); + + + List remoteClusterNames = Arrays.asList("valid1", "valid2", "valid3"); + responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); + responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); + responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); + + MlRemoteLicenseChecker licenseChecker = new MlRemoteLicenseChecker(client); + AtomicReference licCheckResponse = new AtomicReference<>(); + + licenseChecker.checkRemoteClusterLicenses(remoteClusterNames, + new ActionListener() { + @Override + public void onResponse(MlRemoteLicenseChecker.LicenseViolation response) { + licCheckResponse.set(response); + } + + @Override + public void onFailure(Exception e) { + fail(e.getMessage()); + } + }); + + verify(client, times(3)).execute(same(XPackInfoAction.INSTANCE), any(), any()); + assertNotNull(licCheckResponse.get()); + assertFalse(licCheckResponse.get().isViolated()); + assertNull(licCheckResponse.get().get()); + } + + public void testCheckRemoteClusterLicenses_givenInvalidLicense() { + final AtomicInteger index = new AtomicInteger(0); + List remoteClusterNames = Arrays.asList("good", "cluster-with-basic-license", "good2"); + final List responses = new ArrayList<>(); + 
responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); + responses.add(new XPackInfoResponse(null, createBasicLicenseResponse(), null)); + responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); + + Client client = createMockClient(); + doAnswer(invocationMock -> { + @SuppressWarnings("raw_types") + ActionListener listener = (ActionListener) invocationMock.getArguments()[2]; + listener.onResponse(responses.get(index.getAndIncrement())); + return null; + }).when(client).execute(same(XPackInfoAction.INSTANCE), any(), any()); + + MlRemoteLicenseChecker licenseChecker = new MlRemoteLicenseChecker(client); + AtomicReference licCheckResponse = new AtomicReference<>(); + + licenseChecker.checkRemoteClusterLicenses(remoteClusterNames, + new ActionListener() { + @Override + public void onResponse(MlRemoteLicenseChecker.LicenseViolation response) { + licCheckResponse.set(response); + } + + @Override + public void onFailure(Exception e) { + fail(e.getMessage()); + } + }); + + verify(client, times(2)).execute(same(XPackInfoAction.INSTANCE), any(), any()); + assertNotNull(licCheckResponse.get()); + assertTrue(licCheckResponse.get().isViolated()); + assertEquals("cluster-with-basic-license", licCheckResponse.get().get().getClusterName()); + assertEquals("BASIC", licCheckResponse.get().get().getLicenseInfo().getType()); + } + + public void testBuildErrorMessage() { + XPackInfoResponse.LicenseInfo platinumLicence = createPlatinumLicenseResponse(); + MlRemoteLicenseChecker.RemoteClusterLicenseInfo info = + new MlRemoteLicenseChecker.RemoteClusterLicenseInfo("platinum-cluster", platinumLicence); + assertEquals(Strings.toString(platinumLicence), MlRemoteLicenseChecker.buildErrorMessage(info)); + + XPackInfoResponse.LicenseInfo basicLicense = createBasicLicenseResponse(); + info = new MlRemoteLicenseChecker.RemoteClusterLicenseInfo("basic-cluster", basicLicense); + String expected = "The license mode [BASIC] on cluster [basic-cluster] does not enable Machine Learning. " + + Strings.toString(basicLicense); + assertEquals(expected, MlRemoteLicenseChecker.buildErrorMessage(info)); + + XPackInfoResponse.LicenseInfo expiredLicense = createExpiredLicenseResponse(); + info = new MlRemoteLicenseChecker.RemoteClusterLicenseInfo("expired-cluster", expiredLicense); + expected = "The license on cluster [expired-cluster] is not active. 
" + Strings.toString(expiredLicense); + assertEquals(expected, MlRemoteLicenseChecker.buildErrorMessage(info)); + } + + private Client createMockClient() { + Client client = mock(Client.class); + ThreadPool threadPool = mock(ThreadPool.class); + when(client.threadPool()).thenReturn(threadPool); + when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); + when(client.getRemoteClusterClient(anyString())).thenReturn(client); + return client; + } + + private XPackInfoResponse.LicenseInfo createPlatinumLicenseResponse() { + return new XPackInfoResponse.LicenseInfo("uid", "PLATINUM", "PLATINUM", License.Status.ACTIVE, randomNonNegativeLong()); + } + + private XPackInfoResponse.LicenseInfo createBasicLicenseResponse() { + return new XPackInfoResponse.LicenseInfo("uid", "BASIC", "BASIC", License.Status.ACTIVE, randomNonNegativeLong()); + } + + private XPackInfoResponse.LicenseInfo createExpiredLicenseResponse() { + return new XPackInfoResponse.LicenseInfo("uid", "PLATINUM", "PLATINUM", License.Status.EXPIRED, randomNonNegativeLong()); + } +} diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/put_job.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/put_job.yml index 717be0d6b25..98ef9b32e3d 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/put_job.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/put_job.yml @@ -188,3 +188,33 @@ setup: ] } +--- +"Unknown Metric": + + - do: + catch: /Unsupported metric \[does_not_exist\]/ + headers: + Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser + xpack.rollup.put_job: + id: foo + body: > + { + "index_pattern": "foo", + "rollup_index": "foo_rollup", + "cron": "*/30 * * * * ?", + "page_size" :10, + "groups" : { + "date_histogram": { + "field": "the_field", + "interval": "1h" + } + }, + "metrics": [ + { + "field": "value_field", + "metrics": ["min", "max", "sum", "does_not_exist"] + } + ] + } + +