Merge branch 'master' into index-lifecycle
This commit is contained in:
commit
6785391a5f
|
@ -129,7 +129,6 @@ public class ClusterClientIT extends ESRestHighLevelClientTestCase {
|
||||||
createIndex("index2", Settings.EMPTY);
|
createIndex("index2", Settings.EMPTY);
|
||||||
ClusterHealthRequest request = new ClusterHealthRequest();
|
ClusterHealthRequest request = new ClusterHealthRequest();
|
||||||
request.timeout("5s");
|
request.timeout("5s");
|
||||||
request.level(ClusterHealthRequest.Level.CLUSTER);
|
|
||||||
ClusterHealthResponse response = execute(request, highLevelClient().cluster()::health, highLevelClient().cluster()::healthAsync);
|
ClusterHealthResponse response = execute(request, highLevelClient().cluster()::health, highLevelClient().cluster()::healthAsync);
|
||||||
|
|
||||||
assertYellowShards(response);
|
assertYellowShards(response);
|
||||||
|
@ -170,6 +169,7 @@ public class ClusterClientIT extends ESRestHighLevelClientTestCase {
|
||||||
createIndex("index", Settings.EMPTY);
|
createIndex("index", Settings.EMPTY);
|
||||||
createIndex("index2", Settings.EMPTY);
|
createIndex("index2", Settings.EMPTY);
|
||||||
ClusterHealthRequest request = new ClusterHealthRequest("index");
|
ClusterHealthRequest request = new ClusterHealthRequest("index");
|
||||||
|
request.level(ClusterHealthRequest.Level.SHARDS);
|
||||||
request.timeout("5s");
|
request.timeout("5s");
|
||||||
ClusterHealthResponse response = execute(request, highLevelClient().cluster()::health, highLevelClient().cluster()::healthAsync);
|
ClusterHealthResponse response = execute(request, highLevelClient().cluster()::health, highLevelClient().cluster()::healthAsync);
|
||||||
|
|
||||||
|
|
|
@ -1568,7 +1568,7 @@ public class RequestConvertersTests extends ESTestCase {
|
||||||
healthRequest.level(level);
|
healthRequest.level(level);
|
||||||
expectedParams.put("level", level.name().toLowerCase(Locale.ROOT));
|
expectedParams.put("level", level.name().toLowerCase(Locale.ROOT));
|
||||||
} else {
|
} else {
|
||||||
expectedParams.put("level", "shards");
|
expectedParams.put("level", "cluster");
|
||||||
}
|
}
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
Priority priority = randomFrom(Priority.values());
|
Priority priority = randomFrom(Priority.values());
|
||||||
|
|
|
@ -67,6 +67,7 @@ include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-request-wai
|
||||||
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-request-level]
|
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-request-level]
|
||||||
--------------------------------------------------
|
--------------------------------------------------
|
||||||
<1> The level of detail of the returned health information. Accepts a `ClusterHealthRequest.Level` value.
|
<1> The level of detail of the returned health information. Accepts a `ClusterHealthRequest.Level` value.
|
||||||
|
Default value is `cluster`.
|
||||||
|
|
||||||
["source","java",subs="attributes,callouts,macros"]
|
["source","java",subs="attributes,callouts,macros"]
|
||||||
--------------------------------------------------
|
--------------------------------------------------
|
||||||
|
|
|
@ -11,3 +11,10 @@ header, e.g. `client.index(indexRequest)` becomes
|
||||||
In case you are specifying headers
|
In case you are specifying headers
|
||||||
e.g. `client.index(indexRequest, new Header("name" "value"))` becomes
|
e.g. `client.index(indexRequest, new Header("name" "value"))` becomes
|
||||||
`client.index(indexRequest, RequestOptions.DEFAULT.toBuilder().addHeader("name", "value").build());`
|
`client.index(indexRequest, RequestOptions.DEFAULT.toBuilder().addHeader("name", "value").build());`
|
||||||
|
|
||||||
|
==== Cluster Health API default to `cluster` level
|
||||||
|
|
||||||
|
The Cluster Health API used to default to `shards` level to ease migration
|
||||||
|
from transport client that doesn't support the `level` parameter and always
|
||||||
|
returns information including indices and shards details. The level default
|
||||||
|
value has been aligned with the Elasticsearch default level: `cluster`.
|
|
@ -48,9 +48,9 @@ public class ClusterHealthRequest extends MasterNodeReadRequest<ClusterHealthReq
|
||||||
private Priority waitForEvents = null;
|
private Priority waitForEvents = null;
|
||||||
/**
|
/**
|
||||||
* Only used by the high-level REST Client. Controls the details level of the health information returned.
|
* Only used by the high-level REST Client. Controls the details level of the health information returned.
|
||||||
* The default value is 'shards' so it is backward compatible with the transport client behaviour.
|
* The default value is 'cluster'.
|
||||||
*/
|
*/
|
||||||
private Level level = Level.SHARDS;
|
private Level level = Level.CLUSTER;
|
||||||
|
|
||||||
public ClusterHealthRequest() {
|
public ClusterHealthRequest() {
|
||||||
}
|
}
|
||||||
|
@ -250,8 +250,7 @@ public class ClusterHealthRequest extends MasterNodeReadRequest<ClusterHealthReq
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set the level of detail for the health information to be returned.
|
* Set the level of detail for the health information to be returned.
|
||||||
* Only used by the high-level REST Client
|
* Only used by the high-level REST Client.
|
||||||
* The default value is 'shards' so it is backward compatible with the transport client behaviour.
|
|
||||||
*/
|
*/
|
||||||
public void level(Level level) {
|
public void level(Level level) {
|
||||||
this.level = Objects.requireNonNull(level, "level must not be null");
|
this.level = Objects.requireNonNull(level, "level must not be null");
|
||||||
|
@ -260,7 +259,6 @@ public class ClusterHealthRequest extends MasterNodeReadRequest<ClusterHealthReq
|
||||||
/**
|
/**
|
||||||
* Get the level of detail for the health information to be returned.
|
* Get the level of detail for the health information to be returned.
|
||||||
* Only used by the high-level REST Client.
|
* Only used by the high-level REST Client.
|
||||||
* The default value is 'shards' so it is backward compatible with the transport client behaviour.
|
|
||||||
*/
|
*/
|
||||||
public Level level() {
|
public Level level() {
|
||||||
return level;
|
return level;
|
||||||
|
|
|
@ -82,8 +82,10 @@ public class MetaDataUpdateSettingsService extends AbstractComponent {
|
||||||
Settings.Builder settingsForOpenIndices = Settings.builder();
|
Settings.Builder settingsForOpenIndices = Settings.builder();
|
||||||
final Set<String> skippedSettings = new HashSet<>();
|
final Set<String> skippedSettings = new HashSet<>();
|
||||||
|
|
||||||
indexScopedSettings.validate(normalizedSettings.filter(s -> Regex.isSimpleMatchPattern(s) == false /* don't validate wildcards */),
|
indexScopedSettings.validate(
|
||||||
false); //don't validate dependencies here we check it below never allow to change the number of shards
|
normalizedSettings.filter(s -> Regex.isSimpleMatchPattern(s) == false), // don't validate wildcards
|
||||||
|
false, // don't validate dependencies here we check it below never allow to change the number of shards
|
||||||
|
true); // validate internal index settings
|
||||||
for (String key : normalizedSettings.keySet()) {
|
for (String key : normalizedSettings.keySet()) {
|
||||||
Setting setting = indexScopedSettings.get(key);
|
Setting setting = indexScopedSettings.get(key);
|
||||||
boolean isWildcard = setting == null && Regex.isSimpleMatchPattern(key);
|
boolean isWildcard = setting == null && Regex.isSimpleMatchPattern(key);
|
||||||
|
|
|
@ -282,6 +282,18 @@ public abstract class AbstractScopedSettings extends AbstractComponent {
|
||||||
validate(settings, validateDependencies, false, false);
|
validate(settings, validateDependencies, false, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Validates that all settings are registered and valid.
|
||||||
|
*
|
||||||
|
* @param settings the settings to validate
|
||||||
|
* @param validateDependencies true if dependent settings should be validated
|
||||||
|
* @param validateInternalIndex true if internal index settings should be validated
|
||||||
|
* @see Setting#getSettingsDependencies(String)
|
||||||
|
*/
|
||||||
|
public final void validate(final Settings settings, final boolean validateDependencies, final boolean validateInternalIndex) {
|
||||||
|
validate(settings, validateDependencies, false, false, validateInternalIndex);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Validates that all settings are registered and valid.
|
* Validates that all settings are registered and valid.
|
||||||
*
|
*
|
||||||
|
@ -296,6 +308,25 @@ public abstract class AbstractScopedSettings extends AbstractComponent {
|
||||||
final boolean validateDependencies,
|
final boolean validateDependencies,
|
||||||
final boolean ignorePrivateSettings,
|
final boolean ignorePrivateSettings,
|
||||||
final boolean ignoreArchivedSettings) {
|
final boolean ignoreArchivedSettings) {
|
||||||
|
validate(settings, validateDependencies, ignorePrivateSettings, ignoreArchivedSettings, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Validates that all settings are registered and valid.
|
||||||
|
*
|
||||||
|
* @param settings the settings
|
||||||
|
* @param validateDependencies true if dependent settings should be validated
|
||||||
|
* @param ignorePrivateSettings true if private settings should be ignored during validation
|
||||||
|
* @param ignoreArchivedSettings true if archived settings should be ignored during validation
|
||||||
|
* @param validateInternalIndex true if index internal settings should be validated
|
||||||
|
* @see Setting#getSettingsDependencies(String)
|
||||||
|
*/
|
||||||
|
public final void validate(
|
||||||
|
final Settings settings,
|
||||||
|
final boolean validateDependencies,
|
||||||
|
final boolean ignorePrivateSettings,
|
||||||
|
final boolean ignoreArchivedSettings,
|
||||||
|
final boolean validateInternalIndex) {
|
||||||
final List<RuntimeException> exceptions = new ArrayList<>();
|
final List<RuntimeException> exceptions = new ArrayList<>();
|
||||||
for (final String key : settings.keySet()) { // settings iterate in deterministic fashion
|
for (final String key : settings.keySet()) { // settings iterate in deterministic fashion
|
||||||
if (isPrivateSetting(key) && ignorePrivateSettings) {
|
if (isPrivateSetting(key) && ignorePrivateSettings) {
|
||||||
|
@ -305,7 +336,7 @@ public abstract class AbstractScopedSettings extends AbstractComponent {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
validate(key, settings, validateDependencies);
|
validate(key, settings, validateDependencies, validateInternalIndex);
|
||||||
} catch (final RuntimeException ex) {
|
} catch (final RuntimeException ex) {
|
||||||
exceptions.add(ex);
|
exceptions.add(ex);
|
||||||
}
|
}
|
||||||
|
@ -314,9 +345,27 @@ public abstract class AbstractScopedSettings extends AbstractComponent {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Validates that the setting is valid
|
* Validates that the settings is valid.
|
||||||
|
*
|
||||||
|
* @param key the key of the setting to validate
|
||||||
|
* @param settings the settings
|
||||||
|
* @param validateDependencies true if dependent settings should be validated
|
||||||
|
* @throws IllegalArgumentException if the setting is invalid
|
||||||
*/
|
*/
|
||||||
void validate(String key, Settings settings, boolean validateDependencies) {
|
void validate(final String key, final Settings settings, final boolean validateDependencies) {
|
||||||
|
validate(key, settings, validateDependencies, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Validates that the settings is valid.
|
||||||
|
*
|
||||||
|
* @param key the key of the setting to validate
|
||||||
|
* @param settings the settings
|
||||||
|
* @param validateDependencies true if dependent settings should be validated
|
||||||
|
* @param validateInternalIndex true if internal index settings should be validated
|
||||||
|
* @throws IllegalArgumentException if the setting is invalid
|
||||||
|
*/
|
||||||
|
void validate(final String key, final Settings settings, final boolean validateDependencies, final boolean validateInternalIndex) {
|
||||||
Setting setting = getRaw(key);
|
Setting setting = getRaw(key);
|
||||||
if (setting == null) {
|
if (setting == null) {
|
||||||
LevensteinDistance ld = new LevensteinDistance();
|
LevensteinDistance ld = new LevensteinDistance();
|
||||||
|
@ -356,6 +405,11 @@ public abstract class AbstractScopedSettings extends AbstractComponent {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// the only time that validateInternalIndex should be true is if this call is coming via the update settings API
|
||||||
|
if (validateInternalIndex && setting.getProperties().contains(Setting.Property.InternalIndex)) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"can not update internal setting [" + setting.getKey() + "]; this setting is managed via a dedicated API");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
setting.get(settings);
|
setting.get(settings);
|
||||||
}
|
}
|
||||||
|
|
|
@ -120,7 +120,13 @@ public class Setting<T> implements ToXContentObject {
|
||||||
* Mark this setting as not copyable during an index resize (shrink or split). This property can only be applied to settings that
|
* Mark this setting as not copyable during an index resize (shrink or split). This property can only be applied to settings that
|
||||||
* also have {@link Property#IndexScope}.
|
* also have {@link Property#IndexScope}.
|
||||||
*/
|
*/
|
||||||
NotCopyableOnResize
|
NotCopyableOnResize,
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Indicates an index-level setting that is managed internally. Such a setting can only be added to an index on index creation but
|
||||||
|
* can not be updated via the update API.
|
||||||
|
*/
|
||||||
|
InternalIndex
|
||||||
}
|
}
|
||||||
|
|
||||||
private final Key key;
|
private final Key key;
|
||||||
|
@ -152,14 +158,18 @@ public class Setting<T> implements ToXContentObject {
|
||||||
if (propertiesAsSet.contains(Property.Dynamic) && propertiesAsSet.contains(Property.Final)) {
|
if (propertiesAsSet.contains(Property.Dynamic) && propertiesAsSet.contains(Property.Final)) {
|
||||||
throw new IllegalArgumentException("final setting [" + key + "] cannot be dynamic");
|
throw new IllegalArgumentException("final setting [" + key + "] cannot be dynamic");
|
||||||
}
|
}
|
||||||
if (propertiesAsSet.contains(Property.NotCopyableOnResize) && propertiesAsSet.contains(Property.IndexScope) == false) {
|
checkPropertyRequiresIndexScope(propertiesAsSet, Property.NotCopyableOnResize);
|
||||||
throw new IllegalArgumentException(
|
checkPropertyRequiresIndexScope(propertiesAsSet, Property.InternalIndex);
|
||||||
"non-index-scoped setting [" + key + "] can not have property [" + Property.NotCopyableOnResize + "]");
|
|
||||||
}
|
|
||||||
this.properties = propertiesAsSet;
|
this.properties = propertiesAsSet;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void checkPropertyRequiresIndexScope(final EnumSet<Property> properties, final Property property) {
|
||||||
|
if (properties.contains(property) && properties.contains(Property.IndexScope) == false) {
|
||||||
|
throw new IllegalArgumentException("non-index-scoped setting [" + key + "] can not have property [" + property + "]");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new Setting instance
|
* Creates a new Setting instance
|
||||||
* @param key the settings key for this setting.
|
* @param key the settings key for this setting.
|
||||||
|
|
|
@ -876,4 +876,28 @@ public class ScopedSettingsTests extends ESTestCase {
|
||||||
Settings.builder().put(currentSettings), Settings.builder(), "node"));
|
Settings.builder().put(currentSettings), Settings.builder(), "node"));
|
||||||
assertThat(exc.getMessage(), containsString("final node setting [some.final.group.foo]"));
|
assertThat(exc.getMessage(), containsString("final node setting [some.final.group.foo]"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testInternalIndexSettingsFailsValidation() {
|
||||||
|
final Setting<String> indexInternalSetting = Setting.simpleString("index.internal", Property.InternalIndex, Property.IndexScope);
|
||||||
|
final IndexScopedSettings indexScopedSettings =
|
||||||
|
new IndexScopedSettings(Settings.EMPTY, Collections.singleton(indexInternalSetting));
|
||||||
|
final IllegalArgumentException e = expectThrows(
|
||||||
|
IllegalArgumentException.class,
|
||||||
|
() -> {
|
||||||
|
final Settings settings = Settings.builder().put("index.internal", "internal").build();
|
||||||
|
indexScopedSettings.validate(settings, false, /* validateInternalIndex */ true);
|
||||||
|
});
|
||||||
|
final String message = "can not update internal setting [index.internal]; this setting is managed via a dedicated API";
|
||||||
|
assertThat(e, hasToString(containsString(message)));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testInternalIndexSettingsSkipValidation() {
|
||||||
|
final Setting<String> internalIndexSetting = Setting.simpleString("index.internal", Property.InternalIndex, Property.IndexScope);
|
||||||
|
final IndexScopedSettings indexScopedSettings =
|
||||||
|
new IndexScopedSettings(Settings.EMPTY, Collections.singleton(internalIndexSetting));
|
||||||
|
// nothing should happen, validation should not throw an exception
|
||||||
|
final Settings settings = Settings.builder().put("index.internal", "internal").build();
|
||||||
|
indexScopedSettings.validate(settings, false, /* validateInternalIndex */ false);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -735,6 +735,13 @@ public class SettingTests extends ESTestCase {
|
||||||
assertThat(e, hasToString(containsString("non-index-scoped setting [foo.bar] can not have property [NotCopyableOnResize]")));
|
assertThat(e, hasToString(containsString("non-index-scoped setting [foo.bar] can not have property [NotCopyableOnResize]")));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testRejectNonIndexScopedIndexInternalSetting() {
|
||||||
|
final IllegalArgumentException e = expectThrows(
|
||||||
|
IllegalArgumentException.class,
|
||||||
|
() -> Setting.simpleString("foo.bar", Property.InternalIndex));
|
||||||
|
assertThat(e, hasToString(containsString("non-index-scoped setting [foo.bar] can not have property [InternalIndex]")));
|
||||||
|
}
|
||||||
|
|
||||||
public void testTimeValue() {
|
public void testTimeValue() {
|
||||||
final TimeValue random = TimeValue.parseTimeValue(randomTimeValue(), "test");
|
final TimeValue random = TimeValue.parseTimeValue(randomTimeValue(), "test");
|
||||||
|
|
||||||
|
|
|
@ -19,19 +19,40 @@
|
||||||
|
|
||||||
package org.elasticsearch.indices.settings;
|
package org.elasticsearch.indices.settings;
|
||||||
|
|
||||||
|
import org.elasticsearch.action.Action;
|
||||||
|
import org.elasticsearch.action.ActionListener;
|
||||||
|
import org.elasticsearch.action.ActionRequest;
|
||||||
|
import org.elasticsearch.action.ActionRequestValidationException;
|
||||||
|
import org.elasticsearch.action.ActionResponse;
|
||||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||||
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
|
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
|
||||||
|
import org.elasticsearch.action.support.ActionFilters;
|
||||||
|
import org.elasticsearch.action.support.master.MasterNodeRequest;
|
||||||
|
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
|
||||||
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
|
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||||
|
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
|
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||||
|
import org.elasticsearch.cluster.metadata.MetaData;
|
||||||
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.common.Priority;
|
import org.elasticsearch.common.Priority;
|
||||||
|
import org.elasticsearch.common.inject.Inject;
|
||||||
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
import org.elasticsearch.common.settings.Setting;
|
import org.elasticsearch.common.settings.Setting;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.index.IndexModule;
|
import org.elasticsearch.index.IndexModule;
|
||||||
import org.elasticsearch.index.IndexService;
|
import org.elasticsearch.index.IndexService;
|
||||||
import org.elasticsearch.index.engine.VersionConflictEngineException;
|
import org.elasticsearch.index.engine.VersionConflictEngineException;
|
||||||
import org.elasticsearch.indices.IndicesService;
|
import org.elasticsearch.indices.IndicesService;
|
||||||
|
import org.elasticsearch.plugins.ActionPlugin;
|
||||||
import org.elasticsearch.plugins.Plugin;
|
import org.elasticsearch.plugins.Plugin;
|
||||||
import org.elasticsearch.test.ESIntegTestCase;
|
import org.elasticsearch.test.ESIntegTestCase;
|
||||||
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
import org.elasticsearch.transport.TransportService;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -46,6 +67,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBloc
|
||||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
|
||||||
import static org.hamcrest.Matchers.containsString;
|
import static org.hamcrest.Matchers.containsString;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
import static org.hamcrest.Matchers.hasToString;
|
||||||
import static org.hamcrest.Matchers.nullValue;
|
import static org.hamcrest.Matchers.nullValue;
|
||||||
|
|
||||||
public class UpdateSettingsIT extends ESIntegTestCase {
|
public class UpdateSettingsIT extends ESIntegTestCase {
|
||||||
|
@ -79,7 +101,12 @@ public class UpdateSettingsIT extends ESIntegTestCase {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||||
return Arrays.asList(DummySettingPlugin.class, FinalSettingPlugin.class);
|
return Arrays.asList(DummySettingPlugin.class, FinalSettingPlugin.class, InternalIndexSettingsPlugin.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
|
||||||
|
return Collections.singletonList(InternalIndexSettingsPlugin.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class DummySettingPlugin extends Plugin {
|
public static class DummySettingPlugin extends Plugin {
|
||||||
|
@ -124,6 +151,151 @@ public class UpdateSettingsIT extends ESIntegTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static class InternalIndexSettingsPlugin extends Plugin implements ActionPlugin {
|
||||||
|
|
||||||
|
public static final Setting<String> INDEX_INTERNAL_SETTING =
|
||||||
|
Setting.simpleString("index.internal", Setting.Property.IndexScope, Setting.Property.InternalIndex);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<Setting<?>> getSettings() {
|
||||||
|
return Collections.singletonList(INDEX_INTERNAL_SETTING);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class UpdateInternalIndexAction
|
||||||
|
extends Action<UpdateInternalIndexAction.Request, UpdateInternalIndexAction.Response> {
|
||||||
|
|
||||||
|
private static final UpdateInternalIndexAction INSTANCE = new UpdateInternalIndexAction();
|
||||||
|
private static final String NAME = "indices:admin/settings/update-internal-index";
|
||||||
|
|
||||||
|
public UpdateInternalIndexAction() {
|
||||||
|
super(NAME);
|
||||||
|
}
|
||||||
|
|
||||||
|
static class Request extends MasterNodeRequest<Request> {
|
||||||
|
|
||||||
|
private String index;
|
||||||
|
private String value;
|
||||||
|
|
||||||
|
Request() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
Request(final String index, final String value) {
|
||||||
|
this.index = index;
|
||||||
|
this.value = value;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ActionRequestValidationException validate() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void readFrom(final StreamInput in) throws IOException {
|
||||||
|
super.readFrom(in);
|
||||||
|
index = in.readString();
|
||||||
|
value = in.readString();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void writeTo(final StreamOutput out) throws IOException {
|
||||||
|
super.writeTo(out);
|
||||||
|
out.writeString(index);
|
||||||
|
out.writeString(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
static class Response extends ActionResponse {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Response newResponse() {
|
||||||
|
return new Response();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class TransportUpdateInternalIndexAction
|
||||||
|
extends TransportMasterNodeAction<UpdateInternalIndexAction.Request, UpdateInternalIndexAction.Response> {
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
public TransportUpdateInternalIndexAction(
|
||||||
|
final Settings settings,
|
||||||
|
final TransportService transportService,
|
||||||
|
final ClusterService clusterService,
|
||||||
|
final ThreadPool threadPool,
|
||||||
|
final ActionFilters actionFilters,
|
||||||
|
final IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||||
|
super(
|
||||||
|
settings,
|
||||||
|
UpdateInternalIndexAction.NAME,
|
||||||
|
transportService,
|
||||||
|
clusterService,
|
||||||
|
threadPool,
|
||||||
|
actionFilters,
|
||||||
|
indexNameExpressionResolver,
|
||||||
|
UpdateInternalIndexAction.Request::new);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected String executor() {
|
||||||
|
return ThreadPool.Names.SAME;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected UpdateInternalIndexAction.Response newResponse() {
|
||||||
|
return new UpdateInternalIndexAction.Response();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void masterOperation(
|
||||||
|
final UpdateInternalIndexAction.Request request,
|
||||||
|
final ClusterState state,
|
||||||
|
final ActionListener<UpdateInternalIndexAction.Response> listener) throws Exception {
|
||||||
|
clusterService.submitStateUpdateTask("update-index-internal", new ClusterStateUpdateTask() {
|
||||||
|
@Override
|
||||||
|
public ClusterState execute(final ClusterState currentState) throws Exception {
|
||||||
|
final MetaData.Builder builder = MetaData.builder(currentState.metaData());
|
||||||
|
final IndexMetaData.Builder imdBuilder = IndexMetaData.builder(currentState.metaData().index(request.index));
|
||||||
|
final Settings.Builder settingsBuilder =
|
||||||
|
Settings.builder()
|
||||||
|
.put(currentState.metaData().index(request.index).getSettings())
|
||||||
|
.put("index.internal", request.value);
|
||||||
|
imdBuilder.settings(settingsBuilder);
|
||||||
|
builder.put(imdBuilder.build(), true);
|
||||||
|
return ClusterState.builder(currentState).metaData(builder).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void clusterStateProcessed(final String source, final ClusterState oldState, final ClusterState newState) {
|
||||||
|
listener.onResponse(new UpdateInternalIndexAction.Response());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(final String source, final Exception e) {
|
||||||
|
listener.onFailure(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ClusterBlockException checkBlock(UpdateInternalIndexAction.Request request, ClusterState state) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
|
||||||
|
return Collections.singletonList(
|
||||||
|
new ActionHandler<>(UpdateInternalIndexAction.INSTANCE, TransportUpdateInternalIndexAction.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
public void testUpdateDependentClusterSettings() {
|
public void testUpdateDependentClusterSettings() {
|
||||||
IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () ->
|
IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () ->
|
||||||
client().admin().cluster().prepareUpdateSettings().setPersistentSettings(Settings.builder()
|
client().admin().cluster().prepareUpdateSettings().setPersistentSettings(Settings.builder()
|
||||||
|
@ -474,4 +646,35 @@ public class UpdateSettingsIT extends ESIntegTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testUpdateInternalIndexSettingViaSettingsAPI() {
|
||||||
|
final Settings settings = Settings.builder().put("index.internal", "internal").build();
|
||||||
|
createIndex("test", settings);
|
||||||
|
final GetSettingsResponse response = client().admin().indices().prepareGetSettings("test").get();
|
||||||
|
assertThat(response.getSetting("test", "index.internal"), equalTo("internal"));
|
||||||
|
// we can not update the setting via the update settings API
|
||||||
|
final IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
|
||||||
|
() -> client().admin()
|
||||||
|
.indices()
|
||||||
|
.prepareUpdateSettings("test")
|
||||||
|
.setSettings(Settings.builder().put("index.internal", "internal-update"))
|
||||||
|
.get());
|
||||||
|
final String message = "can not update internal setting [index.internal]; this setting is managed via a dedicated API";
|
||||||
|
assertThat(e, hasToString(containsString(message)));
|
||||||
|
final GetSettingsResponse responseAfterAttemptedUpdate = client().admin().indices().prepareGetSettings("test").get();
|
||||||
|
assertThat(responseAfterAttemptedUpdate.getSetting("test", "index.internal"), equalTo("internal"));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testUpdateInternalIndexSettingViaDedicatedAPI() {
|
||||||
|
final Settings settings = Settings.builder().put("index.internal", "internal").build();
|
||||||
|
createIndex("test", settings);
|
||||||
|
final GetSettingsResponse response = client().admin().indices().prepareGetSettings("test").get();
|
||||||
|
assertThat(response.getSetting("test", "index.internal"), equalTo("internal"));
|
||||||
|
client().execute(
|
||||||
|
InternalIndexSettingsPlugin.UpdateInternalIndexAction.INSTANCE,
|
||||||
|
new InternalIndexSettingsPlugin.UpdateInternalIndexAction.Request("test", "internal-update"))
|
||||||
|
.actionGet();
|
||||||
|
final GetSettingsResponse responseAfterUpdate = client().admin().indices().prepareGetSettings("test").get();
|
||||||
|
assertThat(responseAfterUpdate.getSetting("test", "index.internal"), equalTo("internal-update"));
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,8 +19,6 @@
|
||||||
|
|
||||||
package org.elasticsearch.test.rest.yaml;
|
package org.elasticsearch.test.rest.yaml;
|
||||||
|
|
||||||
import org.elasticsearch.test.ESIntegTestCase;
|
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
@ -56,9 +54,6 @@ public final class Features {
|
||||||
*/
|
*/
|
||||||
public static boolean areAllSupported(List<String> features) {
|
public static boolean areAllSupported(List<String> features) {
|
||||||
for (String feature : features) {
|
for (String feature : features) {
|
||||||
if ("requires_replica".equals(feature) && ESIntegTestCase.cluster().numDataNodes() >= 2) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (!SUPPORTED.contains(feature)) {
|
if (!SUPPORTED.contains(feature)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,7 +12,7 @@ import org.elasticsearch.common.Strings;
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
import org.elasticsearch.common.io.stream.Writeable;
|
import org.elasticsearch.common.io.stream.Writeable;
|
||||||
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
import org.elasticsearch.common.xcontent.ObjectParser;
|
||||||
import org.elasticsearch.common.xcontent.ToXContentFragment;
|
import org.elasticsearch.common.xcontent.ToXContentFragment;
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
import org.elasticsearch.index.mapper.NumberFieldMapper;
|
import org.elasticsearch.index.mapper.NumberFieldMapper;
|
||||||
|
@ -75,12 +75,11 @@ public class MetricConfig implements Writeable, ToXContentFragment {
|
||||||
MAPPER_TYPES = types;
|
MAPPER_TYPES = types;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static final ConstructingObjectParser<MetricConfig, Void> PARSER = new ConstructingObjectParser<>(
|
public static final ObjectParser<MetricConfig.Builder, Void> PARSER = new ObjectParser<>(NAME, MetricConfig.Builder::new);
|
||||||
NAME, a -> new MetricConfig((String)a[0], (List<String>) a[1]));
|
|
||||||
|
|
||||||
static {
|
static {
|
||||||
PARSER.declareString(ConstructingObjectParser.constructorArg(), FIELD);
|
PARSER.declareString(MetricConfig.Builder::setField, FIELD);
|
||||||
PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), METRICS);
|
PARSER.declareStringArray(MetricConfig.Builder::setMetrics, METRICS);
|
||||||
}
|
}
|
||||||
|
|
||||||
MetricConfig(String name, List<String> metrics) {
|
MetricConfig(String name, List<String> metrics) {
|
||||||
|
|
|
@ -63,7 +63,7 @@ public class RollupJobConfig implements NamedWriteable, ToXContentObject {
|
||||||
static {
|
static {
|
||||||
PARSER.declareString(RollupJobConfig.Builder::setId, RollupField.ID);
|
PARSER.declareString(RollupJobConfig.Builder::setId, RollupField.ID);
|
||||||
PARSER.declareObject(RollupJobConfig.Builder::setGroupConfig, (p, c) -> GroupConfig.PARSER.apply(p,c).build(), GROUPS);
|
PARSER.declareObject(RollupJobConfig.Builder::setGroupConfig, (p, c) -> GroupConfig.PARSER.apply(p,c).build(), GROUPS);
|
||||||
PARSER.declareObjectArray(RollupJobConfig.Builder::setMetricsConfig, MetricConfig.PARSER, METRICS);
|
PARSER.declareObjectArray(RollupJobConfig.Builder::setMetricsConfig, (p, c) -> MetricConfig.PARSER.apply(p, c).build(), METRICS);
|
||||||
PARSER.declareString((params, val) ->
|
PARSER.declareString((params, val) ->
|
||||||
params.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT);
|
params.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT);
|
||||||
PARSER.declareString(RollupJobConfig.Builder::setIndexPattern, INDEX_PATTERN);
|
PARSER.declareString(RollupJobConfig.Builder::setIndexPattern, INDEX_PATTERN);
|
||||||
|
|
|
@ -40,7 +40,6 @@ import org.joda.time.DateTimeZone;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.TimeZone;
|
import java.util.TimeZone;
|
||||||
|
@ -193,11 +192,11 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
|
||||||
|
|
||||||
public void testDefaultQueryDelay() {
|
public void testDefaultQueryDelay() {
|
||||||
DatafeedConfig.Builder feedBuilder1 = new DatafeedConfig.Builder("datafeed1", "job1");
|
DatafeedConfig.Builder feedBuilder1 = new DatafeedConfig.Builder("datafeed1", "job1");
|
||||||
feedBuilder1.setIndices(Arrays.asList("foo"));
|
feedBuilder1.setIndices(Collections.singletonList("foo"));
|
||||||
DatafeedConfig.Builder feedBuilder2 = new DatafeedConfig.Builder("datafeed2", "job1");
|
DatafeedConfig.Builder feedBuilder2 = new DatafeedConfig.Builder("datafeed2", "job1");
|
||||||
feedBuilder2.setIndices(Arrays.asList("foo"));
|
feedBuilder2.setIndices(Collections.singletonList("foo"));
|
||||||
DatafeedConfig.Builder feedBuilder3 = new DatafeedConfig.Builder("datafeed3", "job2");
|
DatafeedConfig.Builder feedBuilder3 = new DatafeedConfig.Builder("datafeed3", "job2");
|
||||||
feedBuilder3.setIndices(Arrays.asList("foo"));
|
feedBuilder3.setIndices(Collections.singletonList("foo"));
|
||||||
DatafeedConfig feed1 = feedBuilder1.build();
|
DatafeedConfig feed1 = feedBuilder1.build();
|
||||||
DatafeedConfig feed2 = feedBuilder2.build();
|
DatafeedConfig feed2 = feedBuilder2.build();
|
||||||
DatafeedConfig feed3 = feedBuilder3.build();
|
DatafeedConfig feed3 = feedBuilder3.build();
|
||||||
|
@ -208,19 +207,19 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
|
||||||
assertThat(feed1.getQueryDelay(), not(equalTo(feed3.getQueryDelay())));
|
assertThat(feed1.getQueryDelay(), not(equalTo(feed3.getQueryDelay())));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testCheckValid_GivenNullIndices() throws IOException {
|
public void testCheckValid_GivenNullIndices() {
|
||||||
DatafeedConfig.Builder conf = new DatafeedConfig.Builder("datafeed1", "job1");
|
DatafeedConfig.Builder conf = new DatafeedConfig.Builder("datafeed1", "job1");
|
||||||
expectThrows(IllegalArgumentException.class, () -> conf.setIndices(null));
|
expectThrows(IllegalArgumentException.class, () -> conf.setIndices(null));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testCheckValid_GivenEmptyIndices() throws IOException {
|
public void testCheckValid_GivenEmptyIndices() {
|
||||||
DatafeedConfig.Builder conf = new DatafeedConfig.Builder("datafeed1", "job1");
|
DatafeedConfig.Builder conf = new DatafeedConfig.Builder("datafeed1", "job1");
|
||||||
conf.setIndices(Collections.emptyList());
|
conf.setIndices(Collections.emptyList());
|
||||||
ElasticsearchException e = ESTestCase.expectThrows(ElasticsearchException.class, conf::build);
|
ElasticsearchException e = ESTestCase.expectThrows(ElasticsearchException.class, conf::build);
|
||||||
assertEquals(Messages.getMessage(Messages.DATAFEED_CONFIG_INVALID_OPTION_VALUE, "indices", "[]"), e.getMessage());
|
assertEquals(Messages.getMessage(Messages.DATAFEED_CONFIG_INVALID_OPTION_VALUE, "indices", "[]"), e.getMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testCheckValid_GivenIndicesContainsOnlyNulls() throws IOException {
|
public void testCheckValid_GivenIndicesContainsOnlyNulls() {
|
||||||
List<String> indices = new ArrayList<>();
|
List<String> indices = new ArrayList<>();
|
||||||
indices.add(null);
|
indices.add(null);
|
||||||
indices.add(null);
|
indices.add(null);
|
||||||
|
@ -230,7 +229,7 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
|
||||||
assertEquals(Messages.getMessage(Messages.DATAFEED_CONFIG_INVALID_OPTION_VALUE, "indices", "[null, null]"), e.getMessage());
|
assertEquals(Messages.getMessage(Messages.DATAFEED_CONFIG_INVALID_OPTION_VALUE, "indices", "[null, null]"), e.getMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testCheckValid_GivenIndicesContainsOnlyEmptyStrings() throws IOException {
|
public void testCheckValid_GivenIndicesContainsOnlyEmptyStrings() {
|
||||||
List<String> indices = new ArrayList<>();
|
List<String> indices = new ArrayList<>();
|
||||||
indices.add("");
|
indices.add("");
|
||||||
indices.add("");
|
indices.add("");
|
||||||
|
@ -240,27 +239,27 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
|
||||||
assertEquals(Messages.getMessage(Messages.DATAFEED_CONFIG_INVALID_OPTION_VALUE, "indices", "[, ]"), e.getMessage());
|
assertEquals(Messages.getMessage(Messages.DATAFEED_CONFIG_INVALID_OPTION_VALUE, "indices", "[, ]"), e.getMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testCheckValid_GivenNegativeQueryDelay() throws IOException {
|
public void testCheckValid_GivenNegativeQueryDelay() {
|
||||||
DatafeedConfig.Builder conf = new DatafeedConfig.Builder("datafeed1", "job1");
|
DatafeedConfig.Builder conf = new DatafeedConfig.Builder("datafeed1", "job1");
|
||||||
IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class,
|
IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class,
|
||||||
() -> conf.setQueryDelay(TimeValue.timeValueMillis(-10)));
|
() -> conf.setQueryDelay(TimeValue.timeValueMillis(-10)));
|
||||||
assertEquals("query_delay cannot be less than 0. Value = -10", e.getMessage());
|
assertEquals("query_delay cannot be less than 0. Value = -10", e.getMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testCheckValid_GivenZeroFrequency() throws IOException {
|
public void testCheckValid_GivenZeroFrequency() {
|
||||||
DatafeedConfig.Builder conf = new DatafeedConfig.Builder("datafeed1", "job1");
|
DatafeedConfig.Builder conf = new DatafeedConfig.Builder("datafeed1", "job1");
|
||||||
IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class, () -> conf.setFrequency(TimeValue.ZERO));
|
IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class, () -> conf.setFrequency(TimeValue.ZERO));
|
||||||
assertEquals("frequency cannot be less or equal than 0. Value = 0s", e.getMessage());
|
assertEquals("frequency cannot be less or equal than 0. Value = 0s", e.getMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testCheckValid_GivenNegativeFrequency() throws IOException {
|
public void testCheckValid_GivenNegativeFrequency() {
|
||||||
DatafeedConfig.Builder conf = new DatafeedConfig.Builder("datafeed1", "job1");
|
DatafeedConfig.Builder conf = new DatafeedConfig.Builder("datafeed1", "job1");
|
||||||
IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class,
|
IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class,
|
||||||
() -> conf.setFrequency(TimeValue.timeValueMinutes(-1)));
|
() -> conf.setFrequency(TimeValue.timeValueMinutes(-1)));
|
||||||
assertEquals("frequency cannot be less or equal than 0. Value = -1", e.getMessage());
|
assertEquals("frequency cannot be less or equal than 0. Value = -1", e.getMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testCheckValid_GivenNegativeScrollSize() throws IOException {
|
public void testCheckValid_GivenNegativeScrollSize() {
|
||||||
DatafeedConfig.Builder conf = new DatafeedConfig.Builder("datafeed1", "job1");
|
DatafeedConfig.Builder conf = new DatafeedConfig.Builder("datafeed1", "job1");
|
||||||
ElasticsearchException e = ESTestCase.expectThrows(ElasticsearchException.class, () -> conf.setScrollSize(-1000));
|
ElasticsearchException e = ESTestCase.expectThrows(ElasticsearchException.class, () -> conf.setScrollSize(-1000));
|
||||||
assertEquals(Messages.getMessage(Messages.DATAFEED_CONFIG_INVALID_OPTION_VALUE, "scroll_size", -1000L), e.getMessage());
|
assertEquals(Messages.getMessage(Messages.DATAFEED_CONFIG_INVALID_OPTION_VALUE, "scroll_size", -1000L), e.getMessage());
|
||||||
|
@ -414,7 +413,7 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
|
||||||
|
|
||||||
public void testDefaultFrequency_GivenNoAggregations() {
|
public void testDefaultFrequency_GivenNoAggregations() {
|
||||||
DatafeedConfig.Builder datafeedBuilder = new DatafeedConfig.Builder("feed", "job");
|
DatafeedConfig.Builder datafeedBuilder = new DatafeedConfig.Builder("feed", "job");
|
||||||
datafeedBuilder.setIndices(Arrays.asList("my_index"));
|
datafeedBuilder.setIndices(Collections.singletonList("my_index"));
|
||||||
DatafeedConfig datafeed = datafeedBuilder.build();
|
DatafeedConfig datafeed = datafeedBuilder.build();
|
||||||
|
|
||||||
assertEquals(TimeValue.timeValueMinutes(1), datafeed.defaultFrequency(TimeValue.timeValueSeconds(1)));
|
assertEquals(TimeValue.timeValueMinutes(1), datafeed.defaultFrequency(TimeValue.timeValueSeconds(1)));
|
||||||
|
|
|
@ -24,7 +24,7 @@ import static org.mockito.Mockito.when;
|
||||||
public class MetricsConfigSerializingTests extends AbstractSerializingTestCase<MetricConfig> {
|
public class MetricsConfigSerializingTests extends AbstractSerializingTestCase<MetricConfig> {
|
||||||
@Override
|
@Override
|
||||||
protected MetricConfig doParseInstance(XContentParser parser) throws IOException {
|
protected MetricConfig doParseInstance(XContentParser parser) throws IOException {
|
||||||
return MetricConfig.PARSER.apply(parser, null);
|
return MetricConfig.PARSER.apply(parser, null).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -43,10 +43,12 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
||||||
import org.elasticsearch.persistent.PersistentTasksExecutor;
|
import org.elasticsearch.persistent.PersistentTasksExecutor;
|
||||||
import org.elasticsearch.persistent.PersistentTasksService;
|
import org.elasticsearch.persistent.PersistentTasksService;
|
||||||
import org.elasticsearch.xpack.ml.MachineLearning;
|
import org.elasticsearch.xpack.ml.MachineLearning;
|
||||||
|
import org.elasticsearch.xpack.ml.datafeed.MlRemoteLicenseChecker;
|
||||||
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
|
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
|
||||||
import org.elasticsearch.xpack.ml.datafeed.DatafeedNodeSelector;
|
import org.elasticsearch.xpack.ml.datafeed.DatafeedNodeSelector;
|
||||||
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
|
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.function.Predicate;
|
import java.util.function.Predicate;
|
||||||
|
|
||||||
|
@ -111,23 +113,25 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
|
||||||
ActionListener<StartDatafeedAction.Response> listener) {
|
ActionListener<StartDatafeedAction.Response> listener) {
|
||||||
StartDatafeedAction.DatafeedParams params = request.getParams();
|
StartDatafeedAction.DatafeedParams params = request.getParams();
|
||||||
if (licenseState.isMachineLearningAllowed()) {
|
if (licenseState.isMachineLearningAllowed()) {
|
||||||
ActionListener<PersistentTasksCustomMetaData.PersistentTask<StartDatafeedAction.DatafeedParams>> finalListener =
|
|
||||||
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<StartDatafeedAction.DatafeedParams>>() {
|
|
||||||
@Override
|
|
||||||
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<StartDatafeedAction.DatafeedParams> persistentTask) {
|
|
||||||
waitForDatafeedStarted(persistentTask.getId(), params, listener);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
ActionListener<PersistentTasksCustomMetaData.PersistentTask<StartDatafeedAction.DatafeedParams>> waitForTaskListener =
|
||||||
public void onFailure(Exception e) {
|
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<StartDatafeedAction.DatafeedParams>>() {
|
||||||
if (e instanceof ResourceAlreadyExistsException) {
|
@Override
|
||||||
logger.debug("datafeed already started", e);
|
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<StartDatafeedAction.DatafeedParams>
|
||||||
e = new ElasticsearchStatusException("cannot start datafeed [" + params.getDatafeedId() +
|
persistentTask) {
|
||||||
"] because it has already been started", RestStatus.CONFLICT);
|
waitForDatafeedStarted(persistentTask.getId(), params, listener);
|
||||||
}
|
}
|
||||||
listener.onFailure(e);
|
|
||||||
}
|
@Override
|
||||||
};
|
public void onFailure(Exception e) {
|
||||||
|
if (e instanceof ResourceAlreadyExistsException) {
|
||||||
|
logger.debug("datafeed already started", e);
|
||||||
|
e = new ElasticsearchStatusException("cannot start datafeed [" + params.getDatafeedId() +
|
||||||
|
"] because it has already been started", RestStatus.CONFLICT);
|
||||||
|
}
|
||||||
|
listener.onFailure(e);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
// Verify data extractor factory can be created, then start persistent task
|
// Verify data extractor factory can be created, then start persistent task
|
||||||
MlMetadata mlMetadata = MlMetadata.getMlMetadata(state);
|
MlMetadata mlMetadata = MlMetadata.getMlMetadata(state);
|
||||||
|
@ -135,16 +139,39 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
|
||||||
validate(params.getDatafeedId(), mlMetadata, tasks);
|
validate(params.getDatafeedId(), mlMetadata, tasks);
|
||||||
DatafeedConfig datafeed = mlMetadata.getDatafeed(params.getDatafeedId());
|
DatafeedConfig datafeed = mlMetadata.getDatafeed(params.getDatafeedId());
|
||||||
Job job = mlMetadata.getJobs().get(datafeed.getJobId());
|
Job job = mlMetadata.getJobs().get(datafeed.getJobId());
|
||||||
DataExtractorFactory.create(client, datafeed, job, ActionListener.wrap(
|
|
||||||
dataExtractorFactory ->
|
if (MlRemoteLicenseChecker.containsRemoteIndex(datafeed.getIndices())) {
|
||||||
persistentTasksService.sendStartRequest(MLMetadataField.datafeedTaskId(params.getDatafeedId()),
|
MlRemoteLicenseChecker remoteLicenseChecker = new MlRemoteLicenseChecker(client);
|
||||||
StartDatafeedAction.TASK_NAME, params, finalListener)
|
remoteLicenseChecker.checkRemoteClusterLicenses(MlRemoteLicenseChecker.remoteClusterNames(datafeed.getIndices()),
|
||||||
, listener::onFailure));
|
ActionListener.wrap(
|
||||||
|
response -> {
|
||||||
|
if (response.isViolated()) {
|
||||||
|
listener.onFailure(createUnlicensedError(datafeed.getId(), response));
|
||||||
|
} else {
|
||||||
|
createDataExtractor(job, datafeed, params, waitForTaskListener);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
e -> listener.onFailure(createUnknownLicenseError(datafeed.getId(),
|
||||||
|
MlRemoteLicenseChecker.remoteIndices(datafeed.getIndices()), e))
|
||||||
|
));
|
||||||
|
} else {
|
||||||
|
createDataExtractor(job, datafeed, params, waitForTaskListener);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
listener.onFailure(LicenseUtils.newComplianceException(XPackField.MACHINE_LEARNING));
|
listener.onFailure(LicenseUtils.newComplianceException(XPackField.MACHINE_LEARNING));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void createDataExtractor(Job job, DatafeedConfig datafeed, StartDatafeedAction.DatafeedParams params,
|
||||||
|
ActionListener<PersistentTasksCustomMetaData.PersistentTask<StartDatafeedAction.DatafeedParams>>
|
||||||
|
listener) {
|
||||||
|
DataExtractorFactory.create(client, datafeed, job, ActionListener.wrap(
|
||||||
|
dataExtractorFactory ->
|
||||||
|
persistentTasksService.sendStartRequest(MLMetadataField.datafeedTaskId(params.getDatafeedId()),
|
||||||
|
StartDatafeedAction.TASK_NAME, params, listener)
|
||||||
|
, listener::onFailure));
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected ClusterBlockException checkBlock(StartDatafeedAction.Request request, ClusterState state) {
|
protected ClusterBlockException checkBlock(StartDatafeedAction.Request request, ClusterState state) {
|
||||||
// We only delegate here to PersistentTasksService, but if there is a metadata writeblock,
|
// We only delegate here to PersistentTasksService, but if there is a metadata writeblock,
|
||||||
|
@ -158,28 +185,29 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
|
||||||
DatafeedPredicate predicate = new DatafeedPredicate();
|
DatafeedPredicate predicate = new DatafeedPredicate();
|
||||||
persistentTasksService.waitForPersistentTaskCondition(taskId, predicate, params.getTimeout(),
|
persistentTasksService.waitForPersistentTaskCondition(taskId, predicate, params.getTimeout(),
|
||||||
new PersistentTasksService.WaitForPersistentTaskListener<StartDatafeedAction.DatafeedParams>() {
|
new PersistentTasksService.WaitForPersistentTaskListener<StartDatafeedAction.DatafeedParams>() {
|
||||||
@Override
|
@Override
|
||||||
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<StartDatafeedAction.DatafeedParams> persistentTask) {
|
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<StartDatafeedAction.DatafeedParams>
|
||||||
if (predicate.exception != null) {
|
persistentTask) {
|
||||||
// We want to return to the caller without leaving an unassigned persistent task, to match
|
if (predicate.exception != null) {
|
||||||
// what would have happened if the error had been detected in the "fast fail" validation
|
// We want to return to the caller without leaving an unassigned persistent task, to match
|
||||||
cancelDatafeedStart(persistentTask, predicate.exception, listener);
|
// what would have happened if the error had been detected in the "fast fail" validation
|
||||||
} else {
|
cancelDatafeedStart(persistentTask, predicate.exception, listener);
|
||||||
listener.onResponse(new StartDatafeedAction.Response(true));
|
} else {
|
||||||
}
|
listener.onResponse(new StartDatafeedAction.Response(true));
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(Exception e) {
|
public void onFailure(Exception e) {
|
||||||
listener.onFailure(e);
|
listener.onFailure(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onTimeout(TimeValue timeout) {
|
public void onTimeout(TimeValue timeout) {
|
||||||
listener.onFailure(new ElasticsearchException("Starting datafeed ["
|
listener.onFailure(new ElasticsearchException("Starting datafeed ["
|
||||||
+ params.getDatafeedId() + "] timed out after [" + timeout + "]"));
|
+ params.getDatafeedId() + "] timed out after [" + timeout + "]"));
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private void cancelDatafeedStart(PersistentTasksCustomMetaData.PersistentTask<StartDatafeedAction.DatafeedParams> persistentTask,
|
private void cancelDatafeedStart(PersistentTasksCustomMetaData.PersistentTask<StartDatafeedAction.DatafeedParams> persistentTask,
|
||||||
|
@ -203,6 +231,25 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private ElasticsearchStatusException createUnlicensedError(String datafeedId,
|
||||||
|
MlRemoteLicenseChecker.LicenseViolation licenseViolation) {
|
||||||
|
String message = "Cannot start datafeed [" + datafeedId + "] as it is configured to use "
|
||||||
|
+ "indices on a remote cluster [" + licenseViolation.get().getClusterName()
|
||||||
|
+ "] that is not licensed for Machine Learning. "
|
||||||
|
+ MlRemoteLicenseChecker.buildErrorMessage(licenseViolation.get());
|
||||||
|
|
||||||
|
return new ElasticsearchStatusException(message, RestStatus.BAD_REQUEST);
|
||||||
|
}
|
||||||
|
|
||||||
|
private ElasticsearchStatusException createUnknownLicenseError(String datafeedId, List<String> remoteIndices,
|
||||||
|
Exception cause) {
|
||||||
|
String message = "Cannot start datafeed [" + datafeedId + "] as it is configured to use"
|
||||||
|
+ " indices on a remote cluster " + remoteIndices
|
||||||
|
+ " but the license type could not be verified";
|
||||||
|
|
||||||
|
return new ElasticsearchStatusException(message, RestStatus.BAD_REQUEST, new Exception(cause.getMessage()));
|
||||||
|
}
|
||||||
|
|
||||||
public static class StartDatafeedPersistentTasksExecutor extends PersistentTasksExecutor<StartDatafeedAction.DatafeedParams> {
|
public static class StartDatafeedPersistentTasksExecutor extends PersistentTasksExecutor<StartDatafeedAction.DatafeedParams> {
|
||||||
private final DatafeedManager datafeedManager;
|
private final DatafeedManager datafeedManager;
|
||||||
private final IndexNameExpressionResolver resolver;
|
private final IndexNameExpressionResolver resolver;
|
||||||
|
|
|
@ -91,7 +91,7 @@ public class DatafeedNodeSelector {
|
||||||
List<String> indices = datafeed.getIndices();
|
List<String> indices = datafeed.getIndices();
|
||||||
for (String index : indices) {
|
for (String index : indices) {
|
||||||
|
|
||||||
if (isRemoteIndex(index)) {
|
if (MlRemoteLicenseChecker.isRemoteIndex(index)) {
|
||||||
// We cannot verify remote indices
|
// We cannot verify remote indices
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -122,10 +122,6 @@ public class DatafeedNodeSelector {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isRemoteIndex(String index) {
|
|
||||||
return index.indexOf(':') != -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static class AssignmentFailure {
|
private static class AssignmentFailure {
|
||||||
private final String reason;
|
private final String reason;
|
||||||
private final boolean isCriticalForTaskCreation;
|
private final boolean isCriticalForTaskCreation;
|
||||||
|
|
|
@ -0,0 +1,192 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License;
|
||||||
|
* you may not use this file except in compliance with the Elastic License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.xpack.ml.datafeed;
|
||||||
|
|
||||||
|
import org.elasticsearch.ElasticsearchException;
|
||||||
|
import org.elasticsearch.action.ActionListener;
|
||||||
|
import org.elasticsearch.client.Client;
|
||||||
|
import org.elasticsearch.common.Nullable;
|
||||||
|
import org.elasticsearch.common.Strings;
|
||||||
|
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||||
|
import org.elasticsearch.license.License;
|
||||||
|
import org.elasticsearch.license.XPackInfoResponse;
|
||||||
|
import org.elasticsearch.transport.ActionNotFoundTransportException;
|
||||||
|
import org.elasticsearch.transport.RemoteClusterAware;
|
||||||
|
import org.elasticsearch.xpack.core.action.XPackInfoAction;
|
||||||
|
import org.elasticsearch.xpack.core.action.XPackInfoRequest;
|
||||||
|
|
||||||
|
import java.util.EnumSet;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ML datafeeds can use cross cluster search to access data in a remote cluster.
|
||||||
|
* The remote cluster should be licenced for ML this class performs that check
|
||||||
|
* using the _xpack (info) endpoint.
|
||||||
|
*/
|
||||||
|
public class MlRemoteLicenseChecker {
|
||||||
|
|
||||||
|
private final Client client;
|
||||||
|
|
||||||
|
public static class RemoteClusterLicenseInfo {
|
||||||
|
private final String clusterName;
|
||||||
|
private final XPackInfoResponse.LicenseInfo licenseInfo;
|
||||||
|
|
||||||
|
RemoteClusterLicenseInfo(String clusterName, XPackInfoResponse.LicenseInfo licenseInfo) {
|
||||||
|
this.clusterName = clusterName;
|
||||||
|
this.licenseInfo = licenseInfo;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getClusterName() {
|
||||||
|
return clusterName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public XPackInfoResponse.LicenseInfo getLicenseInfo() {
|
||||||
|
return licenseInfo;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public class LicenseViolation {
|
||||||
|
private final RemoteClusterLicenseInfo licenseInfo;
|
||||||
|
|
||||||
|
private LicenseViolation(@Nullable RemoteClusterLicenseInfo licenseInfo) {
|
||||||
|
this.licenseInfo = licenseInfo;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isViolated() {
|
||||||
|
return licenseInfo != null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public RemoteClusterLicenseInfo get() {
|
||||||
|
return licenseInfo;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public MlRemoteLicenseChecker(Client client) {
|
||||||
|
this.client = client;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check each cluster is licensed for ML.
|
||||||
|
* This function evaluates lazily and will terminate when the first cluster
|
||||||
|
* that is not licensed is found or an error occurs.
|
||||||
|
*
|
||||||
|
* @param clusterNames List of remote cluster names
|
||||||
|
* @param listener Response listener
|
||||||
|
*/
|
||||||
|
public void checkRemoteClusterLicenses(List<String> clusterNames, ActionListener<LicenseViolation> listener) {
|
||||||
|
final Iterator<String> itr = clusterNames.iterator();
|
||||||
|
if (itr.hasNext() == false) {
|
||||||
|
listener.onResponse(new LicenseViolation(null));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
final AtomicReference<String> clusterName = new AtomicReference<>(itr.next());
|
||||||
|
|
||||||
|
ActionListener<XPackInfoResponse> infoListener = new ActionListener<XPackInfoResponse>() {
|
||||||
|
@Override
|
||||||
|
public void onResponse(XPackInfoResponse xPackInfoResponse) {
|
||||||
|
if (licenseSupportsML(xPackInfoResponse.getLicenseInfo()) == false) {
|
||||||
|
listener.onResponse(new LicenseViolation(
|
||||||
|
new RemoteClusterLicenseInfo(clusterName.get(), xPackInfoResponse.getLicenseInfo())));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (itr.hasNext()) {
|
||||||
|
clusterName.set(itr.next());
|
||||||
|
remoteClusterLicense(clusterName.get(), this);
|
||||||
|
} else {
|
||||||
|
listener.onResponse(new LicenseViolation(null));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Exception e) {
|
||||||
|
String message = "Could not determine the X-Pack licence type for cluster [" + clusterName.get() + "]";
|
||||||
|
if (e instanceof ActionNotFoundTransportException) {
|
||||||
|
// This is likely to be because x-pack is not installed in the target cluster
|
||||||
|
message += ". Is X-Pack installed on the target cluster?";
|
||||||
|
}
|
||||||
|
listener.onFailure(new ElasticsearchException(message, e));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
remoteClusterLicense(clusterName.get(), infoListener);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void remoteClusterLicense(String clusterName, ActionListener<XPackInfoResponse> listener) {
|
||||||
|
Client remoteClusterClient = client.getRemoteClusterClient(clusterName);
|
||||||
|
ThreadContext threadContext = remoteClusterClient.threadPool().getThreadContext();
|
||||||
|
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
|
||||||
|
// we stash any context here since this is an internal execution and should not leak any
|
||||||
|
// existing context information.
|
||||||
|
threadContext.markAsSystemContext();
|
||||||
|
|
||||||
|
XPackInfoRequest request = new XPackInfoRequest();
|
||||||
|
request.setCategories(EnumSet.of(XPackInfoRequest.Category.LICENSE));
|
||||||
|
remoteClusterClient.execute(XPackInfoAction.INSTANCE, request, listener);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static boolean licenseSupportsML(XPackInfoResponse.LicenseInfo licenseInfo) {
|
||||||
|
License.OperationMode mode = License.OperationMode.resolve(licenseInfo.getMode());
|
||||||
|
return licenseInfo.getStatus() == License.Status.ACTIVE &&
|
||||||
|
(mode == License.OperationMode.PLATINUM || mode == License.OperationMode.TRIAL);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean isRemoteIndex(String index) {
|
||||||
|
return index.indexOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR) != -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean containsRemoteIndex(List<String> indices) {
|
||||||
|
return indices.stream().anyMatch(MlRemoteLicenseChecker::isRemoteIndex);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get any remote indices used in cross cluster search.
|
||||||
|
* Remote indices are of the form {@code cluster_name:index_name}
|
||||||
|
* @return List of remote cluster indices
|
||||||
|
*/
|
||||||
|
public static List<String> remoteIndices(List<String> indices) {
|
||||||
|
return indices.stream().filter(MlRemoteLicenseChecker::isRemoteIndex).collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Extract the list of remote cluster names from the list of indices.
|
||||||
|
* @param indices List of indices. Remote cluster indices are prefixed
|
||||||
|
* with {@code cluster-name:}
|
||||||
|
* @return Every cluster name found in {@code indices}
|
||||||
|
*/
|
||||||
|
public static List<String> remoteClusterNames(List<String> indices) {
|
||||||
|
return indices.stream()
|
||||||
|
.filter(MlRemoteLicenseChecker::isRemoteIndex)
|
||||||
|
.map(index -> index.substring(0, index.indexOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR)))
|
||||||
|
.distinct()
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String buildErrorMessage(RemoteClusterLicenseInfo clusterLicenseInfo) {
|
||||||
|
StringBuilder error = new StringBuilder();
|
||||||
|
if (clusterLicenseInfo.licenseInfo.getStatus() != License.Status.ACTIVE) {
|
||||||
|
error.append("The license on cluster [").append(clusterLicenseInfo.clusterName)
|
||||||
|
.append("] is not active. ");
|
||||||
|
} else {
|
||||||
|
License.OperationMode mode = License.OperationMode.resolve(clusterLicenseInfo.licenseInfo.getMode());
|
||||||
|
if (mode != License.OperationMode.PLATINUM && mode != License.OperationMode.TRIAL) {
|
||||||
|
error.append("The license mode [").append(mode)
|
||||||
|
.append("] on cluster [")
|
||||||
|
.append(clusterLicenseInfo.clusterName)
|
||||||
|
.append("] does not enable Machine Learning. ");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
error.append(Strings.toString(clusterLicenseInfo.licenseInfo));
|
||||||
|
return error.toString();
|
||||||
|
}
|
||||||
|
}
|
|
@ -117,7 +117,7 @@ public interface AutodetectProcess extends Closeable {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Ask the job to start persisting model state in the background
|
* Ask the job to start persisting model state in the background
|
||||||
* @throws IOException
|
* @throws IOException If writing the request fails
|
||||||
*/
|
*/
|
||||||
void persistJob() throws IOException;
|
void persistJob() throws IOException;
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,200 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License;
|
||||||
|
* you may not use this file except in compliance with the Elastic License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.xpack.ml.datafeed;
|
||||||
|
|
||||||
|
import org.elasticsearch.action.ActionListener;
|
||||||
|
import org.elasticsearch.client.Client;
|
||||||
|
import org.elasticsearch.common.Strings;
|
||||||
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||||
|
import org.elasticsearch.license.License;
|
||||||
|
import org.elasticsearch.license.XPackInfoResponse;
|
||||||
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
import org.elasticsearch.xpack.core.action.XPackInfoAction;
|
||||||
|
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
import static org.hamcrest.Matchers.contains;
|
||||||
|
import static org.hamcrest.Matchers.containsInAnyOrder;
|
||||||
|
import static org.hamcrest.Matchers.empty;
|
||||||
|
import static org.hamcrest.Matchers.is;
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Matchers.anyString;
|
||||||
|
import static org.mockito.Matchers.same;
|
||||||
|
import static org.mockito.Mockito.doAnswer;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
public class MlRemoteLicenseCheckerTests extends ESTestCase {
|
||||||
|
|
||||||
|
public void testIsRemoteIndex() {
|
||||||
|
List<String> indices = Arrays.asList("local-index1", "local-index2");
|
||||||
|
assertFalse(MlRemoteLicenseChecker.containsRemoteIndex(indices));
|
||||||
|
indices = Arrays.asList("local-index1", "remote-cluster:remote-index2");
|
||||||
|
assertTrue(MlRemoteLicenseChecker.containsRemoteIndex(indices));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testRemoteIndices() {
|
||||||
|
List<String> indices = Collections.singletonList("local-index");
|
||||||
|
assertThat(MlRemoteLicenseChecker.remoteIndices(indices), is(empty()));
|
||||||
|
indices = Arrays.asList("local-index", "remote-cluster:index1", "local-index2", "remote-cluster2:index1");
|
||||||
|
assertThat(MlRemoteLicenseChecker.remoteIndices(indices), containsInAnyOrder("remote-cluster:index1", "remote-cluster2:index1"));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testRemoteClusterNames() {
|
||||||
|
List<String> indices = Arrays.asList("local-index1", "local-index2");
|
||||||
|
assertThat(MlRemoteLicenseChecker.remoteClusterNames(indices), empty());
|
||||||
|
indices = Arrays.asList("local-index1", "remote-cluster1:remote-index2");
|
||||||
|
assertThat(MlRemoteLicenseChecker.remoteClusterNames(indices), contains("remote-cluster1"));
|
||||||
|
indices = Arrays.asList("remote-cluster1:index2", "index1", "remote-cluster2:index1");
|
||||||
|
assertThat(MlRemoteLicenseChecker.remoteClusterNames(indices), contains("remote-cluster1", "remote-cluster2"));
|
||||||
|
indices = Arrays.asList("remote-cluster1:index2", "index1", "remote-cluster2:index1", "remote-cluster2:index2");
|
||||||
|
assertThat(MlRemoteLicenseChecker.remoteClusterNames(indices), contains("remote-cluster1", "remote-cluster2"));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testLicenseSupportsML() {
|
||||||
|
XPackInfoResponse.LicenseInfo licenseInfo = new XPackInfoResponse.LicenseInfo("uid", "trial", "trial",
|
||||||
|
License.Status.ACTIVE, randomNonNegativeLong());
|
||||||
|
assertTrue(MlRemoteLicenseChecker.licenseSupportsML(licenseInfo));
|
||||||
|
|
||||||
|
licenseInfo = new XPackInfoResponse.LicenseInfo("uid", "trial", "trial", License.Status.EXPIRED, randomNonNegativeLong());
|
||||||
|
assertFalse(MlRemoteLicenseChecker.licenseSupportsML(licenseInfo));
|
||||||
|
|
||||||
|
licenseInfo = new XPackInfoResponse.LicenseInfo("uid", "GOLD", "GOLD", License.Status.ACTIVE, randomNonNegativeLong());
|
||||||
|
assertFalse(MlRemoteLicenseChecker.licenseSupportsML(licenseInfo));
|
||||||
|
|
||||||
|
licenseInfo = new XPackInfoResponse.LicenseInfo("uid", "PLATINUM", "PLATINUM", License.Status.ACTIVE, randomNonNegativeLong());
|
||||||
|
assertTrue(MlRemoteLicenseChecker.licenseSupportsML(licenseInfo));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testCheckRemoteClusterLicenses_givenValidLicenses() {
|
||||||
|
final AtomicInteger index = new AtomicInteger(0);
|
||||||
|
final List<XPackInfoResponse> responses = new ArrayList<>();
|
||||||
|
|
||||||
|
Client client = createMockClient();
|
||||||
|
doAnswer(invocationMock -> {
|
||||||
|
@SuppressWarnings("raw_types")
|
||||||
|
ActionListener listener = (ActionListener) invocationMock.getArguments()[2];
|
||||||
|
listener.onResponse(responses.get(index.getAndIncrement()));
|
||||||
|
return null;
|
||||||
|
}).when(client).execute(same(XPackInfoAction.INSTANCE), any(), any());
|
||||||
|
|
||||||
|
|
||||||
|
List<String> remoteClusterNames = Arrays.asList("valid1", "valid2", "valid3");
|
||||||
|
responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null));
|
||||||
|
responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null));
|
||||||
|
responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null));
|
||||||
|
|
||||||
|
MlRemoteLicenseChecker licenseChecker = new MlRemoteLicenseChecker(client);
|
||||||
|
AtomicReference<MlRemoteLicenseChecker.LicenseViolation> licCheckResponse = new AtomicReference<>();
|
||||||
|
|
||||||
|
licenseChecker.checkRemoteClusterLicenses(remoteClusterNames,
|
||||||
|
new ActionListener<MlRemoteLicenseChecker.LicenseViolation>() {
|
||||||
|
@Override
|
||||||
|
public void onResponse(MlRemoteLicenseChecker.LicenseViolation response) {
|
||||||
|
licCheckResponse.set(response);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Exception e) {
|
||||||
|
fail(e.getMessage());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
verify(client, times(3)).execute(same(XPackInfoAction.INSTANCE), any(), any());
|
||||||
|
assertNotNull(licCheckResponse.get());
|
||||||
|
assertFalse(licCheckResponse.get().isViolated());
|
||||||
|
assertNull(licCheckResponse.get().get());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testCheckRemoteClusterLicenses_givenInvalidLicense() {
|
||||||
|
final AtomicInteger index = new AtomicInteger(0);
|
||||||
|
List<String> remoteClusterNames = Arrays.asList("good", "cluster-with-basic-license", "good2");
|
||||||
|
final List<XPackInfoResponse> responses = new ArrayList<>();
|
||||||
|
responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null));
|
||||||
|
responses.add(new XPackInfoResponse(null, createBasicLicenseResponse(), null));
|
||||||
|
responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null));
|
||||||
|
|
||||||
|
Client client = createMockClient();
|
||||||
|
doAnswer(invocationMock -> {
|
||||||
|
@SuppressWarnings("raw_types")
|
||||||
|
ActionListener listener = (ActionListener) invocationMock.getArguments()[2];
|
||||||
|
listener.onResponse(responses.get(index.getAndIncrement()));
|
||||||
|
return null;
|
||||||
|
}).when(client).execute(same(XPackInfoAction.INSTANCE), any(), any());
|
||||||
|
|
||||||
|
MlRemoteLicenseChecker licenseChecker = new MlRemoteLicenseChecker(client);
|
||||||
|
AtomicReference<MlRemoteLicenseChecker.LicenseViolation> licCheckResponse = new AtomicReference<>();
|
||||||
|
|
||||||
|
licenseChecker.checkRemoteClusterLicenses(remoteClusterNames,
|
||||||
|
new ActionListener<MlRemoteLicenseChecker.LicenseViolation>() {
|
||||||
|
@Override
|
||||||
|
public void onResponse(MlRemoteLicenseChecker.LicenseViolation response) {
|
||||||
|
licCheckResponse.set(response);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Exception e) {
|
||||||
|
fail(e.getMessage());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
verify(client, times(2)).execute(same(XPackInfoAction.INSTANCE), any(), any());
|
||||||
|
assertNotNull(licCheckResponse.get());
|
||||||
|
assertTrue(licCheckResponse.get().isViolated());
|
||||||
|
assertEquals("cluster-with-basic-license", licCheckResponse.get().get().getClusterName());
|
||||||
|
assertEquals("BASIC", licCheckResponse.get().get().getLicenseInfo().getType());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testBuildErrorMessage() {
|
||||||
|
XPackInfoResponse.LicenseInfo platinumLicence = createPlatinumLicenseResponse();
|
||||||
|
MlRemoteLicenseChecker.RemoteClusterLicenseInfo info =
|
||||||
|
new MlRemoteLicenseChecker.RemoteClusterLicenseInfo("platinum-cluster", platinumLicence);
|
||||||
|
assertEquals(Strings.toString(platinumLicence), MlRemoteLicenseChecker.buildErrorMessage(info));
|
||||||
|
|
||||||
|
XPackInfoResponse.LicenseInfo basicLicense = createBasicLicenseResponse();
|
||||||
|
info = new MlRemoteLicenseChecker.RemoteClusterLicenseInfo("basic-cluster", basicLicense);
|
||||||
|
String expected = "The license mode [BASIC] on cluster [basic-cluster] does not enable Machine Learning. "
|
||||||
|
+ Strings.toString(basicLicense);
|
||||||
|
assertEquals(expected, MlRemoteLicenseChecker.buildErrorMessage(info));
|
||||||
|
|
||||||
|
XPackInfoResponse.LicenseInfo expiredLicense = createExpiredLicenseResponse();
|
||||||
|
info = new MlRemoteLicenseChecker.RemoteClusterLicenseInfo("expired-cluster", expiredLicense);
|
||||||
|
expected = "The license on cluster [expired-cluster] is not active. " + Strings.toString(expiredLicense);
|
||||||
|
assertEquals(expected, MlRemoteLicenseChecker.buildErrorMessage(info));
|
||||||
|
}
|
||||||
|
|
||||||
|
private Client createMockClient() {
|
||||||
|
Client client = mock(Client.class);
|
||||||
|
ThreadPool threadPool = mock(ThreadPool.class);
|
||||||
|
when(client.threadPool()).thenReturn(threadPool);
|
||||||
|
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
|
||||||
|
when(client.getRemoteClusterClient(anyString())).thenReturn(client);
|
||||||
|
return client;
|
||||||
|
}
|
||||||
|
|
||||||
|
private XPackInfoResponse.LicenseInfo createPlatinumLicenseResponse() {
|
||||||
|
return new XPackInfoResponse.LicenseInfo("uid", "PLATINUM", "PLATINUM", License.Status.ACTIVE, randomNonNegativeLong());
|
||||||
|
}
|
||||||
|
|
||||||
|
private XPackInfoResponse.LicenseInfo createBasicLicenseResponse() {
|
||||||
|
return new XPackInfoResponse.LicenseInfo("uid", "BASIC", "BASIC", License.Status.ACTIVE, randomNonNegativeLong());
|
||||||
|
}
|
||||||
|
|
||||||
|
private XPackInfoResponse.LicenseInfo createExpiredLicenseResponse() {
|
||||||
|
return new XPackInfoResponse.LicenseInfo("uid", "PLATINUM", "PLATINUM", License.Status.EXPIRED, randomNonNegativeLong());
|
||||||
|
}
|
||||||
|
}
|
|
@ -188,3 +188,33 @@ setup:
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
---
|
||||||
|
"Unknown Metric":
|
||||||
|
|
||||||
|
- do:
|
||||||
|
catch: /Unsupported metric \[does_not_exist\]/
|
||||||
|
headers:
|
||||||
|
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
|
||||||
|
xpack.rollup.put_job:
|
||||||
|
id: foo
|
||||||
|
body: >
|
||||||
|
{
|
||||||
|
"index_pattern": "foo",
|
||||||
|
"rollup_index": "foo_rollup",
|
||||||
|
"cron": "*/30 * * * * ?",
|
||||||
|
"page_size" :10,
|
||||||
|
"groups" : {
|
||||||
|
"date_histogram": {
|
||||||
|
"field": "the_field",
|
||||||
|
"interval": "1h"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"metrics": [
|
||||||
|
{
|
||||||
|
"field": "value_field",
|
||||||
|
"metrics": ["min", "max", "sum", "does_not_exist"]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue