Upgrade: Remove watcher/security upgrade checks (elastic/x-pack-elasticsearch#2338)
The checks are used for the 5.6 to 6.x transition, thus they do not make sense to keep in 7.x. Original commit: elastic/x-pack-elasticsearch@c6c6fa819e
This commit is contained in:
parent
c6b6a5c804
commit
9b0a1a34e0
|
@ -5,73 +5,37 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.xpack.upgrade;
|
package org.elasticsearch.xpack.upgrade;
|
||||||
|
|
||||||
import org.elasticsearch.ElasticsearchException;
|
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.action.ActionListener;
|
|
||||||
import org.elasticsearch.action.ActionRequest;
|
import org.elasticsearch.action.ActionRequest;
|
||||||
import org.elasticsearch.action.ActionResponse;
|
import org.elasticsearch.action.ActionResponse;
|
||||||
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateResponse;
|
|
||||||
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
|
|
||||||
import org.elasticsearch.action.bulk.BulkRequestBuilder;
|
|
||||||
import org.elasticsearch.action.bulk.BulkResponse;
|
|
||||||
import org.elasticsearch.action.support.WriteRequest;
|
|
||||||
import org.elasticsearch.action.update.UpdateRequest;
|
|
||||||
import org.elasticsearch.client.Client;
|
|
||||||
import org.elasticsearch.cluster.metadata.AliasOrIndex;
|
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||||
import org.elasticsearch.cluster.service.ClusterService;
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.common.settings.ClusterSettings;
|
import org.elasticsearch.common.settings.ClusterSettings;
|
||||||
import org.elasticsearch.common.settings.IndexScopedSettings;
|
import org.elasticsearch.common.settings.IndexScopedSettings;
|
||||||
import org.elasticsearch.common.settings.SecureString;
|
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.settings.SettingsFilter;
|
import org.elasticsearch.common.settings.SettingsFilter;
|
||||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||||
import org.elasticsearch.common.xcontent.XContentType;
|
|
||||||
import org.elasticsearch.index.query.QueryBuilders;
|
|
||||||
import org.elasticsearch.indices.IndexTemplateMissingException;
|
|
||||||
import org.elasticsearch.plugins.ActionPlugin;
|
import org.elasticsearch.plugins.ActionPlugin;
|
||||||
import org.elasticsearch.rest.RestController;
|
import org.elasticsearch.rest.RestController;
|
||||||
import org.elasticsearch.rest.RestHandler;
|
import org.elasticsearch.rest.RestHandler;
|
||||||
import org.elasticsearch.script.Script;
|
|
||||||
import org.elasticsearch.script.ScriptService;
|
import org.elasticsearch.script.ScriptService;
|
||||||
import org.elasticsearch.script.ScriptType;
|
|
||||||
import org.elasticsearch.search.SearchHit;
|
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.TransportResponse;
|
|
||||||
import org.elasticsearch.watcher.ResourceWatcherService;
|
import org.elasticsearch.watcher.ResourceWatcherService;
|
||||||
import org.elasticsearch.xpack.security.InternalClient;
|
import org.elasticsearch.xpack.security.InternalClient;
|
||||||
import org.elasticsearch.xpack.security.authc.support.Hasher;
|
|
||||||
import org.elasticsearch.xpack.security.user.User;
|
|
||||||
import org.elasticsearch.xpack.template.TemplateUtils;
|
|
||||||
import org.elasticsearch.xpack.upgrade.actions.IndexUpgradeAction;
|
import org.elasticsearch.xpack.upgrade.actions.IndexUpgradeAction;
|
||||||
import org.elasticsearch.xpack.upgrade.actions.IndexUpgradeInfoAction;
|
import org.elasticsearch.xpack.upgrade.actions.IndexUpgradeInfoAction;
|
||||||
import org.elasticsearch.xpack.upgrade.rest.RestIndexUpgradeAction;
|
import org.elasticsearch.xpack.upgrade.rest.RestIndexUpgradeAction;
|
||||||
import org.elasticsearch.xpack.upgrade.rest.RestIndexUpgradeInfoAction;
|
import org.elasticsearch.xpack.upgrade.rest.RestIndexUpgradeInfoAction;
|
||||||
import org.elasticsearch.xpack.watcher.client.WatcherClient;
|
|
||||||
import org.elasticsearch.xpack.watcher.execution.TriggeredWatchStore;
|
|
||||||
import org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry;
|
|
||||||
import org.elasticsearch.xpack.watcher.transport.actions.service.WatcherServiceRequest;
|
|
||||||
import org.elasticsearch.xpack.watcher.watch.Watch;
|
|
||||||
|
|
||||||
import java.nio.charset.StandardCharsets;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.function.BiFunction;
|
import java.util.function.BiFunction;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
import java.util.regex.Pattern;
|
|
||||||
|
|
||||||
import static org.elasticsearch.xpack.security.SecurityLifecycleService.SECURITY_INDEX_NAME;
|
|
||||||
import static org.elasticsearch.xpack.security.authc.esnative.NativeUsersStore.INDEX_TYPE;
|
|
||||||
import static org.elasticsearch.xpack.security.authc.esnative.NativeUsersStore.RESERVED_USER_TYPE;
|
|
||||||
|
|
||||||
public class Upgrade implements ActionPlugin {
|
public class Upgrade implements ActionPlugin {
|
||||||
|
|
||||||
|
@ -87,9 +51,6 @@ public class Upgrade implements ActionPlugin {
|
||||||
public Upgrade(Settings settings) {
|
public Upgrade(Settings settings) {
|
||||||
this.settings = settings;
|
this.settings = settings;
|
||||||
this.upgradeCheckFactories = new ArrayList<>();
|
this.upgradeCheckFactories = new ArrayList<>();
|
||||||
upgradeCheckFactories.add(getWatchesIndexUpgradeCheckFactory(settings));
|
|
||||||
upgradeCheckFactories.add(getTriggeredWatchesIndexUpgradeCheckFactory(settings));
|
|
||||||
upgradeCheckFactories.add(getSecurityUpgradeCheckFactory(settings));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public Collection<Object> createComponents(InternalClient internalClient, ClusterService clusterService, ThreadPool threadPool,
|
public Collection<Object> createComponents(InternalClient internalClient, ClusterService clusterService, ThreadPool threadPool,
|
||||||
|
@ -127,296 +88,4 @@ public class Upgrade implements ActionPlugin {
|
||||||
public static boolean checkInternalIndexFormat(IndexMetaData indexMetaData) {
|
public static boolean checkInternalIndexFormat(IndexMetaData indexMetaData) {
|
||||||
return indexMetaData.getSettings().getAsInt(IndexMetaData.INDEX_FORMAT_SETTING.getKey(), 0) == EXPECTED_INDEX_FORMAT_VERSION;
|
return indexMetaData.getSettings().getAsInt(IndexMetaData.INDEX_FORMAT_SETTING.getKey(), 0) == EXPECTED_INDEX_FORMAT_VERSION;
|
||||||
}
|
}
|
||||||
|
|
||||||
static BiFunction<InternalClient, ClusterService, IndexUpgradeCheck> getSecurityUpgradeCheckFactory(Settings settings) {
|
|
||||||
return (internalClient, clusterService) ->
|
|
||||||
new IndexUpgradeCheck<Void>("security",
|
|
||||||
settings,
|
|
||||||
indexMetaData -> {
|
|
||||||
if (".security".equals(indexMetaData.getIndex().getName())
|
|
||||||
|| indexMetaData.getAliases().containsKey(".security")) {
|
|
||||||
|
|
||||||
if (checkInternalIndexFormat(indexMetaData)) {
|
|
||||||
return UpgradeActionRequired.UP_TO_DATE;
|
|
||||||
} else {
|
|
||||||
return UpgradeActionRequired.UPGRADE;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
return UpgradeActionRequired.NOT_APPLICABLE;
|
|
||||||
}
|
|
||||||
},
|
|
||||||
internalClient,
|
|
||||||
clusterService,
|
|
||||||
new String[] { "user", "reserved-user", "role", "doc" },
|
|
||||||
new Script(ScriptType.INLINE, "painless",
|
|
||||||
"ctx._source.type = ctx._type;\n" +
|
|
||||||
"if (!ctx._type.equals(\"doc\")) {\n" +
|
|
||||||
" ctx._id = ctx._type + \"-\" + ctx._id;\n" +
|
|
||||||
" ctx._type = \"doc\";" +
|
|
||||||
"}\n",
|
|
||||||
new HashMap<>()),
|
|
||||||
listener -> listener.onResponse(null),
|
|
||||||
(success, listener) -> postSecurityUpgrade(internalClient, listener));
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void postSecurityUpgrade(Client client, ActionListener<TransportResponse.Empty> listener) {
|
|
||||||
// update passwords to the new style, if they are in the old default password mechanism
|
|
||||||
client.prepareSearch(SECURITY_INDEX_NAME)
|
|
||||||
.setQuery(QueryBuilders.termQuery(User.Fields.TYPE.getPreferredName(), RESERVED_USER_TYPE))
|
|
||||||
.setFetchSource(true)
|
|
||||||
.execute(ActionListener.wrap(searchResponse -> {
|
|
||||||
assert searchResponse.getHits().getTotalHits() <= 10 :
|
|
||||||
"there are more than 10 reserved users we need to change this to retrieve them all!";
|
|
||||||
Set<String> toConvert = new HashSet<>();
|
|
||||||
for (SearchHit searchHit : searchResponse.getHits()) {
|
|
||||||
Map<String, Object> sourceMap = searchHit.getSourceAsMap();
|
|
||||||
if (hasOldStyleDefaultPassword(sourceMap)) {
|
|
||||||
toConvert.add(searchHit.getId());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (toConvert.isEmpty()) {
|
|
||||||
listener.onResponse(TransportResponse.Empty.INSTANCE);
|
|
||||||
} else {
|
|
||||||
final BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
|
|
||||||
for (final String id : toConvert) {
|
|
||||||
final UpdateRequest updateRequest = new UpdateRequest(SECURITY_INDEX_NAME,
|
|
||||||
INDEX_TYPE, RESERVED_USER_TYPE + "-" + id);
|
|
||||||
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
|
|
||||||
.doc(User.Fields.PASSWORD.getPreferredName(), "",
|
|
||||||
User.Fields.TYPE.getPreferredName(), RESERVED_USER_TYPE);
|
|
||||||
bulkRequestBuilder.add(updateRequest);
|
|
||||||
}
|
|
||||||
bulkRequestBuilder.execute(new ActionListener<BulkResponse>() {
|
|
||||||
@Override
|
|
||||||
public void onResponse(BulkResponse bulkItemResponses) {
|
|
||||||
if (bulkItemResponses.hasFailures()) {
|
|
||||||
final String msg = "failed to update old style reserved user passwords: " +
|
|
||||||
bulkItemResponses.buildFailureMessage();
|
|
||||||
listener.onFailure(new ElasticsearchException(msg));
|
|
||||||
} else {
|
|
||||||
listener.onResponse(TransportResponse.Empty.INSTANCE);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onFailure(Exception e) {
|
|
||||||
listener.onFailure(e);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}, listener::onFailure));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Determines whether the supplied source as a {@link Map} has its password explicitly set to be the default password
|
|
||||||
*/
|
|
||||||
private static boolean hasOldStyleDefaultPassword(Map<String, Object> userSource) {
|
|
||||||
// TODO we should store the hash as something other than a string... bytes?
|
|
||||||
final String passwordHash = (String) userSource.get(User.Fields.PASSWORD.getPreferredName());
|
|
||||||
if (passwordHash == null) {
|
|
||||||
throw new IllegalStateException("passwordHash should never be null");
|
|
||||||
} else if (passwordHash.isEmpty()) {
|
|
||||||
// we know empty is the new style
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
try (SecureString secureString = new SecureString(passwordHash.toCharArray())) {
|
|
||||||
return Hasher.BCRYPT.verify(new SecureString("".toCharArray()), secureString.getChars());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static BiFunction<InternalClient, ClusterService, IndexUpgradeCheck> getWatchesIndexUpgradeCheckFactory(Settings settings) {
|
|
||||||
return (internalClient, clusterService) ->
|
|
||||||
new IndexUpgradeCheck<Boolean>("watches",
|
|
||||||
settings,
|
|
||||||
indexMetaData -> {
|
|
||||||
if (indexOrAliasExists(indexMetaData, ".watches")) {
|
|
||||||
if (checkInternalIndexFormat(indexMetaData)) {
|
|
||||||
return UpgradeActionRequired.UP_TO_DATE;
|
|
||||||
} else {
|
|
||||||
return UpgradeActionRequired.UPGRADE;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
return UpgradeActionRequired.NOT_APPLICABLE;
|
|
||||||
}
|
|
||||||
}, internalClient,
|
|
||||||
clusterService,
|
|
||||||
new String[]{"watch"},
|
|
||||||
new Script(ScriptType.INLINE, "painless", "ctx._type = \"doc\";\n" +
|
|
||||||
"if (ctx._source.containsKey(\"_status\") && !ctx._source.containsKey(\"status\") ) {\n" +
|
|
||||||
" ctx._source.status = ctx._source.remove(\"_status\");\n" +
|
|
||||||
"}",
|
|
||||||
new HashMap<>()),
|
|
||||||
booleanActionListener -> preWatchesIndexUpgrade(internalClient, clusterService, booleanActionListener),
|
|
||||||
(shouldStartWatcher, listener) -> postWatchesIndexUpgrade(internalClient, shouldStartWatcher, listener)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
static BiFunction<InternalClient, ClusterService, IndexUpgradeCheck> getTriggeredWatchesIndexUpgradeCheckFactory(Settings settings) {
|
|
||||||
return (internalClient, clusterService) ->
|
|
||||||
new IndexUpgradeCheck<Boolean>("triggered-watches",
|
|
||||||
settings,
|
|
||||||
indexMetaData -> {
|
|
||||||
if (indexOrAliasExists(indexMetaData, TriggeredWatchStore.INDEX_NAME)) {
|
|
||||||
if (checkInternalIndexFormat(indexMetaData)) {
|
|
||||||
return UpgradeActionRequired.UP_TO_DATE;
|
|
||||||
} else {
|
|
||||||
return UpgradeActionRequired.UPGRADE;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
return UpgradeActionRequired.NOT_APPLICABLE;
|
|
||||||
}
|
|
||||||
}, internalClient,
|
|
||||||
clusterService,
|
|
||||||
new String[]{"triggered-watch"},
|
|
||||||
new Script(ScriptType.INLINE, "painless", "ctx._type = \"doc\";\n", new HashMap<>()),
|
|
||||||
booleanActionListener -> preTriggeredWatchesIndexUpgrade(internalClient, clusterService, booleanActionListener),
|
|
||||||
(shouldStartWatcher, listener) -> postWatchesIndexUpgrade(internalClient, shouldStartWatcher, listener)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static boolean indexOrAliasExists(IndexMetaData indexMetaData, String name) {
|
|
||||||
return name.equals(indexMetaData.getIndex().getName()) || indexMetaData.getAliases().containsKey(name);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void preTriggeredWatchesIndexUpgrade(Client client, ClusterService clusterService, ActionListener<Boolean> listener) {
|
|
||||||
AliasOrIndex aliasOrIndex = clusterService.state().getMetaData().getAliasAndIndexLookup().get(Watch.INDEX);
|
|
||||||
boolean isWatchesIndexReady = aliasOrIndex == null || checkInternalIndexFormat(aliasOrIndex.getIndices().get(0));
|
|
||||||
|
|
||||||
new WatcherClient(client).prepareWatcherStats().execute(ActionListener.wrap(
|
|
||||||
stats -> {
|
|
||||||
if (stats.watcherMetaData().manuallyStopped()) {
|
|
||||||
preTriggeredWatchesIndexUpgrade(client, listener, false);
|
|
||||||
} else {
|
|
||||||
new WatcherClient(client).prepareWatchService().stop().execute(ActionListener.wrap(
|
|
||||||
watcherServiceResponse -> {
|
|
||||||
if (watcherServiceResponse.isAcknowledged()) {
|
|
||||||
preTriggeredWatchesIndexUpgrade(client, listener, isWatchesIndexReady);
|
|
||||||
} else {
|
|
||||||
listener.onFailure(new IllegalStateException("unable to stop watcher service"));
|
|
||||||
}
|
|
||||||
|
|
||||||
},
|
|
||||||
listener::onFailure));
|
|
||||||
}
|
|
||||||
},
|
|
||||||
listener::onFailure));
|
|
||||||
}
|
|
||||||
|
|
||||||
static void preTriggeredWatchesIndexUpgrade(final Client client, final ActionListener<Boolean> listener, final boolean restart) {
|
|
||||||
final String legacyTriggeredWatchesTemplateName = "triggered_watches";
|
|
||||||
|
|
||||||
ActionListener<DeleteIndexTemplateResponse> returnToCallerListener =
|
|
||||||
deleteIndexTemplateListener(legacyTriggeredWatchesTemplateName, listener, () -> listener.onResponse(restart));
|
|
||||||
|
|
||||||
// step 2, after put new .triggered_watches template: delete triggered_watches index template, then return to caller
|
|
||||||
ActionListener<PutIndexTemplateResponse> putTriggeredWatchesListener =
|
|
||||||
putIndexTemplateListener(WatcherIndexTemplateRegistry.TRIGGERED_TEMPLATE_NAME, listener,
|
|
||||||
() -> client.admin().indices().prepareDeleteTemplate(legacyTriggeredWatchesTemplateName)
|
|
||||||
.execute(returnToCallerListener));
|
|
||||||
|
|
||||||
// step 1, put new .triggered_watches template
|
|
||||||
final byte[] triggeredWatchesTemplate = TemplateUtils.loadTemplate("/triggered-watches.json",
|
|
||||||
WatcherIndexTemplateRegistry.INDEX_TEMPLATE_VERSION,
|
|
||||||
Pattern.quote("${xpack.watcher.template.version}")).getBytes(StandardCharsets.UTF_8);
|
|
||||||
|
|
||||||
client.admin().indices().preparePutTemplate(WatcherIndexTemplateRegistry.TRIGGERED_TEMPLATE_NAME)
|
|
||||||
.setSource(triggeredWatchesTemplate, XContentType.JSON).execute(putTriggeredWatchesListener);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void preWatchesIndexUpgrade(Client client, ClusterService clusterService, ActionListener<Boolean> listener) {
|
|
||||||
AliasOrIndex aliasOrIndex = clusterService.state().getMetaData().getAliasAndIndexLookup().get(TriggeredWatchStore.INDEX_NAME);
|
|
||||||
boolean isTriggeredWatchesIndexReady = aliasOrIndex == null || checkInternalIndexFormat(aliasOrIndex.getIndices().get(0));
|
|
||||||
|
|
||||||
new WatcherClient(client).prepareWatcherStats().execute(ActionListener.wrap(
|
|
||||||
stats -> {
|
|
||||||
if (stats.watcherMetaData().manuallyStopped()) {
|
|
||||||
preWatchesIndexUpgrade(client, listener, false);
|
|
||||||
} else {
|
|
||||||
new WatcherClient(client).prepareWatchService().stop().execute(ActionListener.wrap(
|
|
||||||
watcherServiceResponse -> {
|
|
||||||
if (watcherServiceResponse.isAcknowledged()) {
|
|
||||||
preWatchesIndexUpgrade(client, listener, isTriggeredWatchesIndexReady);
|
|
||||||
} else {
|
|
||||||
listener.onFailure(new IllegalStateException("unable to stop watcher service"));
|
|
||||||
}
|
|
||||||
|
|
||||||
},
|
|
||||||
listener::onFailure));
|
|
||||||
}
|
|
||||||
},
|
|
||||||
listener::onFailure));
|
|
||||||
}
|
|
||||||
|
|
||||||
static void preWatchesIndexUpgrade(final Client client, final ActionListener<Boolean> listener, final boolean restart) {
|
|
||||||
final String legacyWatchesTemplateName = "watches";
|
|
||||||
ActionListener<DeleteIndexTemplateResponse> returnToCallerListener =
|
|
||||||
deleteIndexTemplateListener(legacyWatchesTemplateName, listener, () -> listener.onResponse(restart));
|
|
||||||
|
|
||||||
// step 3, after put new .watches template: delete watches index template, then return to caller
|
|
||||||
ActionListener<PutIndexTemplateResponse> putTriggeredWatchesListener =
|
|
||||||
putIndexTemplateListener(WatcherIndexTemplateRegistry.TRIGGERED_TEMPLATE_NAME, listener,
|
|
||||||
() -> client.admin().indices().prepareDeleteTemplate(legacyWatchesTemplateName)
|
|
||||||
.execute(returnToCallerListener));
|
|
||||||
|
|
||||||
// step 2, after delete watch history templates: put new .watches template
|
|
||||||
final byte[] watchesTemplate = TemplateUtils.loadTemplate("/watches.json",
|
|
||||||
WatcherIndexTemplateRegistry.INDEX_TEMPLATE_VERSION,
|
|
||||||
Pattern.quote("${xpack.watcher.template.version}")).getBytes(StandardCharsets.UTF_8);
|
|
||||||
|
|
||||||
ActionListener<DeleteIndexTemplateResponse> deleteWatchHistoryTemplatesListener = deleteIndexTemplateListener("watch_history_*",
|
|
||||||
listener,
|
|
||||||
() -> client.admin().indices().preparePutTemplate(WatcherIndexTemplateRegistry.WATCHES_TEMPLATE_NAME)
|
|
||||||
.setSource(watchesTemplate, XContentType.JSON)
|
|
||||||
.execute(putTriggeredWatchesListener));
|
|
||||||
|
|
||||||
// step 1, delete watch history index templates
|
|
||||||
client.admin().indices().prepareDeleteTemplate("watch_history_*").execute(deleteWatchHistoryTemplatesListener);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void postWatchesIndexUpgrade(Client client, Boolean shouldStartWatcher,
|
|
||||||
ActionListener<TransportResponse.Empty> listener) {
|
|
||||||
if (shouldStartWatcher) {
|
|
||||||
// Start the watcher service
|
|
||||||
new WatcherClient(client).watcherService(new WatcherServiceRequest().start(), ActionListener.wrap(
|
|
||||||
r -> listener.onResponse(TransportResponse.Empty.INSTANCE), listener::onFailure
|
|
||||||
));
|
|
||||||
} else {
|
|
||||||
listener.onResponse(TransportResponse.Empty.INSTANCE);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static ActionListener<PutIndexTemplateResponse> putIndexTemplateListener(String name, ActionListener<Boolean> listener,
|
|
||||||
Runnable runnable) {
|
|
||||||
return ActionListener.wrap(
|
|
||||||
r -> {
|
|
||||||
if (r.isAcknowledged()) {
|
|
||||||
runnable.run();
|
|
||||||
} else {
|
|
||||||
listener.onFailure(new ElasticsearchException("Putting [{}] template was not acknowledged", name));
|
|
||||||
}
|
|
||||||
},
|
|
||||||
listener::onFailure);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static ActionListener<DeleteIndexTemplateResponse> deleteIndexTemplateListener(String name, ActionListener<Boolean> listener,
|
|
||||||
Runnable runnable) {
|
|
||||||
return ActionListener.wrap(
|
|
||||||
r -> {
|
|
||||||
if (r.isAcknowledged()) {
|
|
||||||
runnable.run();
|
|
||||||
} else {
|
|
||||||
listener.onFailure(new ElasticsearchException("Deleting [{}] template was not acknowledged", name));
|
|
||||||
}
|
|
||||||
},
|
|
||||||
// if the index template we tried to delete is gone already, no need to worry
|
|
||||||
e -> {
|
|
||||||
if (e instanceof IndexTemplateMissingException) {
|
|
||||||
runnable.run();
|
|
||||||
} else {
|
|
||||||
listener.onFailure(e);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,86 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
|
||||||
* or more contributor license agreements. Licensed under the Elastic License;
|
|
||||||
* you may not use this file except in compliance with the Elastic License.
|
|
||||||
*/
|
|
||||||
package org.elasticsearch.xpack.upgrade;
|
|
||||||
|
|
||||||
import org.elasticsearch.Version;
|
|
||||||
import org.elasticsearch.cluster.metadata.AliasMetaData;
|
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
|
||||||
import org.elasticsearch.common.UUIDs;
|
|
||||||
import org.elasticsearch.common.settings.Settings;
|
|
||||||
import org.elasticsearch.test.ESTestCase;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
import static org.hamcrest.core.IsEqual.equalTo;
|
|
||||||
|
|
||||||
public class IndexUpgradeCheckTests extends ESTestCase {
|
|
||||||
|
|
||||||
public void testWatchesIndexUpgradeCheck() throws Exception {
|
|
||||||
IndexUpgradeCheck check = Upgrade.getWatchesIndexUpgradeCheckFactory(Settings.EMPTY).apply(null, null);
|
|
||||||
assertThat(check.getName(), equalTo("watches"));
|
|
||||||
|
|
||||||
IndexMetaData goodKibanaIndex = newTestIndexMeta(".kibana", Settings.EMPTY);
|
|
||||||
assertThat(check.actionRequired(goodKibanaIndex), equalTo(UpgradeActionRequired.NOT_APPLICABLE));
|
|
||||||
|
|
||||||
IndexMetaData watcherIndex = newTestIndexMeta(".watches", Settings.EMPTY);
|
|
||||||
assertThat(check.actionRequired(watcherIndex), equalTo(UpgradeActionRequired.UPGRADE));
|
|
||||||
|
|
||||||
IndexMetaData watcherIndexWithAlias = newTestIndexMeta("my_watches", ".watches", Settings.EMPTY);
|
|
||||||
assertThat(check.actionRequired(watcherIndexWithAlias), equalTo(UpgradeActionRequired.UPGRADE));
|
|
||||||
|
|
||||||
IndexMetaData watcherIndexWithAliasUpgraded = newTestIndexMeta("my_watches", ".watches",
|
|
||||||
Settings.builder().put(IndexMetaData.INDEX_FORMAT_SETTING.getKey(), "6").put().build());
|
|
||||||
assertThat(check.actionRequired(watcherIndexWithAliasUpgraded), equalTo(UpgradeActionRequired.UP_TO_DATE));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testTriggeredWatchesIndexUpgradeCheck() throws Exception {
|
|
||||||
IndexUpgradeCheck check = Upgrade.getTriggeredWatchesIndexUpgradeCheckFactory(Settings.EMPTY).apply(null, null);
|
|
||||||
assertThat(check.getName(), equalTo("triggered-watches"));
|
|
||||||
|
|
||||||
IndexMetaData goodKibanaIndex = newTestIndexMeta(".kibana", Settings.EMPTY);
|
|
||||||
assertThat(check.actionRequired(goodKibanaIndex), equalTo(UpgradeActionRequired.NOT_APPLICABLE));
|
|
||||||
|
|
||||||
IndexMetaData watcherIndex = newTestIndexMeta(".triggered_watches", Settings.EMPTY);
|
|
||||||
assertThat(check.actionRequired(watcherIndex), equalTo(UpgradeActionRequired.UPGRADE));
|
|
||||||
|
|
||||||
IndexMetaData watcherIndexWithAlias = newTestIndexMeta("my_triggered_watches", ".triggered_watches", Settings.EMPTY);
|
|
||||||
assertThat(check.actionRequired(watcherIndexWithAlias), equalTo(UpgradeActionRequired.UPGRADE));
|
|
||||||
|
|
||||||
IndexMetaData watcherIndexWithAliasUpgraded = newTestIndexMeta("my_triggered_watches", ".triggered_watches",
|
|
||||||
Settings.builder().put(IndexMetaData.INDEX_FORMAT_SETTING.getKey(), "6").put().build());
|
|
||||||
assertThat(check.actionRequired(watcherIndexWithAliasUpgraded), equalTo(UpgradeActionRequired.UP_TO_DATE));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testSecurityIndexUpgradeCheck() throws Exception{
|
|
||||||
IndexUpgradeCheck check = Upgrade.getSecurityUpgradeCheckFactory(Settings.EMPTY).apply(null, null);
|
|
||||||
assertThat(check.getName(), equalTo("security"));
|
|
||||||
|
|
||||||
IndexMetaData securityIndex = newTestIndexMeta(".security", Settings.EMPTY);
|
|
||||||
assertThat(check.actionRequired(securityIndex), equalTo(UpgradeActionRequired.UPGRADE));
|
|
||||||
}
|
|
||||||
|
|
||||||
public static IndexMetaData newTestIndexMeta(String name, String alias, Settings indexSettings) throws IOException {
|
|
||||||
Settings build = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
|
|
||||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
|
|
||||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
|
||||||
.put(IndexMetaData.SETTING_CREATION_DATE, 1)
|
|
||||||
.put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID())
|
|
||||||
.put(IndexMetaData.SETTING_VERSION_UPGRADED, Version.V_5_0_0_beta1)
|
|
||||||
.put(indexSettings)
|
|
||||||
.build();
|
|
||||||
IndexMetaData.Builder builder = IndexMetaData.builder(name).settings(build);
|
|
||||||
if (alias != null) {
|
|
||||||
// Create alias
|
|
||||||
builder.putAlias(AliasMetaData.newAliasMetaDataBuilder(alias).build());
|
|
||||||
}
|
|
||||||
return builder.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
public static IndexMetaData newTestIndexMeta(String name, Settings indexSettings) throws IOException {
|
|
||||||
return newTestIndexMeta(name, null, indexSettings);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -151,114 +151,4 @@ public class IndexUpgradeIT extends IndexUpgradeIntegTestCase {
|
||||||
// but calling on a particular index should fail
|
// but calling on a particular index should fail
|
||||||
assertThrows(client().prepareExecute(IndexUpgradeInfoAction.INSTANCE).setIndices("test"), IndexNotFoundException.class);
|
assertThrows(client().prepareExecute(IndexUpgradeInfoAction.INSTANCE).setIndices("test"), IndexNotFoundException.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testPreWatchesUpgrade() throws Exception {
|
|
||||||
Settings templateSettings = Settings.builder().put("index.number_of_shards", 2).build();
|
|
||||||
|
|
||||||
// create legacy watches template
|
|
||||||
if (randomBoolean()) {
|
|
||||||
assertAcked(client().admin().indices().preparePutTemplate("watches")
|
|
||||||
.setSettings(templateSettings).setTemplate(".watches*")
|
|
||||||
.get());
|
|
||||||
}
|
|
||||||
|
|
||||||
// create old watch history template
|
|
||||||
if (randomBoolean()) {
|
|
||||||
assertAcked(client().admin().indices().preparePutTemplate("watch_history_foo")
|
|
||||||
.setSettings(templateSettings).setTemplate("watch_history-*")
|
|
||||||
.get());
|
|
||||||
}
|
|
||||||
|
|
||||||
CountDownLatch latch = new CountDownLatch(1);
|
|
||||||
AtomicReference<Exception> exception = new AtomicReference<>();
|
|
||||||
ActionListener<Boolean> listener = ActionListener.wrap(
|
|
||||||
r -> latch.countDown(),
|
|
||||||
e -> {
|
|
||||||
latch.countDown();
|
|
||||||
exception.set(e);
|
|
||||||
});
|
|
||||||
|
|
||||||
// use the internal client from the master, instead of client(), so we dont have to deal with remote transport exceptions
|
|
||||||
// and it works like the real implementation
|
|
||||||
InternalClient client = internalCluster().getInstance(InternalClient.class, internalCluster().getMasterName());
|
|
||||||
Upgrade.preWatchesIndexUpgrade(client, listener, false);
|
|
||||||
|
|
||||||
assertThat("Latch was not counted down", latch.await(10, TimeUnit.SECONDS), is(true));
|
|
||||||
assertThat(exception.get(), is(nullValue()));
|
|
||||||
|
|
||||||
// ensure old index templates are gone, new ones are created
|
|
||||||
List<String> templateNames = getTemplateNames();
|
|
||||||
assertThat(templateNames, not(hasItem(startsWith("watch_history"))));
|
|
||||||
assertThat(templateNames, not(hasItem("watches")));
|
|
||||||
assertThat(templateNames, hasItem(".watches"));
|
|
||||||
|
|
||||||
// last let's be sure that the watcher index template registry does not add back any template by accident with the current state
|
|
||||||
Settings settings = internalCluster().getInstance(Settings.class, internalCluster().getMasterName());
|
|
||||||
ClusterService clusterService = internalCluster().getInstance(ClusterService.class, internalCluster().getMasterName());
|
|
||||||
ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, internalCluster().getMasterName());
|
|
||||||
WatcherIndexTemplateRegistry registry =
|
|
||||||
new WatcherIndexTemplateRegistry(settings, clusterService, threadPool, client);
|
|
||||||
|
|
||||||
ClusterState state = clusterService.state();
|
|
||||||
ClusterChangedEvent event = new ClusterChangedEvent("whatever", state, state);
|
|
||||||
registry.clusterChanged(event);
|
|
||||||
|
|
||||||
List<String> templateNamesAfterClusterChangedEvent = getTemplateNames();
|
|
||||||
assertThat(templateNamesAfterClusterChangedEvent, not(hasItem(startsWith("watch_history"))));
|
|
||||||
assertThat(templateNamesAfterClusterChangedEvent, not(hasItem("watches")));
|
|
||||||
assertThat(templateNamesAfterClusterChangedEvent, hasItem(".watches"));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testPreTriggeredWatchesUpgrade() throws Exception {
|
|
||||||
Settings templateSettings = Settings.builder().put("index.number_of_shards", 2).build();
|
|
||||||
// create legacy triggered watch template
|
|
||||||
if (randomBoolean()) {
|
|
||||||
assertAcked(client().admin().indices().preparePutTemplate("triggered_watches")
|
|
||||||
.setSettings(templateSettings).setTemplate(".triggered_watches*")
|
|
||||||
.get());
|
|
||||||
}
|
|
||||||
|
|
||||||
CountDownLatch latch = new CountDownLatch(1);
|
|
||||||
AtomicReference<Exception> exception = new AtomicReference<>();
|
|
||||||
ActionListener<Boolean> listener = ActionListener.wrap(
|
|
||||||
r -> latch.countDown(),
|
|
||||||
e -> {
|
|
||||||
latch.countDown();
|
|
||||||
exception.set(e);
|
|
||||||
});
|
|
||||||
|
|
||||||
// use the internal client from the master, instead of client(), so we dont have to deal with remote transport exceptions
|
|
||||||
// and it works like the real implementation
|
|
||||||
InternalClient client = internalCluster().getInstance(InternalClient.class, internalCluster().getMasterName());
|
|
||||||
Upgrade.preTriggeredWatchesIndexUpgrade(client, listener, false);
|
|
||||||
|
|
||||||
assertThat("Latch was not counted down", latch.await(10, TimeUnit.SECONDS), is(true));
|
|
||||||
assertThat(exception.get(), is(nullValue()));
|
|
||||||
|
|
||||||
// ensure old index templates are gone, new ones are created
|
|
||||||
List<String> templateNames = getTemplateNames();
|
|
||||||
assertThat(templateNames, not(hasItem("triggered_watches")));
|
|
||||||
assertThat(templateNames, hasItem(".triggered_watches"));
|
|
||||||
|
|
||||||
// last let's be sure that the watcher index template registry does not add back any template by accident with the current state
|
|
||||||
Settings settings = internalCluster().getInstance(Settings.class, internalCluster().getMasterName());
|
|
||||||
ClusterService clusterService = internalCluster().getInstance(ClusterService.class, internalCluster().getMasterName());
|
|
||||||
ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, internalCluster().getMasterName());
|
|
||||||
WatcherIndexTemplateRegistry registry =
|
|
||||||
new WatcherIndexTemplateRegistry(settings, clusterService, threadPool, client);
|
|
||||||
|
|
||||||
ClusterState state = clusterService.state();
|
|
||||||
ClusterChangedEvent event = new ClusterChangedEvent("whatever", state, state);
|
|
||||||
registry.clusterChanged(event);
|
|
||||||
List<String> templateNamesAfterClusterChangedEvent = getTemplateNames();
|
|
||||||
assertThat(templateNamesAfterClusterChangedEvent, not(hasItem("triggered_watches")));
|
|
||||||
assertThat(templateNamesAfterClusterChangedEvent, hasItem(".triggered_watches"));
|
|
||||||
}
|
|
||||||
|
|
||||||
private List<String> getTemplateNames() {
|
|
||||||
GetIndexTemplatesResponse templatesResponse = client().admin().indices().prepareGetTemplates().get();
|
|
||||||
return templatesResponse.getIndexTemplates().stream()
|
|
||||||
.map(IndexTemplateMetaData::getName)
|
|
||||||
.collect(Collectors.toList());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,16 +10,18 @@ import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.action.support.IndicesOptions;
|
import org.elasticsearch.action.support.IndicesOptions;
|
||||||
import org.elasticsearch.cluster.ClusterName;
|
import org.elasticsearch.cluster.ClusterName;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
|
import org.elasticsearch.cluster.metadata.AliasMetaData;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.cluster.metadata.MetaData;
|
import org.elasticsearch.cluster.metadata.MetaData;
|
||||||
|
import org.elasticsearch.common.UUIDs;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
import static org.elasticsearch.xpack.upgrade.IndexUpgradeCheckTests.newTestIndexMeta;
|
|
||||||
import static org.hamcrest.core.IsEqual.equalTo;
|
import static org.hamcrest.core.IsEqual.equalTo;
|
||||||
|
|
||||||
public class IndexUpgradeServiceTests extends ESTestCase {
|
public class IndexUpgradeServiceTests extends ESTestCase {
|
||||||
|
@ -156,4 +158,25 @@ public class IndexUpgradeServiceTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
return ClusterState.builder(ClusterName.DEFAULT).metaData(metaDataBuilder).build();
|
return ClusterState.builder(ClusterName.DEFAULT).metaData(metaDataBuilder).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static IndexMetaData newTestIndexMeta(String name, String alias, Settings indexSettings) throws IOException {
|
||||||
|
Settings build = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
|
||||||
|
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
|
||||||
|
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
||||||
|
.put(IndexMetaData.SETTING_CREATION_DATE, 1)
|
||||||
|
.put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID())
|
||||||
|
.put(IndexMetaData.SETTING_VERSION_UPGRADED, Version.V_5_0_0_beta1)
|
||||||
|
.put(indexSettings)
|
||||||
|
.build();
|
||||||
|
IndexMetaData.Builder builder = IndexMetaData.builder(name).settings(build);
|
||||||
|
if (alias != null) {
|
||||||
|
// Create alias
|
||||||
|
builder.putAlias(AliasMetaData.newAliasMetaDataBuilder(alias).build());
|
||||||
|
}
|
||||||
|
return builder.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static IndexMetaData newTestIndexMeta(String name, Settings indexSettings) throws IOException {
|
||||||
|
return newTestIndexMeta(name, null, indexSettings);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,432 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
|
||||||
* or more contributor license agreements. Licensed under the Elastic License;
|
|
||||||
* you may not use this file except in compliance with the Elastic License.
|
|
||||||
*/
|
|
||||||
package org.elasticsearch.upgrades;
|
|
||||||
|
|
||||||
import org.elasticsearch.Version;
|
|
||||||
import org.elasticsearch.client.Response;
|
|
||||||
import org.elasticsearch.client.RestClient;
|
|
||||||
import org.elasticsearch.client.http.HttpHost;
|
|
||||||
import org.elasticsearch.client.http.entity.ContentType;
|
|
||||||
import org.elasticsearch.client.http.entity.StringEntity;
|
|
||||||
import org.elasticsearch.client.http.util.EntityUtils;
|
|
||||||
import org.elasticsearch.common.CheckedConsumer;
|
|
||||||
import org.elasticsearch.common.collect.MapBuilder;
|
|
||||||
import org.elasticsearch.common.settings.Settings;
|
|
||||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
|
||||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
|
||||||
import org.elasticsearch.common.xcontent.XContentType;
|
|
||||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
|
||||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
|
||||||
import org.elasticsearch.test.rest.ESRestTestCase;
|
|
||||||
import org.elasticsearch.test.rest.yaml.ObjectPath;
|
|
||||||
import org.elasticsearch.xpack.watcher.condition.AlwaysCondition;
|
|
||||||
import org.junit.Before;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.nio.charset.StandardCharsets;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Base64;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Comparator;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
import static org.elasticsearch.xpack.security.SecurityLifecycleService.SECURITY_TEMPLATE_NAME;
|
|
||||||
import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.loggingAction;
|
|
||||||
import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder;
|
|
||||||
import static org.elasticsearch.xpack.watcher.input.InputBuilders.simpleInput;
|
|
||||||
import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule;
|
|
||||||
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval;
|
|
||||||
import static org.hamcrest.Matchers.anyOf;
|
|
||||||
import static org.hamcrest.Matchers.containsString;
|
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
|
||||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
|
||||||
import static org.hamcrest.Matchers.not;
|
|
||||||
|
|
||||||
@TestLogging("org.elasticsearch.client:TRACE")
|
|
||||||
public class WatchBackwardsCompatibilityIT extends ESRestTestCase {
|
|
||||||
|
|
||||||
private final StringEntity entity = new StringEntity(watchBuilder()
|
|
||||||
.trigger(schedule(interval("5m")))
|
|
||||||
.input(simpleInput())
|
|
||||||
.condition(AlwaysCondition.INSTANCE)
|
|
||||||
.addAction("_action1", loggingAction("{{ctx.watch_id}}"))
|
|
||||||
.buildAsBytes(XContentType.JSON)
|
|
||||||
.utf8ToString(),
|
|
||||||
ContentType.APPLICATION_JSON);
|
|
||||||
private Nodes nodes;
|
|
||||||
|
|
||||||
@Before
|
|
||||||
public void waitForSecuritySetup() throws Exception {
|
|
||||||
String masterNode = null;
|
|
||||||
String catNodesResponse = EntityUtils.toString(
|
|
||||||
client().performRequest("GET", "/_cat/nodes?h=id,master").getEntity(),
|
|
||||||
StandardCharsets.UTF_8
|
|
||||||
);
|
|
||||||
for (String line : catNodesResponse.split("\n")) {
|
|
||||||
int indexOfStar = line.indexOf('*'); // * in the node's output denotes it is master
|
|
||||||
if (indexOfStar != -1) {
|
|
||||||
masterNode = line.substring(0, indexOfStar).trim();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
assertNotNull(masterNode);
|
|
||||||
final String masterNodeId = masterNode;
|
|
||||||
|
|
||||||
assertBusy(() -> {
|
|
||||||
try {
|
|
||||||
Response nodeDetailsResponse = client().performRequest("GET", "/_nodes");
|
|
||||||
ObjectPath path = ObjectPath.createFromResponse(nodeDetailsResponse);
|
|
||||||
Map<String, Object> nodes = path.evaluate("nodes");
|
|
||||||
assertThat(nodes.size(), greaterThanOrEqualTo(2));
|
|
||||||
String masterVersion = null;
|
|
||||||
for (String key : nodes.keySet()) {
|
|
||||||
// get the ES version number master is on
|
|
||||||
if (key.startsWith(masterNodeId)) {
|
|
||||||
masterVersion = path.evaluate("nodes." + key + ".version");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
assertNotNull(masterVersion);
|
|
||||||
final String masterTemplateVersion = masterVersion;
|
|
||||||
|
|
||||||
Response response = client().performRequest("GET", "/_cluster/state/metadata");
|
|
||||||
ObjectPath objectPath = ObjectPath.createFromResponse(response);
|
|
||||||
final String mappingsPath = "metadata.templates." + SECURITY_TEMPLATE_NAME + "" +
|
|
||||||
".mappings";
|
|
||||||
Map<String, Object> mappings = objectPath.evaluate(mappingsPath);
|
|
||||||
assertNotNull(mappings);
|
|
||||||
assertThat(mappings.size(), greaterThanOrEqualTo(1));
|
|
||||||
for (String key : mappings.keySet()) {
|
|
||||||
String templateVersion = objectPath.evaluate(mappingsPath + "." + key + "" +
|
|
||||||
"._meta.security-version");
|
|
||||||
final Version mVersion = Version.fromString(masterTemplateVersion);
|
|
||||||
final Version tVersion = Version.fromString(templateVersion);
|
|
||||||
assertTrue(mVersion.onOrBefore(tVersion));
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new AssertionError("failed to get cluster state", e);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
nodes = buildNodeAndVersions();
|
|
||||||
logger.info("Nodes in cluster before test: bwc [{}], new [{}], master [{}]", nodes.getBWCNodes(), nodes.getNewNodes(),
|
|
||||||
nodes.getMaster());
|
|
||||||
|
|
||||||
Map<String, String> params = Collections.singletonMap("error_trace", "true");
|
|
||||||
executeAgainstMasterNode(client -> {
|
|
||||||
// create a watch before each test, most of the time this is just overwriting...
|
|
||||||
assertOK(client.performRequest("PUT", "/_xpack/watcher/watch/my-watch", params, entity));
|
|
||||||
// just a check to see if we can execute a watch, purely optional
|
|
||||||
if (randomBoolean()) {
|
|
||||||
assertOK(client.performRequest("POST", "/_xpack/watcher/watch/my-watch/_execute", params,
|
|
||||||
new StringEntity("{ \"record_execution\" : true }", ContentType.APPLICATION_JSON)));
|
|
||||||
}
|
|
||||||
if (randomBoolean()) {
|
|
||||||
Map<String, String> ignore404Params = MapBuilder.newMapBuilder(params).put("ignore", "404").immutableMap();
|
|
||||||
Response indexExistsResponse = client.performRequest("HEAD", "/.triggered_watches", ignore404Params);
|
|
||||||
if (indexExistsResponse.getStatusLine().getStatusCode() == 404) {
|
|
||||||
logger.info("Created triggered watches index to ensure it gets upgraded");
|
|
||||||
client.performRequest("PUT", "/.triggered_watches");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// helping debugging output
|
|
||||||
executeAgainstMasterNode(client -> {
|
|
||||||
Map<String, String> filterPathParams = MapBuilder.newMapBuilder(params)
|
|
||||||
.put("filter_path", "*.template,*.index_patterns").immutableMap();
|
|
||||||
Response r = client.performRequest("GET", "_template/*watch*", filterPathParams);
|
|
||||||
logger.info("existing watcher templates response [{}]", EntityUtils.toString(r.getEntity(), StandardCharsets.UTF_8));
|
|
||||||
});
|
|
||||||
|
|
||||||
// set logging to debug
|
|
||||||
// executeAgainstMasterNode(client -> {
|
|
||||||
// StringEntity entity = new StringEntity("{ \"transient\" : { \"logger.org.elasticsearch.xpack.watcher\" : \"TRACE\" } }",
|
|
||||||
// ContentType.APPLICATION_JSON);
|
|
||||||
// Response response = client.performRequest("PUT", "_cluster/settings", params, entity);
|
|
||||||
// logger.info("cluster update settings response [{}]", EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8));
|
|
||||||
// });
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected boolean preserveIndicesUponCompletion() {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected boolean preserveTemplatesUponCompletion() {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected Settings restClientSettings() {
|
|
||||||
String token = "Basic " + Base64.getEncoder()
|
|
||||||
.encodeToString(("test_user:x-pack-test-password").getBytes(StandardCharsets.UTF_8));
|
|
||||||
return Settings.builder()
|
|
||||||
.put(ThreadContext.PREFIX + ".Authorization", token)
|
|
||||||
.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testWatcherStats() throws Exception {
|
|
||||||
executeAgainstAllNodes(client ->
|
|
||||||
assertOK(client.performRequest("GET", "/_xpack/watcher/stats"))
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testWatcherRestart() throws Exception {
|
|
||||||
executeUpgradeIfNeeded();
|
|
||||||
|
|
||||||
executeAgainstRandomNode(client -> assertOK(client.performRequest("POST", "/_xpack/watcher/_stop")));
|
|
||||||
ensureWatcherStopped();
|
|
||||||
|
|
||||||
executeAgainstRandomNode(client -> assertOK(client.performRequest("POST", "/_xpack/watcher/_start")));
|
|
||||||
ensureWatcherStarted();
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testWatchCrudApis() throws Exception {
|
|
||||||
assumeFalse("new nodes is empty", nodes.getNewNodes().isEmpty());
|
|
||||||
|
|
||||||
// execute upgrade if new nodes are in the cluster
|
|
||||||
executeUpgradeIfNeeded();
|
|
||||||
|
|
||||||
executeAgainstAllNodes(client -> {
|
|
||||||
Map<String, String> params = Collections.singletonMap("error_trace", "true");
|
|
||||||
assertOK(client.performRequest("PUT", "/_xpack/watcher/watch/my-watch", params, entity));
|
|
||||||
assertOK(client.performRequest("GET", "/_xpack/watcher/watch/my-watch", params));
|
|
||||||
assertOK(client.performRequest("POST", "/_xpack/watcher/watch/my-watch/_execute", params));
|
|
||||||
assertOK(client.performRequest("PUT", "/_xpack/watcher/watch/my-watch/_deactivate", params));
|
|
||||||
assertOK(client.performRequest("PUT", "/_xpack/watcher/watch/my-watch/_activate", params));
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
public void executeUpgradeIfNeeded() throws Exception {
|
|
||||||
// if new nodes exists, this is a mixed cluster
|
|
||||||
boolean only6xNodes = nodes.getBWCVersion().major >= 6;
|
|
||||||
final List<Node> nodesToQuery = only6xNodes ? nodes.getBWCNodes() : nodes.getNewNodes();
|
|
||||||
final HttpHost[] httpHosts = nodesToQuery.stream().map(Node::getPublishAddress).toArray(HttpHost[]::new);
|
|
||||||
if (httpHosts.length > 0) {
|
|
||||||
try (RestClient client = buildClient(restClientSettings(), httpHosts)) {
|
|
||||||
logger.info("checking that upgrade procedure on the new cluster is required, hosts [{}]", Arrays.asList(httpHosts));
|
|
||||||
Map<String, String> params = Collections.singletonMap("error_trace", "true");
|
|
||||||
Response assistanceResponse = client().performRequest("GET", "_xpack/migration/assistance", params);
|
|
||||||
String assistanceResponseData = EntityUtils.toString(assistanceResponse.getEntity());
|
|
||||||
logger.info("assistance response is: [{}]", assistanceResponseData);
|
|
||||||
Map<String, Object> response = toMap(assistanceResponseData);
|
|
||||||
String watchIndexUpgradeRequired = ObjectPath.evaluate(response, "indices.\\.watches.action_required");
|
|
||||||
String triggeredWatchIndexUpgradeRequired = ObjectPath.evaluate(response, "indices.\\.triggered_watches.action_required");
|
|
||||||
if ("upgrade".equals(watchIndexUpgradeRequired) || "upgrade".equals(triggeredWatchIndexUpgradeRequired)) {
|
|
||||||
boolean stopWatcherBeforeUpgrade = randomBoolean();
|
|
||||||
if (stopWatcherBeforeUpgrade) {
|
|
||||||
assertOK(client.performRequest("POST", "/_xpack/watcher/_stop"));
|
|
||||||
logger.info("stopped watcher manually before starting upgrade");
|
|
||||||
}
|
|
||||||
|
|
||||||
if ("upgrade".equals(watchIndexUpgradeRequired)) {
|
|
||||||
Response upgradeResponse = client.performRequest("POST", "_xpack/migration/upgrade/.watches", params);
|
|
||||||
logger.info("Upgrade .watches response is: [{}]", EntityUtils.toString(upgradeResponse.getEntity()));
|
|
||||||
}
|
|
||||||
|
|
||||||
if ("upgrade".equals(triggeredWatchIndexUpgradeRequired)) {
|
|
||||||
Response upgradeResponse = client.performRequest("POST", "_xpack/migration/upgrade/.triggered_watches", params);
|
|
||||||
logger.info("Upgrade .triggered_watches response is: [{}]", EntityUtils.toString(upgradeResponse.getEntity()));
|
|
||||||
}
|
|
||||||
|
|
||||||
// show templates after upgrade
|
|
||||||
executeAgainstMasterNode(c -> {
|
|
||||||
Map<String, String> filterPathParams = MapBuilder.newMapBuilder(params)
|
|
||||||
.put("filter_path", "*.template,*.index_patterns").immutableMap();
|
|
||||||
Response r = c.performRequest("GET", "_template/*watch*", filterPathParams);
|
|
||||||
logger.info("existing watcher templates response AFTER UPGRADE [{}]",
|
|
||||||
EntityUtils.toString(r.getEntity(), StandardCharsets.UTF_8));
|
|
||||||
});
|
|
||||||
|
|
||||||
|
|
||||||
if (stopWatcherBeforeUpgrade) {
|
|
||||||
ensureWatcherStopped();
|
|
||||||
assertOK(client.performRequest("POST", "/_xpack/watcher/_start"));
|
|
||||||
logger.info("started watcher manually after running upgrade");
|
|
||||||
ensureWatcherStarted();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void executeAgainstRandomNode(CheckedConsumer<RestClient, Exception> consumer) throws Exception {
|
|
||||||
List<Node> nodes = new ArrayList<>(this.nodes.values());
|
|
||||||
nodes.sort(Comparator.comparing(Node::getId));
|
|
||||||
Node node = randomFrom(nodes);
|
|
||||||
|
|
||||||
try (RestClient client = buildClient(restClientSettings(), new HttpHost[] { node.getPublishAddress() })) {
|
|
||||||
consumer.accept(client);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void executeAgainstMasterNode(CheckedConsumer<RestClient, Exception> consumer) throws Exception {
|
|
||||||
try (RestClient client = buildClient(restClientSettings(), new HttpHost[]{this.nodes.getMaster().publishAddress})) {
|
|
||||||
consumer.accept(client);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void executeAgainstAllNodes(CheckedConsumer<RestClient, IOException> consumer)
|
|
||||||
throws IOException {
|
|
||||||
HttpHost[] newHosts = nodes.getNewNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new);
|
|
||||||
HttpHost[] bwcHosts = nodes.getBWCNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new);
|
|
||||||
assertTrue("No nodes in cluster, cannot run any tests", newHosts.length > 0 || bwcHosts.length > 0);
|
|
||||||
|
|
||||||
if (newHosts.length > 0) {
|
|
||||||
try (RestClient newClient = buildClient(restClientSettings(), newHosts)) {
|
|
||||||
consumer.accept(newClient);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (bwcHosts.length > 0) {
|
|
||||||
try (RestClient bwcClient = buildClient(restClientSettings(), bwcHosts)) {
|
|
||||||
consumer.accept(bwcClient);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void ensureWatcherStopped() throws Exception {
|
|
||||||
executeAgainstMasterNode(client -> assertBusy(() -> {
|
|
||||||
Response stats = client.performRequest("GET", "_xpack/watcher/stats");
|
|
||||||
String responseBody = EntityUtils.toString(stats.getEntity());
|
|
||||||
assertThat(responseBody, not(containsString("\"watcher_state\":\"starting\"")));
|
|
||||||
assertThat(responseBody, not(containsString("\"watcher_state\":\"started\"")));
|
|
||||||
assertThat(responseBody, not(containsString("\"watcher_state\":\"stopping\"")));
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
|
|
||||||
private void ensureWatcherStarted() throws Exception {
|
|
||||||
executeAgainstMasterNode(client -> assertBusy(() -> {
|
|
||||||
Response response = client.performRequest("GET", "_xpack/watcher/stats");
|
|
||||||
String responseBody = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
|
|
||||||
assertThat(responseBody, not(containsString("\"watcher_state\":\"starting\"")));
|
|
||||||
assertThat(responseBody, not(containsString("\"watcher_state\":\"stopping\"")));
|
|
||||||
assertThat(responseBody, not(containsString("\"watcher_state\":\"stopped\"")));
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
|
|
||||||
private void assertOK(Response response) throws IOException {
|
|
||||||
assertThat(response.getStatusLine().getStatusCode(), anyOf(equalTo(200), equalTo(201)));
|
|
||||||
// consume that entity, otherwise the input stream will not be closed
|
|
||||||
// side effect is, that everything needs to be asserted ok directly, you cannot check the body!
|
|
||||||
EntityUtils.consume(response.getEntity());
|
|
||||||
}
|
|
||||||
|
|
||||||
private Nodes buildNodeAndVersions() throws IOException {
|
|
||||||
Response response = client().performRequest("GET", "_nodes");
|
|
||||||
ObjectPath objectPath = ObjectPath.createFromResponse(response);
|
|
||||||
Map<String, Object> nodesAsMap = objectPath.evaluate("nodes");
|
|
||||||
Nodes nodes = new Nodes();
|
|
||||||
for (String id : nodesAsMap.keySet()) {
|
|
||||||
nodes.add(new Node(
|
|
||||||
id,
|
|
||||||
objectPath.evaluate("nodes." + id + ".name"),
|
|
||||||
Version.fromString(objectPath.evaluate("nodes." + id + ".version")),
|
|
||||||
HttpHost.create(objectPath.evaluate("nodes." + id + ".http.publish_address"))));
|
|
||||||
}
|
|
||||||
response = client().performRequest("GET", "_cluster/state");
|
|
||||||
nodes.setMasterNodeId(ObjectPath.createFromResponse(response).evaluate("master_node"));
|
|
||||||
return nodes;
|
|
||||||
}
|
|
||||||
|
|
||||||
final class Nodes extends HashMap<String, Node> {
|
|
||||||
|
|
||||||
private String masterNodeId = null;
|
|
||||||
|
|
||||||
public Node getMaster() {
|
|
||||||
return get(masterNodeId);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setMasterNodeId(String id) {
|
|
||||||
if (get(id) == null) {
|
|
||||||
throw new IllegalArgumentException("node with id [" + id + "] not found. got:" +
|
|
||||||
toString());
|
|
||||||
}
|
|
||||||
masterNodeId = id;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void add(Node node) {
|
|
||||||
put(node.getId(), node);
|
|
||||||
}
|
|
||||||
|
|
||||||
public List<Node> getNewNodes() {
|
|
||||||
Version bwcVersion = getBWCVersion();
|
|
||||||
return values().stream().filter(n -> n.getVersion().after(bwcVersion))
|
|
||||||
.collect(Collectors.toList());
|
|
||||||
}
|
|
||||||
|
|
||||||
public List<Node> getBWCNodes() {
|
|
||||||
Version bwcVersion = getBWCVersion();
|
|
||||||
return values().stream().filter(n -> n.getVersion().equals(bwcVersion))
|
|
||||||
.collect(Collectors.toList());
|
|
||||||
}
|
|
||||||
|
|
||||||
public Version getBWCVersion() {
|
|
||||||
if (isEmpty()) {
|
|
||||||
throw new IllegalStateException("no nodes available");
|
|
||||||
}
|
|
||||||
return Version.fromId(values().stream().map(node -> node.getVersion().id).min(Integer::compareTo).get());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
return "Nodes{" +
|
|
||||||
"masterNodeId='" + masterNodeId + "'\n" +
|
|
||||||
values().stream().map(Node::toString).collect(Collectors.joining("\n")) +
|
|
||||||
'}';
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
final class Node {
|
|
||||||
private final String id;
|
|
||||||
private final String nodeName;
|
|
||||||
private final Version version;
|
|
||||||
private final HttpHost publishAddress;
|
|
||||||
|
|
||||||
Node(String id, String nodeName, Version version, HttpHost publishAddress) {
|
|
||||||
this.id = id;
|
|
||||||
this.nodeName = nodeName;
|
|
||||||
this.version = version;
|
|
||||||
this.publishAddress = publishAddress;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getId() {
|
|
||||||
return id;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getNodeName() {
|
|
||||||
return nodeName;
|
|
||||||
}
|
|
||||||
|
|
||||||
public HttpHost getPublishAddress() {
|
|
||||||
return publishAddress;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Version getVersion() {
|
|
||||||
return version;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
return "Node{" +
|
|
||||||
"id='" + id + '\'' +
|
|
||||||
", nodeName='" + nodeName + '\'' +
|
|
||||||
", version=" + version +
|
|
||||||
", address=" + publishAddress +
|
|
||||||
'}';
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static Map<String, Object> toMap(String response) throws IOException {
|
|
||||||
return XContentHelper.convertToMap(JsonXContent.jsonXContent, response, false);
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue