From 24163d10b7639ea8f66ec5fbf52a80ab90b0c356 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Wed, 13 Jun 2018 15:06:13 +0200 Subject: [PATCH 1/5] REST hl client: cluster health to default to cluster level (#31268) With #29331 we added support for the cluster health API to the high-level REST client. The transport client does not support the level parameter, and it always returns all the info needed for shards level rendering. We have maintained that behaviour when adding support for cluster health to the high-level REST client, to ease migration, but the correct thing to do is to default the high-level REST client to `cluster` level, which is the same default as when going through the Elasticsearch REST layer. --- .../java/org/elasticsearch/client/ClusterClientIT.java | 2 +- .../org/elasticsearch/client/RequestConvertersTests.java | 2 +- docs/java-rest/high-level/cluster/health.asciidoc | 1 + docs/reference/migration/migrate_7_0/restclient.asciidoc | 9 ++++++++- .../admin/cluster/health/ClusterHealthRequest.java | 8 +++----- 5 files changed, 14 insertions(+), 8 deletions(-) diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ClusterClientIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ClusterClientIT.java index 2ae6f9dc186..2a870ec65ee 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ClusterClientIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ClusterClientIT.java @@ -129,7 +129,6 @@ public class ClusterClientIT extends ESRestHighLevelClientTestCase { createIndex("index2", Settings.EMPTY); ClusterHealthRequest request = new ClusterHealthRequest(); request.timeout("5s"); - request.level(ClusterHealthRequest.Level.CLUSTER); ClusterHealthResponse response = execute(request, highLevelClient().cluster()::health, highLevelClient().cluster()::healthAsync); assertYellowShards(response); @@ -170,6 +169,7 @@ public class ClusterClientIT extends ESRestHighLevelClientTestCase { createIndex("index", Settings.EMPTY); createIndex("index2", Settings.EMPTY); ClusterHealthRequest request = new ClusterHealthRequest("index"); + request.level(ClusterHealthRequest.Level.SHARDS); request.timeout("5s"); ClusterHealthResponse response = execute(request, highLevelClient().cluster()::health, highLevelClient().cluster()::healthAsync); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java index 5bc9bac96dc..aa8221f3099 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java @@ -1568,7 +1568,7 @@ public class RequestConvertersTests extends ESTestCase { healthRequest.level(level); expectedParams.put("level", level.name().toLowerCase(Locale.ROOT)); } else { - expectedParams.put("level", "shards"); + expectedParams.put("level", "cluster"); } if (randomBoolean()) { Priority priority = randomFrom(Priority.values()); diff --git a/docs/java-rest/high-level/cluster/health.asciidoc b/docs/java-rest/high-level/cluster/health.asciidoc index 6c0f926f15f..192880849e2 100644 --- a/docs/java-rest/high-level/cluster/health.asciidoc +++ b/docs/java-rest/high-level/cluster/health.asciidoc @@ -67,6 +67,7 @@ include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-request-wai include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-request-level] -------------------------------------------------- <1> The level of detail of the returned health information. Accepts a `ClusterHealthRequest.Level` value. +Default value is `cluster`. ["source","java",subs="attributes,callouts,macros"] -------------------------------------------------- diff --git a/docs/reference/migration/migrate_7_0/restclient.asciidoc b/docs/reference/migration/migrate_7_0/restclient.asciidoc index 146b0d73386..470996cfeff 100644 --- a/docs/reference/migration/migrate_7_0/restclient.asciidoc +++ b/docs/reference/migration/migrate_7_0/restclient.asciidoc @@ -10,4 +10,11 @@ header, e.g. `client.index(indexRequest)` becomes `client.index(indexRequest, RequestOptions.DEFAULT)`. In case you are specifying headers e.g. `client.index(indexRequest, new Header("name" "value"))` becomes -`client.index(indexRequest, RequestOptions.DEFAULT.toBuilder().addHeader("name", "value").build());` \ No newline at end of file +`client.index(indexRequest, RequestOptions.DEFAULT.toBuilder().addHeader("name", "value").build());` + +==== Cluster Health API default to `cluster` level + +The Cluster Health API used to default to `shards` level to ease migration +from transport client that doesn't support the `level` parameter and always +returns information including indices and shards details. The level default +value has been aligned with the Elasticsearch default level: `cluster`. \ No newline at end of file diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthRequest.java index 59a291888d0..0b9bcbf11b9 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthRequest.java @@ -48,9 +48,9 @@ public class ClusterHealthRequest extends MasterNodeReadRequest Date: Wed, 13 Jun 2018 09:33:06 -0400 Subject: [PATCH 2/5] Test: Remove broken yml test feature (#31255) The `requires_replica` yaml test feature hasn't worked for years. This is what happens if you try to use it: ``` > Throwable #1: java.lang.NullPointerException > at __randomizedtesting.SeedInfo.seed([E6602FB306244B12:6E341069A8D826EA]:0) > at org.elasticsearch.test.rest.yaml.Features.areAllSupported(Features.java:58) > at org.elasticsearch.test.rest.yaml.section.SkipSection.skip(SkipSection.java:144) > at org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase.test(ESClientYamlSuiteTestCase.java:321) ``` None of our tests use it. --- .../main/java/org/elasticsearch/test/rest/yaml/Features.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/Features.java b/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/Features.java index bb885d3f181..757fc2218d5 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/Features.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/Features.java @@ -19,8 +19,6 @@ package org.elasticsearch.test.rest.yaml; -import org.elasticsearch.test.ESIntegTestCase; - import java.util.Arrays; import java.util.List; @@ -56,9 +54,6 @@ public final class Features { */ public static boolean areAllSupported(List features) { for (String feature : features) { - if ("requires_replica".equals(feature) && ESIntegTestCase.cluster().numDataNodes() >= 2) { - continue; - } if (!SUPPORTED.contains(feature)) { return false; } From 7199d5f0e680182bdabba80c9976c6b74f00bedd Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 13 Jun 2018 10:16:46 -0400 Subject: [PATCH 3/5] Add notion of internal index settings (#31286) We have some use cases for an index setting to only be manageable by dedicated APIs rather than be updateable via the update settings API. This commit adds the notion of an internal index setting. Such settings can be set on create index requests, they can not be changed via the update settings API, yet they can be changed by action on behalf of or triggered by the user via dedicated APIs. --- .../MetaDataUpdateSettingsService.java | 6 +- .../settings/AbstractScopedSettings.java | 60 ++++- .../common/settings/Setting.java | 20 +- .../common/settings/ScopedSettingsTests.java | 24 ++ .../common/settings/SettingTests.java | 7 + .../indices/settings/UpdateSettingsIT.java | 205 +++++++++++++++++- 6 files changed, 311 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java index ce5ad12a53d..38766c08e08 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java @@ -82,8 +82,10 @@ public class MetaDataUpdateSettingsService extends AbstractComponent { Settings.Builder settingsForOpenIndices = Settings.builder(); final Set skippedSettings = new HashSet<>(); - indexScopedSettings.validate(normalizedSettings.filter(s -> Regex.isSimpleMatchPattern(s) == false /* don't validate wildcards */), - false); //don't validate dependencies here we check it below never allow to change the number of shards + indexScopedSettings.validate( + normalizedSettings.filter(s -> Regex.isSimpleMatchPattern(s) == false), // don't validate wildcards + false, // don't validate dependencies here we check it below never allow to change the number of shards + true); // validate internal index settings for (String key : normalizedSettings.keySet()) { Setting setting = indexScopedSettings.get(key); boolean isWildcard = setting == null && Regex.isSimpleMatchPattern(key); diff --git a/server/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java b/server/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java index e8bb946c8a7..eb4e2946424 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java @@ -282,6 +282,18 @@ public abstract class AbstractScopedSettings extends AbstractComponent { validate(settings, validateDependencies, false, false); } + /** + * Validates that all settings are registered and valid. + * + * @param settings the settings to validate + * @param validateDependencies true if dependent settings should be validated + * @param validateInternalIndex true if internal index settings should be validated + * @see Setting#getSettingsDependencies(String) + */ + public final void validate(final Settings settings, final boolean validateDependencies, final boolean validateInternalIndex) { + validate(settings, validateDependencies, false, false, validateInternalIndex); + } + /** * Validates that all settings are registered and valid. * @@ -296,6 +308,25 @@ public abstract class AbstractScopedSettings extends AbstractComponent { final boolean validateDependencies, final boolean ignorePrivateSettings, final boolean ignoreArchivedSettings) { + validate(settings, validateDependencies, ignorePrivateSettings, ignoreArchivedSettings, false); + } + + /** + * Validates that all settings are registered and valid. + * + * @param settings the settings + * @param validateDependencies true if dependent settings should be validated + * @param ignorePrivateSettings true if private settings should be ignored during validation + * @param ignoreArchivedSettings true if archived settings should be ignored during validation + * @param validateInternalIndex true if index internal settings should be validated + * @see Setting#getSettingsDependencies(String) + */ + public final void validate( + final Settings settings, + final boolean validateDependencies, + final boolean ignorePrivateSettings, + final boolean ignoreArchivedSettings, + final boolean validateInternalIndex) { final List exceptions = new ArrayList<>(); for (final String key : settings.keySet()) { // settings iterate in deterministic fashion if (isPrivateSetting(key) && ignorePrivateSettings) { @@ -305,7 +336,7 @@ public abstract class AbstractScopedSettings extends AbstractComponent { continue; } try { - validate(key, settings, validateDependencies); + validate(key, settings, validateDependencies, validateInternalIndex); } catch (final RuntimeException ex) { exceptions.add(ex); } @@ -314,9 +345,27 @@ public abstract class AbstractScopedSettings extends AbstractComponent { } /** - * Validates that the setting is valid + * Validates that the settings is valid. + * + * @param key the key of the setting to validate + * @param settings the settings + * @param validateDependencies true if dependent settings should be validated + * @throws IllegalArgumentException if the setting is invalid */ - void validate(String key, Settings settings, boolean validateDependencies) { + void validate(final String key, final Settings settings, final boolean validateDependencies) { + validate(key, settings, validateDependencies, false); + } + + /** + * Validates that the settings is valid. + * + * @param key the key of the setting to validate + * @param settings the settings + * @param validateDependencies true if dependent settings should be validated + * @param validateInternalIndex true if internal index settings should be validated + * @throws IllegalArgumentException if the setting is invalid + */ + void validate(final String key, final Settings settings, final boolean validateDependencies, final boolean validateInternalIndex) { Setting setting = getRaw(key); if (setting == null) { LevensteinDistance ld = new LevensteinDistance(); @@ -356,6 +405,11 @@ public abstract class AbstractScopedSettings extends AbstractComponent { } } } + // the only time that validateInternalIndex should be true is if this call is coming via the update settings API + if (validateInternalIndex && setting.getProperties().contains(Setting.Property.InternalIndex)) { + throw new IllegalArgumentException( + "can not update internal setting [" + setting.getKey() + "]; this setting is managed via a dedicated API"); + } } setting.get(settings); } diff --git a/server/src/main/java/org/elasticsearch/common/settings/Setting.java b/server/src/main/java/org/elasticsearch/common/settings/Setting.java index f45f4bda9c9..7f3906ff5a2 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/Setting.java +++ b/server/src/main/java/org/elasticsearch/common/settings/Setting.java @@ -120,7 +120,13 @@ public class Setting implements ToXContentObject { * Mark this setting as not copyable during an index resize (shrink or split). This property can only be applied to settings that * also have {@link Property#IndexScope}. */ - NotCopyableOnResize + NotCopyableOnResize, + + /** + * Indicates an index-level setting that is managed internally. Such a setting can only be added to an index on index creation but + * can not be updated via the update API. + */ + InternalIndex } private final Key key; @@ -152,14 +158,18 @@ public class Setting implements ToXContentObject { if (propertiesAsSet.contains(Property.Dynamic) && propertiesAsSet.contains(Property.Final)) { throw new IllegalArgumentException("final setting [" + key + "] cannot be dynamic"); } - if (propertiesAsSet.contains(Property.NotCopyableOnResize) && propertiesAsSet.contains(Property.IndexScope) == false) { - throw new IllegalArgumentException( - "non-index-scoped setting [" + key + "] can not have property [" + Property.NotCopyableOnResize + "]"); - } + checkPropertyRequiresIndexScope(propertiesAsSet, Property.NotCopyableOnResize); + checkPropertyRequiresIndexScope(propertiesAsSet, Property.InternalIndex); this.properties = propertiesAsSet; } } + private void checkPropertyRequiresIndexScope(final EnumSet properties, final Property property) { + if (properties.contains(property) && properties.contains(Property.IndexScope) == false) { + throw new IllegalArgumentException("non-index-scoped setting [" + key + "] can not have property [" + property + "]"); + } + } + /** * Creates a new Setting instance * @param key the settings key for this setting. diff --git a/server/src/test/java/org/elasticsearch/common/settings/ScopedSettingsTests.java b/server/src/test/java/org/elasticsearch/common/settings/ScopedSettingsTests.java index f00768651f9..2376d566340 100644 --- a/server/src/test/java/org/elasticsearch/common/settings/ScopedSettingsTests.java +++ b/server/src/test/java/org/elasticsearch/common/settings/ScopedSettingsTests.java @@ -876,4 +876,28 @@ public class ScopedSettingsTests extends ESTestCase { Settings.builder().put(currentSettings), Settings.builder(), "node")); assertThat(exc.getMessage(), containsString("final node setting [some.final.group.foo]")); } + + public void testInternalIndexSettingsFailsValidation() { + final Setting indexInternalSetting = Setting.simpleString("index.internal", Property.InternalIndex, Property.IndexScope); + final IndexScopedSettings indexScopedSettings = + new IndexScopedSettings(Settings.EMPTY, Collections.singleton(indexInternalSetting)); + final IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> { + final Settings settings = Settings.builder().put("index.internal", "internal").build(); + indexScopedSettings.validate(settings, false, /* validateInternalIndex */ true); + }); + final String message = "can not update internal setting [index.internal]; this setting is managed via a dedicated API"; + assertThat(e, hasToString(containsString(message))); + } + + public void testInternalIndexSettingsSkipValidation() { + final Setting internalIndexSetting = Setting.simpleString("index.internal", Property.InternalIndex, Property.IndexScope); + final IndexScopedSettings indexScopedSettings = + new IndexScopedSettings(Settings.EMPTY, Collections.singleton(internalIndexSetting)); + // nothing should happen, validation should not throw an exception + final Settings settings = Settings.builder().put("index.internal", "internal").build(); + indexScopedSettings.validate(settings, false, /* validateInternalIndex */ false); + } + } diff --git a/server/src/test/java/org/elasticsearch/common/settings/SettingTests.java b/server/src/test/java/org/elasticsearch/common/settings/SettingTests.java index 1ab92526e31..c32037f4452 100644 --- a/server/src/test/java/org/elasticsearch/common/settings/SettingTests.java +++ b/server/src/test/java/org/elasticsearch/common/settings/SettingTests.java @@ -735,6 +735,13 @@ public class SettingTests extends ESTestCase { assertThat(e, hasToString(containsString("non-index-scoped setting [foo.bar] can not have property [NotCopyableOnResize]"))); } + public void testRejectNonIndexScopedIndexInternalSetting() { + final IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> Setting.simpleString("foo.bar", Property.InternalIndex)); + assertThat(e, hasToString(containsString("non-index-scoped setting [foo.bar] can not have property [InternalIndex]"))); + } + public void testTimeValue() { final TimeValue random = TimeValue.parseTimeValue(randomTimeValue(), "test"); diff --git a/server/src/test/java/org/elasticsearch/indices/settings/UpdateSettingsIT.java b/server/src/test/java/org/elasticsearch/indices/settings/UpdateSettingsIT.java index 51c073c607e..8093e7d38a1 100644 --- a/server/src/test/java/org/elasticsearch/indices/settings/UpdateSettingsIT.java +++ b/server/src/test/java/org/elasticsearch/indices/settings/UpdateSettingsIT.java @@ -19,19 +19,40 @@ package org.elasticsearch.indices.settings; +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.MasterNodeRequest; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -46,6 +67,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBloc import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.nullValue; public class UpdateSettingsIT extends ESIntegTestCase { @@ -79,7 +101,12 @@ public class UpdateSettingsIT extends ESIntegTestCase { @Override protected Collection> nodePlugins() { - return Arrays.asList(DummySettingPlugin.class, FinalSettingPlugin.class); + return Arrays.asList(DummySettingPlugin.class, FinalSettingPlugin.class, InternalIndexSettingsPlugin.class); + } + + @Override + protected Collection> transportClientPlugins() { + return Collections.singletonList(InternalIndexSettingsPlugin.class); } public static class DummySettingPlugin extends Plugin { @@ -124,6 +151,151 @@ public class UpdateSettingsIT extends ESIntegTestCase { } } + public static class InternalIndexSettingsPlugin extends Plugin implements ActionPlugin { + + public static final Setting INDEX_INTERNAL_SETTING = + Setting.simpleString("index.internal", Setting.Property.IndexScope, Setting.Property.InternalIndex); + + @Override + public List> getSettings() { + return Collections.singletonList(INDEX_INTERNAL_SETTING); + } + + public static class UpdateInternalIndexAction + extends Action { + + private static final UpdateInternalIndexAction INSTANCE = new UpdateInternalIndexAction(); + private static final String NAME = "indices:admin/settings/update-internal-index"; + + public UpdateInternalIndexAction() { + super(NAME); + } + + static class Request extends MasterNodeRequest { + + private String index; + private String value; + + Request() { + + } + + Request(final String index, final String value) { + this.index = index; + this.value = value; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void readFrom(final StreamInput in) throws IOException { + super.readFrom(in); + index = in.readString(); + value = in.readString(); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(index); + out.writeString(value); + } + + } + + static class Response extends ActionResponse { + + } + + @Override + public Response newResponse() { + return new Response(); + } + + } + + public static class TransportUpdateInternalIndexAction + extends TransportMasterNodeAction { + + @Inject + public TransportUpdateInternalIndexAction( + final Settings settings, + final TransportService transportService, + final ClusterService clusterService, + final ThreadPool threadPool, + final ActionFilters actionFilters, + final IndexNameExpressionResolver indexNameExpressionResolver) { + super( + settings, + UpdateInternalIndexAction.NAME, + transportService, + clusterService, + threadPool, + actionFilters, + indexNameExpressionResolver, + UpdateInternalIndexAction.Request::new); + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected UpdateInternalIndexAction.Response newResponse() { + return new UpdateInternalIndexAction.Response(); + } + + @Override + protected void masterOperation( + final UpdateInternalIndexAction.Request request, + final ClusterState state, + final ActionListener listener) throws Exception { + clusterService.submitStateUpdateTask("update-index-internal", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(final ClusterState currentState) throws Exception { + final MetaData.Builder builder = MetaData.builder(currentState.metaData()); + final IndexMetaData.Builder imdBuilder = IndexMetaData.builder(currentState.metaData().index(request.index)); + final Settings.Builder settingsBuilder = + Settings.builder() + .put(currentState.metaData().index(request.index).getSettings()) + .put("index.internal", request.value); + imdBuilder.settings(settingsBuilder); + builder.put(imdBuilder.build(), true); + return ClusterState.builder(currentState).metaData(builder).build(); + } + + @Override + public void clusterStateProcessed(final String source, final ClusterState oldState, final ClusterState newState) { + listener.onResponse(new UpdateInternalIndexAction.Response()); + } + + @Override + public void onFailure(final String source, final Exception e) { + listener.onFailure(e); + } + + }); + } + + @Override + protected ClusterBlockException checkBlock(UpdateInternalIndexAction.Request request, ClusterState state) { + return null; + } + + } + + @Override + public List> getActions() { + return Collections.singletonList( + new ActionHandler<>(UpdateInternalIndexAction.INSTANCE, TransportUpdateInternalIndexAction.class)); + } + + } + public void testUpdateDependentClusterSettings() { IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () -> client().admin().cluster().prepareUpdateSettings().setPersistentSettings(Settings.builder() @@ -474,4 +646,35 @@ public class UpdateSettingsIT extends ESIntegTestCase { } } + public void testUpdateInternalIndexSettingViaSettingsAPI() { + final Settings settings = Settings.builder().put("index.internal", "internal").build(); + createIndex("test", settings); + final GetSettingsResponse response = client().admin().indices().prepareGetSettings("test").get(); + assertThat(response.getSetting("test", "index.internal"), equalTo("internal")); + // we can not update the setting via the update settings API + final IllegalArgumentException e = expectThrows(IllegalArgumentException.class, + () -> client().admin() + .indices() + .prepareUpdateSettings("test") + .setSettings(Settings.builder().put("index.internal", "internal-update")) + .get()); + final String message = "can not update internal setting [index.internal]; this setting is managed via a dedicated API"; + assertThat(e, hasToString(containsString(message))); + final GetSettingsResponse responseAfterAttemptedUpdate = client().admin().indices().prepareGetSettings("test").get(); + assertThat(responseAfterAttemptedUpdate.getSetting("test", "index.internal"), equalTo("internal")); + } + + public void testUpdateInternalIndexSettingViaDedicatedAPI() { + final Settings settings = Settings.builder().put("index.internal", "internal").build(); + createIndex("test", settings); + final GetSettingsResponse response = client().admin().indices().prepareGetSettings("test").get(); + assertThat(response.getSetting("test", "index.internal"), equalTo("internal")); + client().execute( + InternalIndexSettingsPlugin.UpdateInternalIndexAction.INSTANCE, + new InternalIndexSettingsPlugin.UpdateInternalIndexAction.Request("test", "internal-update")) + .actionGet(); + final GetSettingsResponse responseAfterUpdate = client().admin().indices().prepareGetSettings("test").get(); + assertThat(responseAfterUpdate.getSetting("test", "index.internal"), equalTo("internal-update")); + } + } From 88f44a9f66b61fb158b3f4605a57909c86037489 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Wed, 13 Jun 2018 15:42:18 +0100 Subject: [PATCH 4/5] [ML] Check licence when datafeeds use cross cluster search (#31247) This change prevents a datafeed using cross cluster search from starting if the remote cluster does not have x-pack installed and a sufficient license. The check is made only when starting a datafeed. --- .../core/ml/datafeed/DatafeedConfigTests.java | 25 ++- .../action/TransportStartDatafeedAction.java | 129 +++++++---- .../ml/datafeed/DatafeedNodeSelector.java | 6 +- .../ml/datafeed/MlRemoteLicenseChecker.java | 192 +++++++++++++++++ .../process/autodetect/AutodetectProcess.java | 2 +- .../datafeed/MlRemoteLicenseCheckerTests.java | 200 ++++++++++++++++++ 6 files changed, 494 insertions(+), 60 deletions(-) create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseChecker.java create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseCheckerTests.java diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java index 6aa987fc0e9..d59ef16dfdf 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java @@ -40,7 +40,6 @@ import org.joda.time.DateTimeZone; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.TimeZone; @@ -193,11 +192,11 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase conf.setIndices(null)); } - public void testCheckValid_GivenEmptyIndices() throws IOException { + public void testCheckValid_GivenEmptyIndices() { DatafeedConfig.Builder conf = new DatafeedConfig.Builder("datafeed1", "job1"); conf.setIndices(Collections.emptyList()); ElasticsearchException e = ESTestCase.expectThrows(ElasticsearchException.class, conf::build); assertEquals(Messages.getMessage(Messages.DATAFEED_CONFIG_INVALID_OPTION_VALUE, "indices", "[]"), e.getMessage()); } - public void testCheckValid_GivenIndicesContainsOnlyNulls() throws IOException { + public void testCheckValid_GivenIndicesContainsOnlyNulls() { List indices = new ArrayList<>(); indices.add(null); indices.add(null); @@ -230,7 +229,7 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase indices = new ArrayList<>(); indices.add(""); indices.add(""); @@ -240,27 +239,27 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase conf.setQueryDelay(TimeValue.timeValueMillis(-10))); assertEquals("query_delay cannot be less than 0. Value = -10", e.getMessage()); } - public void testCheckValid_GivenZeroFrequency() throws IOException { + public void testCheckValid_GivenZeroFrequency() { DatafeedConfig.Builder conf = new DatafeedConfig.Builder("datafeed1", "job1"); IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class, () -> conf.setFrequency(TimeValue.ZERO)); assertEquals("frequency cannot be less or equal than 0. Value = 0s", e.getMessage()); } - public void testCheckValid_GivenNegativeFrequency() throws IOException { + public void testCheckValid_GivenNegativeFrequency() { DatafeedConfig.Builder conf = new DatafeedConfig.Builder("datafeed1", "job1"); IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class, () -> conf.setFrequency(TimeValue.timeValueMinutes(-1))); assertEquals("frequency cannot be less or equal than 0. Value = -1", e.getMessage()); } - public void testCheckValid_GivenNegativeScrollSize() throws IOException { + public void testCheckValid_GivenNegativeScrollSize() { DatafeedConfig.Builder conf = new DatafeedConfig.Builder("datafeed1", "job1"); ElasticsearchException e = ESTestCase.expectThrows(ElasticsearchException.class, () -> conf.setScrollSize(-1000)); assertEquals(Messages.getMessage(Messages.DATAFEED_CONFIG_INVALID_OPTION_VALUE, "scroll_size", -1000L), e.getMessage()); @@ -414,7 +413,7 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase listener) { StartDatafeedAction.DatafeedParams params = request.getParams(); if (licenseState.isMachineLearningAllowed()) { - ActionListener> finalListener = - new ActionListener>() { - @Override - public void onResponse(PersistentTasksCustomMetaData.PersistentTask persistentTask) { - waitForDatafeedStarted(persistentTask.getId(), params, listener); - } - @Override - public void onFailure(Exception e) { - if (e instanceof ResourceAlreadyExistsException) { - logger.debug("datafeed already started", e); - e = new ElasticsearchStatusException("cannot start datafeed [" + params.getDatafeedId() + - "] because it has already been started", RestStatus.CONFLICT); - } - listener.onFailure(e); - } - }; + ActionListener> waitForTaskListener = + new ActionListener>() { + @Override + public void onResponse(PersistentTasksCustomMetaData.PersistentTask + persistentTask) { + waitForDatafeedStarted(persistentTask.getId(), params, listener); + } + + @Override + public void onFailure(Exception e) { + if (e instanceof ResourceAlreadyExistsException) { + logger.debug("datafeed already started", e); + e = new ElasticsearchStatusException("cannot start datafeed [" + params.getDatafeedId() + + "] because it has already been started", RestStatus.CONFLICT); + } + listener.onFailure(e); + } + }; // Verify data extractor factory can be created, then start persistent task MlMetadata mlMetadata = MlMetadata.getMlMetadata(state); @@ -135,16 +139,39 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction - persistentTasksService.sendStartRequest(MLMetadataField.datafeedTaskId(params.getDatafeedId()), - StartDatafeedAction.TASK_NAME, params, finalListener) - , listener::onFailure)); + + if (MlRemoteLicenseChecker.containsRemoteIndex(datafeed.getIndices())) { + MlRemoteLicenseChecker remoteLicenseChecker = new MlRemoteLicenseChecker(client); + remoteLicenseChecker.checkRemoteClusterLicenses(MlRemoteLicenseChecker.remoteClusterNames(datafeed.getIndices()), + ActionListener.wrap( + response -> { + if (response.isViolated()) { + listener.onFailure(createUnlicensedError(datafeed.getId(), response)); + } else { + createDataExtractor(job, datafeed, params, waitForTaskListener); + } + }, + e -> listener.onFailure(createUnknownLicenseError(datafeed.getId(), + MlRemoteLicenseChecker.remoteIndices(datafeed.getIndices()), e)) + )); + } else { + createDataExtractor(job, datafeed, params, waitForTaskListener); + } } else { listener.onFailure(LicenseUtils.newComplianceException(XPackField.MACHINE_LEARNING)); } } + private void createDataExtractor(Job job, DatafeedConfig datafeed, StartDatafeedAction.DatafeedParams params, + ActionListener> + listener) { + DataExtractorFactory.create(client, datafeed, job, ActionListener.wrap( + dataExtractorFactory -> + persistentTasksService.sendStartRequest(MLMetadataField.datafeedTaskId(params.getDatafeedId()), + StartDatafeedAction.TASK_NAME, params, listener) + , listener::onFailure)); + } + @Override protected ClusterBlockException checkBlock(StartDatafeedAction.Request request, ClusterState state) { // We only delegate here to PersistentTasksService, but if there is a metadata writeblock, @@ -158,28 +185,29 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction() { - @Override - public void onResponse(PersistentTasksCustomMetaData.PersistentTask persistentTask) { - if (predicate.exception != null) { - // We want to return to the caller without leaving an unassigned persistent task, to match - // what would have happened if the error had been detected in the "fast fail" validation - cancelDatafeedStart(persistentTask, predicate.exception, listener); - } else { - listener.onResponse(new StartDatafeedAction.Response(true)); - } - } + @Override + public void onResponse(PersistentTasksCustomMetaData.PersistentTask + persistentTask) { + if (predicate.exception != null) { + // We want to return to the caller without leaving an unassigned persistent task, to match + // what would have happened if the error had been detected in the "fast fail" validation + cancelDatafeedStart(persistentTask, predicate.exception, listener); + } else { + listener.onResponse(new StartDatafeedAction.Response(true)); + } + } - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } - @Override - public void onTimeout(TimeValue timeout) { - listener.onFailure(new ElasticsearchException("Starting datafeed [" - + params.getDatafeedId() + "] timed out after [" + timeout + "]")); - } - }); + @Override + public void onTimeout(TimeValue timeout) { + listener.onFailure(new ElasticsearchException("Starting datafeed [" + + params.getDatafeedId() + "] timed out after [" + timeout + "]")); + } + }); } private void cancelDatafeedStart(PersistentTasksCustomMetaData.PersistentTask persistentTask, @@ -203,6 +231,25 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction remoteIndices, + Exception cause) { + String message = "Cannot start datafeed [" + datafeedId + "] as it is configured to use" + + " indices on a remote cluster " + remoteIndices + + " but the license type could not be verified"; + + return new ElasticsearchStatusException(message, RestStatus.BAD_REQUEST, new Exception(cause.getMessage())); + } + public static class StartDatafeedPersistentTasksExecutor extends PersistentTasksExecutor { private final DatafeedManager datafeedManager; private final IndexNameExpressionResolver resolver; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java index 37f9715d094..0eb57ab79be 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java @@ -91,7 +91,7 @@ public class DatafeedNodeSelector { List indices = datafeed.getIndices(); for (String index : indices) { - if (isRemoteIndex(index)) { + if (MlRemoteLicenseChecker.isRemoteIndex(index)) { // We cannot verify remote indices continue; } @@ -122,10 +122,6 @@ public class DatafeedNodeSelector { return null; } - private boolean isRemoteIndex(String index) { - return index.indexOf(':') != -1; - } - private static class AssignmentFailure { private final String reason; private final boolean isCriticalForTaskCreation; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseChecker.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseChecker.java new file mode 100644 index 00000000000..b55713f6d0a --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseChecker.java @@ -0,0 +1,192 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.datafeed; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.license.License; +import org.elasticsearch.license.XPackInfoResponse; +import org.elasticsearch.transport.ActionNotFoundTransportException; +import org.elasticsearch.transport.RemoteClusterAware; +import org.elasticsearch.xpack.core.action.XPackInfoAction; +import org.elasticsearch.xpack.core.action.XPackInfoRequest; + +import java.util.EnumSet; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +/** + * ML datafeeds can use cross cluster search to access data in a remote cluster. + * The remote cluster should be licenced for ML this class performs that check + * using the _xpack (info) endpoint. + */ +public class MlRemoteLicenseChecker { + + private final Client client; + + public static class RemoteClusterLicenseInfo { + private final String clusterName; + private final XPackInfoResponse.LicenseInfo licenseInfo; + + RemoteClusterLicenseInfo(String clusterName, XPackInfoResponse.LicenseInfo licenseInfo) { + this.clusterName = clusterName; + this.licenseInfo = licenseInfo; + } + + public String getClusterName() { + return clusterName; + } + + public XPackInfoResponse.LicenseInfo getLicenseInfo() { + return licenseInfo; + } + } + + public class LicenseViolation { + private final RemoteClusterLicenseInfo licenseInfo; + + private LicenseViolation(@Nullable RemoteClusterLicenseInfo licenseInfo) { + this.licenseInfo = licenseInfo; + } + + public boolean isViolated() { + return licenseInfo != null; + } + + public RemoteClusterLicenseInfo get() { + return licenseInfo; + } + } + + public MlRemoteLicenseChecker(Client client) { + this.client = client; + } + + /** + * Check each cluster is licensed for ML. + * This function evaluates lazily and will terminate when the first cluster + * that is not licensed is found or an error occurs. + * + * @param clusterNames List of remote cluster names + * @param listener Response listener + */ + public void checkRemoteClusterLicenses(List clusterNames, ActionListener listener) { + final Iterator itr = clusterNames.iterator(); + if (itr.hasNext() == false) { + listener.onResponse(new LicenseViolation(null)); + return; + } + + final AtomicReference clusterName = new AtomicReference<>(itr.next()); + + ActionListener infoListener = new ActionListener() { + @Override + public void onResponse(XPackInfoResponse xPackInfoResponse) { + if (licenseSupportsML(xPackInfoResponse.getLicenseInfo()) == false) { + listener.onResponse(new LicenseViolation( + new RemoteClusterLicenseInfo(clusterName.get(), xPackInfoResponse.getLicenseInfo()))); + return; + } + + if (itr.hasNext()) { + clusterName.set(itr.next()); + remoteClusterLicense(clusterName.get(), this); + } else { + listener.onResponse(new LicenseViolation(null)); + } + } + + @Override + public void onFailure(Exception e) { + String message = "Could not determine the X-Pack licence type for cluster [" + clusterName.get() + "]"; + if (e instanceof ActionNotFoundTransportException) { + // This is likely to be because x-pack is not installed in the target cluster + message += ". Is X-Pack installed on the target cluster?"; + } + listener.onFailure(new ElasticsearchException(message, e)); + } + }; + + remoteClusterLicense(clusterName.get(), infoListener); + } + + private void remoteClusterLicense(String clusterName, ActionListener listener) { + Client remoteClusterClient = client.getRemoteClusterClient(clusterName); + ThreadContext threadContext = remoteClusterClient.threadPool().getThreadContext(); + try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { + // we stash any context here since this is an internal execution and should not leak any + // existing context information. + threadContext.markAsSystemContext(); + + XPackInfoRequest request = new XPackInfoRequest(); + request.setCategories(EnumSet.of(XPackInfoRequest.Category.LICENSE)); + remoteClusterClient.execute(XPackInfoAction.INSTANCE, request, listener); + } + } + + static boolean licenseSupportsML(XPackInfoResponse.LicenseInfo licenseInfo) { + License.OperationMode mode = License.OperationMode.resolve(licenseInfo.getMode()); + return licenseInfo.getStatus() == License.Status.ACTIVE && + (mode == License.OperationMode.PLATINUM || mode == License.OperationMode.TRIAL); + } + + public static boolean isRemoteIndex(String index) { + return index.indexOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR) != -1; + } + + public static boolean containsRemoteIndex(List indices) { + return indices.stream().anyMatch(MlRemoteLicenseChecker::isRemoteIndex); + } + + /** + * Get any remote indices used in cross cluster search. + * Remote indices are of the form {@code cluster_name:index_name} + * @return List of remote cluster indices + */ + public static List remoteIndices(List indices) { + return indices.stream().filter(MlRemoteLicenseChecker::isRemoteIndex).collect(Collectors.toList()); + } + + /** + * Extract the list of remote cluster names from the list of indices. + * @param indices List of indices. Remote cluster indices are prefixed + * with {@code cluster-name:} + * @return Every cluster name found in {@code indices} + */ + public static List remoteClusterNames(List indices) { + return indices.stream() + .filter(MlRemoteLicenseChecker::isRemoteIndex) + .map(index -> index.substring(0, index.indexOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR))) + .distinct() + .collect(Collectors.toList()); + } + + public static String buildErrorMessage(RemoteClusterLicenseInfo clusterLicenseInfo) { + StringBuilder error = new StringBuilder(); + if (clusterLicenseInfo.licenseInfo.getStatus() != License.Status.ACTIVE) { + error.append("The license on cluster [").append(clusterLicenseInfo.clusterName) + .append("] is not active. "); + } else { + License.OperationMode mode = License.OperationMode.resolve(clusterLicenseInfo.licenseInfo.getMode()); + if (mode != License.OperationMode.PLATINUM && mode != License.OperationMode.TRIAL) { + error.append("The license mode [").append(mode) + .append("] on cluster [") + .append(clusterLicenseInfo.clusterName) + .append("] does not enable Machine Learning. "); + } + } + + error.append(Strings.toString(clusterLicenseInfo.licenseInfo)); + return error.toString(); + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java index 049880b1ac2..21be815d561 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java @@ -117,7 +117,7 @@ public interface AutodetectProcess extends Closeable { /** * Ask the job to start persisting model state in the background - * @throws IOException + * @throws IOException If writing the request fails */ void persistJob() throws IOException; diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseCheckerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseCheckerTests.java new file mode 100644 index 00000000000..47d4d30a7c6 --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseCheckerTests.java @@ -0,0 +1,200 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.datafeed; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.license.License; +import org.elasticsearch.license.XPackInfoResponse; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.action.XPackInfoAction; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.is; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class MlRemoteLicenseCheckerTests extends ESTestCase { + + public void testIsRemoteIndex() { + List indices = Arrays.asList("local-index1", "local-index2"); + assertFalse(MlRemoteLicenseChecker.containsRemoteIndex(indices)); + indices = Arrays.asList("local-index1", "remote-cluster:remote-index2"); + assertTrue(MlRemoteLicenseChecker.containsRemoteIndex(indices)); + } + + public void testRemoteIndices() { + List indices = Collections.singletonList("local-index"); + assertThat(MlRemoteLicenseChecker.remoteIndices(indices), is(empty())); + indices = Arrays.asList("local-index", "remote-cluster:index1", "local-index2", "remote-cluster2:index1"); + assertThat(MlRemoteLicenseChecker.remoteIndices(indices), containsInAnyOrder("remote-cluster:index1", "remote-cluster2:index1")); + } + + public void testRemoteClusterNames() { + List indices = Arrays.asList("local-index1", "local-index2"); + assertThat(MlRemoteLicenseChecker.remoteClusterNames(indices), empty()); + indices = Arrays.asList("local-index1", "remote-cluster1:remote-index2"); + assertThat(MlRemoteLicenseChecker.remoteClusterNames(indices), contains("remote-cluster1")); + indices = Arrays.asList("remote-cluster1:index2", "index1", "remote-cluster2:index1"); + assertThat(MlRemoteLicenseChecker.remoteClusterNames(indices), contains("remote-cluster1", "remote-cluster2")); + indices = Arrays.asList("remote-cluster1:index2", "index1", "remote-cluster2:index1", "remote-cluster2:index2"); + assertThat(MlRemoteLicenseChecker.remoteClusterNames(indices), contains("remote-cluster1", "remote-cluster2")); + } + + public void testLicenseSupportsML() { + XPackInfoResponse.LicenseInfo licenseInfo = new XPackInfoResponse.LicenseInfo("uid", "trial", "trial", + License.Status.ACTIVE, randomNonNegativeLong()); + assertTrue(MlRemoteLicenseChecker.licenseSupportsML(licenseInfo)); + + licenseInfo = new XPackInfoResponse.LicenseInfo("uid", "trial", "trial", License.Status.EXPIRED, randomNonNegativeLong()); + assertFalse(MlRemoteLicenseChecker.licenseSupportsML(licenseInfo)); + + licenseInfo = new XPackInfoResponse.LicenseInfo("uid", "GOLD", "GOLD", License.Status.ACTIVE, randomNonNegativeLong()); + assertFalse(MlRemoteLicenseChecker.licenseSupportsML(licenseInfo)); + + licenseInfo = new XPackInfoResponse.LicenseInfo("uid", "PLATINUM", "PLATINUM", License.Status.ACTIVE, randomNonNegativeLong()); + assertTrue(MlRemoteLicenseChecker.licenseSupportsML(licenseInfo)); + } + + public void testCheckRemoteClusterLicenses_givenValidLicenses() { + final AtomicInteger index = new AtomicInteger(0); + final List responses = new ArrayList<>(); + + Client client = createMockClient(); + doAnswer(invocationMock -> { + @SuppressWarnings("raw_types") + ActionListener listener = (ActionListener) invocationMock.getArguments()[2]; + listener.onResponse(responses.get(index.getAndIncrement())); + return null; + }).when(client).execute(same(XPackInfoAction.INSTANCE), any(), any()); + + + List remoteClusterNames = Arrays.asList("valid1", "valid2", "valid3"); + responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); + responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); + responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); + + MlRemoteLicenseChecker licenseChecker = new MlRemoteLicenseChecker(client); + AtomicReference licCheckResponse = new AtomicReference<>(); + + licenseChecker.checkRemoteClusterLicenses(remoteClusterNames, + new ActionListener() { + @Override + public void onResponse(MlRemoteLicenseChecker.LicenseViolation response) { + licCheckResponse.set(response); + } + + @Override + public void onFailure(Exception e) { + fail(e.getMessage()); + } + }); + + verify(client, times(3)).execute(same(XPackInfoAction.INSTANCE), any(), any()); + assertNotNull(licCheckResponse.get()); + assertFalse(licCheckResponse.get().isViolated()); + assertNull(licCheckResponse.get().get()); + } + + public void testCheckRemoteClusterLicenses_givenInvalidLicense() { + final AtomicInteger index = new AtomicInteger(0); + List remoteClusterNames = Arrays.asList("good", "cluster-with-basic-license", "good2"); + final List responses = new ArrayList<>(); + responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); + responses.add(new XPackInfoResponse(null, createBasicLicenseResponse(), null)); + responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); + + Client client = createMockClient(); + doAnswer(invocationMock -> { + @SuppressWarnings("raw_types") + ActionListener listener = (ActionListener) invocationMock.getArguments()[2]; + listener.onResponse(responses.get(index.getAndIncrement())); + return null; + }).when(client).execute(same(XPackInfoAction.INSTANCE), any(), any()); + + MlRemoteLicenseChecker licenseChecker = new MlRemoteLicenseChecker(client); + AtomicReference licCheckResponse = new AtomicReference<>(); + + licenseChecker.checkRemoteClusterLicenses(remoteClusterNames, + new ActionListener() { + @Override + public void onResponse(MlRemoteLicenseChecker.LicenseViolation response) { + licCheckResponse.set(response); + } + + @Override + public void onFailure(Exception e) { + fail(e.getMessage()); + } + }); + + verify(client, times(2)).execute(same(XPackInfoAction.INSTANCE), any(), any()); + assertNotNull(licCheckResponse.get()); + assertTrue(licCheckResponse.get().isViolated()); + assertEquals("cluster-with-basic-license", licCheckResponse.get().get().getClusterName()); + assertEquals("BASIC", licCheckResponse.get().get().getLicenseInfo().getType()); + } + + public void testBuildErrorMessage() { + XPackInfoResponse.LicenseInfo platinumLicence = createPlatinumLicenseResponse(); + MlRemoteLicenseChecker.RemoteClusterLicenseInfo info = + new MlRemoteLicenseChecker.RemoteClusterLicenseInfo("platinum-cluster", platinumLicence); + assertEquals(Strings.toString(platinumLicence), MlRemoteLicenseChecker.buildErrorMessage(info)); + + XPackInfoResponse.LicenseInfo basicLicense = createBasicLicenseResponse(); + info = new MlRemoteLicenseChecker.RemoteClusterLicenseInfo("basic-cluster", basicLicense); + String expected = "The license mode [BASIC] on cluster [basic-cluster] does not enable Machine Learning. " + + Strings.toString(basicLicense); + assertEquals(expected, MlRemoteLicenseChecker.buildErrorMessage(info)); + + XPackInfoResponse.LicenseInfo expiredLicense = createExpiredLicenseResponse(); + info = new MlRemoteLicenseChecker.RemoteClusterLicenseInfo("expired-cluster", expiredLicense); + expected = "The license on cluster [expired-cluster] is not active. " + Strings.toString(expiredLicense); + assertEquals(expected, MlRemoteLicenseChecker.buildErrorMessage(info)); + } + + private Client createMockClient() { + Client client = mock(Client.class); + ThreadPool threadPool = mock(ThreadPool.class); + when(client.threadPool()).thenReturn(threadPool); + when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); + when(client.getRemoteClusterClient(anyString())).thenReturn(client); + return client; + } + + private XPackInfoResponse.LicenseInfo createPlatinumLicenseResponse() { + return new XPackInfoResponse.LicenseInfo("uid", "PLATINUM", "PLATINUM", License.Status.ACTIVE, randomNonNegativeLong()); + } + + private XPackInfoResponse.LicenseInfo createBasicLicenseResponse() { + return new XPackInfoResponse.LicenseInfo("uid", "BASIC", "BASIC", License.Status.ACTIVE, randomNonNegativeLong()); + } + + private XPackInfoResponse.LicenseInfo createExpiredLicenseResponse() { + return new XPackInfoResponse.LicenseInfo("uid", "PLATINUM", "PLATINUM", License.Status.EXPIRED, randomNonNegativeLong()); + } +} From a486177a19171ab408cb88c1b1ef8d75ef2427fd Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Wed, 13 Jun 2018 11:31:04 -0400 Subject: [PATCH 5/5] [Rollup] Metric config parser must use builder so validation runs (#31159) The parser for the Metric config was directly instantiating the config object, rather than using the builder. That means it was bypassing the validation logic built into the builder, and would allow users to create invalid metric configs (like using unsupported metrics). The job would later blow up and abort due to bad configs, but this isn't immediately obvious to the user since the PutJob API succeeded. --- .../xpack/core/rollup/job/MetricConfig.java | 11 ++++--- .../core/rollup/job/RollupJobConfig.java | 2 +- .../job/MetricsConfigSerializingTests.java | 4 +-- .../rest-api-spec/test/rollup/put_job.yml | 30 +++++++++++++++++++ 4 files changed, 38 insertions(+), 9 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/MetricConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/MetricConfig.java index f26c67935ed..67b83646c42 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/MetricConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/MetricConfig.java @@ -12,7 +12,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.mapper.NumberFieldMapper; @@ -75,12 +75,11 @@ public class MetricConfig implements Writeable, ToXContentFragment { MAPPER_TYPES = types; } - public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( - NAME, a -> new MetricConfig((String)a[0], (List) a[1])); + public static final ObjectParser PARSER = new ObjectParser<>(NAME, MetricConfig.Builder::new); static { - PARSER.declareString(ConstructingObjectParser.constructorArg(), FIELD); - PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), METRICS); + PARSER.declareString(MetricConfig.Builder::setField, FIELD); + PARSER.declareStringArray(MetricConfig.Builder::setMetrics, METRICS); } MetricConfig(String name, List metrics) { @@ -257,4 +256,4 @@ public class MetricConfig implements Writeable, ToXContentFragment { return new MetricConfig(field, metrics); } } -} \ No newline at end of file +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/RollupJobConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/RollupJobConfig.java index 3818ebcf447..422ecdd5fd9 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/RollupJobConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/RollupJobConfig.java @@ -63,7 +63,7 @@ public class RollupJobConfig implements NamedWriteable, ToXContentObject { static { PARSER.declareString(RollupJobConfig.Builder::setId, RollupField.ID); PARSER.declareObject(RollupJobConfig.Builder::setGroupConfig, (p, c) -> GroupConfig.PARSER.apply(p,c).build(), GROUPS); - PARSER.declareObjectArray(RollupJobConfig.Builder::setMetricsConfig, MetricConfig.PARSER, METRICS); + PARSER.declareObjectArray(RollupJobConfig.Builder::setMetricsConfig, (p, c) -> MetricConfig.PARSER.apply(p, c).build(), METRICS); PARSER.declareString((params, val) -> params.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT); PARSER.declareString(RollupJobConfig.Builder::setIndexPattern, INDEX_PATTERN); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/job/MetricsConfigSerializingTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/job/MetricsConfigSerializingTests.java index 92a0976f532..9b330e71650 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/job/MetricsConfigSerializingTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/job/MetricsConfigSerializingTests.java @@ -24,7 +24,7 @@ import static org.mockito.Mockito.when; public class MetricsConfigSerializingTests extends AbstractSerializingTestCase { @Override protected MetricConfig doParseInstance(XContentParser parser) throws IOException { - return MetricConfig.PARSER.apply(parser, null); + return MetricConfig.PARSER.apply(parser, null).build(); } @Override @@ -36,7 +36,7 @@ public class MetricsConfigSerializingTests extends AbstractSerializingTestCase> responseMap = new HashMap<>(); diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/put_job.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/put_job.yml index 717be0d6b25..98ef9b32e3d 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/put_job.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/put_job.yml @@ -188,3 +188,33 @@ setup: ] } +--- +"Unknown Metric": + + - do: + catch: /Unsupported metric \[does_not_exist\]/ + headers: + Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser + xpack.rollup.put_job: + id: foo + body: > + { + "index_pattern": "foo", + "rollup_index": "foo_rollup", + "cron": "*/30 * * * * ?", + "page_size" :10, + "groups" : { + "date_histogram": { + "field": "the_field", + "interval": "1h" + } + }, + "metrics": [ + { + "field": "value_field", + "metrics": ["min", "max", "sum", "does_not_exist"] + } + ] + } + +