Add watcher upgrade procedure (elastic/x-pack-elasticsearch#1603)

Relates to elastic/x-pack-elasticsearch#1214

Original commit: elastic/x-pack-elasticsearch@1017d60df4
This commit is contained in:
Igor Motov 2017-06-13 08:43:33 -04:00
parent b564e6e102
commit 3502a9901b
8 changed files with 332 additions and 153 deletions

View File

@ -10,14 +10,16 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.script.Script;
import org.elasticsearch.transport.TransportResponse;
import java.util.Map;
import java.util.function.Predicate;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
/**
* Generic upgrade check applicable to all indices to be upgraded from the current version
@ -29,74 +31,56 @@ import java.util.function.Predicate;
* - reindex is performed
* - postUpgrade is called if reindex was successful
*/
public class IndexUpgradeCheck extends AbstractComponent {
public class IndexUpgradeCheck<T> extends AbstractComponent {
public static final int UPRADE_VERSION = 6;
private final String name;
private final Predicate<Tuple<IndexMetaData, Map<String, String>>> isSupported;
private final InternalIndexReindexer reindexer;
private final UpgradeActionRequired matchAction;
private final UpgradeActionRequired noMatchAction;
private final BiFunction<IndexMetaData, Map<String, String>, UpgradeActionRequired> actionRequired;
private final InternalIndexReindexer<T> reindexer;
/**
* Creates a new upgrade check that doesn't support upgrade
* Creates a new upgrade check
*
* @param name - the name of the check
* @param settings - system settings
* @param isSupported - return true if they can work with the index with specified name
* @param matchAction - action if isSupported return true
* @param noMatchAction - action if isSupported return false
* @param name - the name of the check
* @param settings - system settings
* @param actionRequired - return true if they can work with the index with specified name
* @param client - client
* @param clusterService - cluster service
* @param types - a list of types that the reindexing should be limited to
* @param updateScript - the upgrade script that should be used during reindexing
*/
public IndexUpgradeCheck(String name, Settings settings,
Predicate<Tuple<IndexMetaData, Map<String, String>>> isSupported,
UpgradeActionRequired matchAction, UpgradeActionRequired noMatchAction) {
super(settings);
this.name = name;
this.isSupported = isSupported;
this.reindexer = null;
this.matchAction = matchAction;
this.noMatchAction = noMatchAction;
BiFunction<IndexMetaData, Map<String, String>, UpgradeActionRequired> actionRequired,
Client client, ClusterService clusterService, String[] types, Script updateScript) {
this(name, settings, actionRequired, client, clusterService, types, updateScript,
listener -> listener.onResponse(null), (t, listener) -> listener.onResponse(TransportResponse.Empty.INSTANCE));
}
/**
* Creates a new upgrade check
*
* @param name - the name of the check
* @param settings - system settings
* @param client - client
* @param clusterService - cluster service
* @param isSupported - return true if they can work with the index with specified name
* @param types - a list of types that the reindexing should be limited to
* @param updateScript - the upgrade script that should be used during reindexing
* @param name - the name of the check
* @param settings - system settings
* @param actionRequired - return true if they can work with the index with specified name
* @param client - client
* @param clusterService - cluster service
* @param types - a list of types that the reindexing should be limited to
* @param updateScript - the upgrade script that should be used during reindexing
* @param preUpgrade - action that should be performed before upgrade
* @param postUpgrade - action that should be performed after upgrade
*/
public IndexUpgradeCheck(String name, Settings settings, Client client, ClusterService clusterService,
Predicate<Tuple<IndexMetaData, Map<String, String>>> isSupported,
String[] types, Script updateScript) {
this(name, settings, client, clusterService, isSupported, types, updateScript, UpgradeActionRequired.UPGRADE,
UpgradeActionRequired.NOT_APPLICABLE);
}
/**
* Creates a new upgrade check
*
* @param name - the name of the check
* @param settings - system settings
* @param client - client
* @param clusterService - cluster service
* @param isSupported - return true if they can work with the index with specified name
* @param types - a list of types that the reindexing should be limited to
* @param updateScript - the upgrade script that should be used during reindexing
*/
public IndexUpgradeCheck(String name, Settings settings, Client client, ClusterService clusterService,
Predicate<Tuple<IndexMetaData, Map<String, String>>> isSupported,
String[] types, Script updateScript, UpgradeActionRequired matchAction, UpgradeActionRequired noMatchAction) {
public IndexUpgradeCheck(String name, Settings settings,
BiFunction<IndexMetaData, Map<String, String>, UpgradeActionRequired> actionRequired,
Client client, ClusterService clusterService, String[] types, Script updateScript,
Consumer<ActionListener<T>> preUpgrade,
BiConsumer<T, ActionListener<TransportResponse.Empty>> postUpgrade) {
super(settings);
this.name = name;
this.isSupported = isSupported;
this.reindexer = new InternalIndexReindexer(client, clusterService, UPRADE_VERSION, updateScript, types);
this.matchAction = matchAction;
this.noMatchAction = noMatchAction;
this.actionRequired = actionRequired;
this.reindexer = new InternalIndexReindexer<>(client, clusterService, UPRADE_VERSION, updateScript, types, preUpgrade,
postUpgrade);
}
/**
* Returns the name of the check
*/
@ -109,31 +93,21 @@ public class IndexUpgradeCheck extends AbstractComponent {
*
* @param indexMetaData index metadata
* @param params additional user-specified parameters see {@link IndexUpgradeCheckFactory#supportedParams}
* @param state current cluster state
* @return required action or UpgradeActionRequired.NOT_APPLICABLE if this check cannot be performed on the index
*/
public UpgradeActionRequired actionRequired(IndexMetaData indexMetaData, Map<String, String> params, ClusterState state) {
if (isSupported.test(new Tuple<>(indexMetaData, params))) {
return matchAction;
}
return noMatchAction;
public UpgradeActionRequired actionRequired(IndexMetaData indexMetaData, Map<String, String> params) {
return actionRequired.apply(indexMetaData, params);
}
/**
* Perform the index upgrade
*
* @param indexMetaData index metadata
* @param params additional user-specified parameters see {@link IndexUpgradeCheckFactory#supportedParams}
* @param state current cluster state
* @param listener the listener that should be called upon completion of the upgrade
*/
public void upgrade(IndexMetaData indexMetaData, Map<String, String> params, ClusterState state,
public void upgrade(IndexMetaData indexMetaData, ClusterState state,
ActionListener<BulkByScrollResponse> listener) {
if (reindexer == null) {
throw new UnsupportedOperationException(getName() + " check doesn't support index upgrade");
} else {
reindexer.upgrade(indexMetaData.getIndex().getName(), state, listener);
}
reindexer.upgrade(indexMetaData.getIndex().getName(), state, listener);
}
}

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.upgrade;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterState;
@ -49,32 +50,43 @@ public class IndexUpgradeService extends AbstractComponent {
MetaData metaData = state.getMetaData();
for (String index : concreteIndexNames) {
IndexMetaData indexMetaData = metaData.index(index);
indexCheck:
for (IndexUpgradeCheck check : upgradeChecks) {
UpgradeActionRequired upgradeActionRequired = check.actionRequired(indexMetaData, params, state);
logger.trace("[{}] check [{}] returned [{}]", index, check.getName(), upgradeActionRequired);
switch (upgradeActionRequired) {
case UPGRADE:
case REINDEX:
// this index needs to be upgraded or reindexed - skipping all other checks
results.put(index, upgradeActionRequired);
break indexCheck;
case UP_TO_DATE:
// this index is good - skipping all other checks
break indexCheck;
case NOT_APPLICABLE:
// this action is not applicable to this index - skipping to the next one
break;
default:
throw new IllegalStateException("unknown upgrade action " + upgradeActionRequired + " for the index "
+ index);
}
UpgradeActionRequired upgradeActionRequired = upgradeInfo(indexMetaData, index, params);
if (upgradeActionRequired != null) {
results.put(index, upgradeActionRequired);
}
}
return results;
}
private UpgradeActionRequired upgradeInfo(IndexMetaData indexMetaData, String index, Map<String, String> params) {
for (IndexUpgradeCheck check : upgradeChecks) {
UpgradeActionRequired upgradeActionRequired = check.actionRequired(indexMetaData, params);
logger.trace("[{}] check [{}] returned [{}]", index, check.getName(), upgradeActionRequired);
switch (upgradeActionRequired) {
case UPGRADE:
case REINDEX:
// this index needs to be upgraded or reindexed - skipping all other checks
return upgradeActionRequired;
case UP_TO_DATE:
// this index is good - skipping all other checks
return null;
case NOT_APPLICABLE:
// this action is not applicable to this index - skipping to the next one
break;
default:
throw new IllegalStateException("unknown upgrade action " + upgradeActionRequired + " for the index "
+ index);
}
}
// Catch all check for all indices that didn't match the specific checks
if (indexMetaData.getCreationVersion().before(Version.V_5_0_0)) {
return UpgradeActionRequired.REINDEX;
} else {
return null;
}
}
public void upgrade(String index, Map<String, String> params, ClusterState state,
ActionListener<BulkByScrollResponse> listener) {
IndexMetaData indexMetaData = state.metaData().index(index);
@ -82,11 +94,11 @@ public class IndexUpgradeService extends AbstractComponent {
throw new IndexNotFoundException(index);
}
for (IndexUpgradeCheck check : upgradeChecks) {
UpgradeActionRequired upgradeActionRequired = check.actionRequired(indexMetaData, params, state);
UpgradeActionRequired upgradeActionRequired = check.actionRequired(indexMetaData, params);
switch (upgradeActionRequired) {
case UPGRADE:
// this index needs to be upgraded - start the upgrade procedure
check.upgrade(indexMetaData, params, state, listener);
check.upgrade(indexMetaData, state, listener);
return;
case REINDEX:
// this index needs to be re-indexed

View File

@ -16,7 +16,6 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
@ -26,6 +25,9 @@ import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.script.Script;
import org.elasticsearch.transport.TransportResponse;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
/**
* A component that performs the following upgrade procedure:
* <p>
@ -35,23 +37,41 @@ import org.elasticsearch.transport.TransportResponse;
* - Reindex from .{name} to .{name}-v6 with transform
* - Delete index .{name} and add alias .{name} to .{name}-v6
*/
public class InternalIndexReindexer {
public class InternalIndexReindexer<T> {
private final Client client;
private final ClusterService clusterService;
private final Script transformScript;
private final String[] types;
private final int version;
private final Consumer<ActionListener<T>> preUpgrade;
private final BiConsumer<T, ActionListener<TransportResponse.Empty>> postUpgrade;
public InternalIndexReindexer(Client client, ClusterService clusterService, int version, Script transformScript, String[] types) {
public InternalIndexReindexer(Client client, ClusterService clusterService, int version, Script transformScript, String[] types,
Consumer<ActionListener<T>> preUpgrade,
BiConsumer<T, ActionListener<TransportResponse.Empty>> postUpgrade) {
this.client = client;
this.clusterService = clusterService;
this.transformScript = transformScript;
this.types = types;
this.version = version;
this.preUpgrade = preUpgrade;
this.postUpgrade = postUpgrade;
}
public void upgrade(String index, ClusterState clusterState, ActionListener<BulkByScrollResponse> listener) {
preUpgrade.accept(ActionListener.wrap(
t -> innerUpgrade(index, clusterState, ActionListener.wrap(
response -> postUpgrade.accept(t, ActionListener.wrap(
empty -> listener.onResponse(response),
listener::onFailure
)),
listener::onFailure
)),
listener::onFailure));
}
private void innerUpgrade(String index, ClusterState clusterState, ActionListener<BulkByScrollResponse> listener) {
String newIndex = index + "_v" + version;
try {
checkMasterAndDataNodeVersion(clusterState);

View File

@ -6,8 +6,10 @@
package org.elasticsearch.xpack.upgrade;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
@ -19,6 +21,7 @@ import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
@ -26,12 +29,16 @@ import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.upgrade.actions.IndexUpgradeAction;
import org.elasticsearch.xpack.upgrade.actions.IndexUpgradeInfoAction;
import org.elasticsearch.xpack.upgrade.rest.RestIndexUpgradeAction;
import org.elasticsearch.xpack.upgrade.rest.RestIndexUpgradeInfoAction;
import org.elasticsearch.xpack.watcher.client.WatcherClient;
import org.elasticsearch.xpack.watcher.transport.actions.service.WatcherServiceRequest;
import org.elasticsearch.xpack.watcher.transport.actions.stats.WatcherStatsRequest;
import java.util.ArrayList;
import java.util.Arrays;
@ -58,7 +65,7 @@ public class Upgrade implements ActionPlugin {
this.upgradeCheckFactories = new ArrayList<>();
for (Tuple<Collection<String>, BiFunction<InternalClient, ClusterService, IndexUpgradeCheck>> checkFactory : Arrays.asList(
getKibanaUpgradeCheckFactory(settings),
getGenericCheckFactory(settings))) {
getWatcherUpgradeCheckFactory(settings))) {
extraParameters.addAll(checkFactory.v1());
upgradeCheckFactories.add(checkFactory.v2());
}
@ -93,32 +100,24 @@ public class Upgrade implements ActionPlugin {
);
}
static Tuple<Collection<String>, BiFunction<InternalClient, ClusterService, IndexUpgradeCheck>> getGenericCheckFactory(
Settings settings) {
return new Tuple<>(
Collections.emptyList(),
(internalClient, clusterService) ->
new IndexUpgradeCheck("generic", settings,
indexAndParams -> indexAndParams.v1().getCreationVersion().before(Version.V_5_0_0_alpha1),
UpgradeActionRequired.REINDEX,
UpgradeActionRequired.UP_TO_DATE));
}
static Tuple<Collection<String>, BiFunction<InternalClient, ClusterService, IndexUpgradeCheck>> getKibanaUpgradeCheckFactory(
Settings settings) {
return new Tuple<>(
Collections.singletonList("kibana_indices"),
(internalClient, clusterService) ->
new IndexUpgradeCheck("kibana",
new IndexUpgradeCheck<Void>("kibana",
settings,
internalClient,
clusterService,
indexAndParams -> {
String indexName = indexAndParams.v1().getIndex().getName();
String kibanaIndicesMasks = indexAndParams.v2().getOrDefault("kibana_indices", ".kibana");
(indexMetaData, params) -> {
String indexName = indexMetaData.getIndex().getName();
String kibanaIndicesMasks = params.getOrDefault("kibana_indices", ".kibana");
String[] kibanaIndices = Strings.delimitedListToStringArray(kibanaIndicesMasks, ",");
return Regex.simpleMatch(kibanaIndices, indexName);
},
if (Regex.simpleMatch(kibanaIndices, indexName)) {
return UpgradeActionRequired.UPGRADE;
} else {
return UpgradeActionRequired.NOT_APPLICABLE;
}
}, internalClient,
clusterService,
Strings.EMPTY_ARRAY,
new Script(ScriptType.INLINE, "painless", "ctx._id = ctx._type + \"-\" + ctx._id;\n" +
"ctx._source = [ ctx._type : ctx._source ];\n" +
@ -126,4 +125,80 @@ public class Upgrade implements ActionPlugin {
"ctx._type = \"doc\";",
new HashMap<>())));
}
static Tuple<Collection<String>, BiFunction<InternalClient, ClusterService, IndexUpgradeCheck>> getWatcherUpgradeCheckFactory(
Settings settings) {
return new Tuple<>(
Collections.emptyList(),
(internalClient, clusterService) ->
new IndexUpgradeCheck<Boolean>("watcher",
settings,
(indexMetaData, params) -> {
if (".watches".equals(indexMetaData.getIndex().getName()) ||
indexMetaData.getAliases().containsKey(".watches")) {
if (indexMetaData.getMappings().size() == 1 && indexMetaData.getMappings().containsKey("doc") ) {
return UpgradeActionRequired.UP_TO_DATE;
} else {
return UpgradeActionRequired.UPGRADE;
}
} else {
return UpgradeActionRequired.NOT_APPLICABLE;
}
}, internalClient,
clusterService,
new String[]{"watch"},
new Script(ScriptType.INLINE, "painless", "ctx._type = \"doc\";\n" +
"if (ctx._source.containsKey(\"_status\") && !ctx._source.containsKey(\"status\") ) {}\n" +
" ctx._source.status = ctx._source.remove(\"_status\");\n" +
"}",
new HashMap<>()),
booleanActionListener -> preWatcherUpgrade(internalClient, booleanActionListener),
(shouldStartWatcher, listener) -> postWatcherUpgrade(internalClient, shouldStartWatcher, listener)
));
}
private static void preWatcherUpgrade(Client client, ActionListener<Boolean> listener) {
new WatcherClient(client).watcherStats(new WatcherStatsRequest(), ActionListener.wrap(
stats -> {
if (stats.watcherMetaData().manuallyStopped()) {
// don't start the watcher after upgrade
listener.onResponse(false);
} else {
// stop the watcher
new WatcherClient(client).watcherService(new WatcherServiceRequest().stop(), ActionListener.wrap(
stopResponse -> {
if (stopResponse.isAcknowledged()) {
listener.onResponse(true);
} else {
listener.onFailure(new IllegalStateException("unable to stop watcher service"));
}
}, listener::onFailure
));
}
}, listener::onFailure));
}
private static void postWatcherUpgrade(Client client, Boolean shouldStartWatcher, ActionListener<TransportResponse.Empty> listener) {
client.admin().indices().prepareDelete("triggered-watches").execute(ActionListener.wrap(deleteIndexResponse -> {
startWatcherIfNeeded(shouldStartWatcher, client, listener);
}, e -> {
if (e instanceof IndexNotFoundException) {
startWatcherIfNeeded(shouldStartWatcher, client, listener);
} else {
listener.onFailure(e);
}
}
));
}
private static void startWatcherIfNeeded(Boolean shouldStartWatcher, Client client, ActionListener<TransportResponse.Empty> listener) {
if (shouldStartWatcher) {
// Start the watcher service
new WatcherClient(client).watcherService(new WatcherServiceRequest().start(), ActionListener.wrap(
stopResponse -> listener.onResponse(TransportResponse.Empty.INSTANCE), listener::onFailure
));
} else {
listener.onResponse(TransportResponse.Empty.INSTANCE);
}
}
}

View File

@ -6,58 +6,67 @@
package org.elasticsearch.xpack.upgrade;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.Collections;
import static org.hamcrest.core.IsEqual.equalTo;
public class IndexUpgradeCheckTests extends ESTestCase {
public void testGenericUpgradeCheck() {
IndexUpgradeCheck check = Upgrade.getGenericCheckFactory(Settings.EMPTY).v2().apply(null, null);
assertThat(check.getName(), equalTo("generic"));
IndexMetaData goodIndex = newTestIndexMeta("good", Settings.EMPTY);
IndexMetaData badIndex = newTestIndexMeta("bad",
Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.fromString("2.0.0")).build());
assertThat(check.actionRequired(goodIndex, Collections.emptyMap(), ClusterState.EMPTY_STATE),
equalTo(UpgradeActionRequired.UP_TO_DATE));
assertThat(check.actionRequired(badIndex, Collections.emptyMap(), ClusterState.EMPTY_STATE),
equalTo(UpgradeActionRequired.REINDEX));
}
public void testKibanaUpgradeCheck() {
public void testKibanaUpgradeCheck() throws Exception {
IndexUpgradeCheck check = Upgrade.getKibanaUpgradeCheckFactory(Settings.EMPTY).v2().apply(null, null);
assertThat(check.getName(), equalTo("kibana"));
IndexMetaData goodKibanaIndex = newTestIndexMeta(".kibana", Settings.EMPTY);
assertThat(check.actionRequired(goodKibanaIndex, Collections.emptyMap(), ClusterState.EMPTY_STATE),
assertThat(check.actionRequired(goodKibanaIndex, Collections.emptyMap()),
equalTo(UpgradeActionRequired.UPGRADE));
IndexMetaData renamedKibanaIndex = newTestIndexMeta(".kibana2", Settings.EMPTY);
assertThat(check.actionRequired(renamedKibanaIndex, Collections.emptyMap(), ClusterState.EMPTY_STATE),
assertThat(check.actionRequired(renamedKibanaIndex, Collections.emptyMap()),
equalTo(UpgradeActionRequired.NOT_APPLICABLE));
assertThat(check.actionRequired(renamedKibanaIndex, Collections.singletonMap("kibana_indices", ".kibana*"),
ClusterState.EMPTY_STATE), equalTo(UpgradeActionRequired.UPGRADE));
assertThat(check.actionRequired(renamedKibanaIndex, Collections.singletonMap("kibana_indices", ".kibana*")
), equalTo(UpgradeActionRequired.UPGRADE));
assertThat(check.actionRequired(renamedKibanaIndex, Collections.singletonMap("kibana_indices", ".kibana1,.kibana2"),
ClusterState.EMPTY_STATE), equalTo(UpgradeActionRequired.UPGRADE));
assertThat(check.actionRequired(renamedKibanaIndex, Collections.singletonMap("kibana_indices", ".kibana1,.kibana2")
), equalTo(UpgradeActionRequired.UPGRADE));
IndexMetaData watcherIndex = newTestIndexMeta(".watches", Settings.EMPTY);
assertThat(check.actionRequired(watcherIndex, Collections.singletonMap("kibana_indices", ".kibana*"), ClusterState.EMPTY_STATE),
assertThat(check.actionRequired(watcherIndex, Collections.singletonMap("kibana_indices", ".kibana*")),
equalTo(UpgradeActionRequired.NOT_APPLICABLE));
IndexMetaData securityIndex = newTestIndexMeta(".security", Settings.EMPTY);
assertThat(check.actionRequired(securityIndex, Collections.singletonMap("kibana_indices", ".kibana*"), ClusterState.EMPTY_STATE),
assertThat(check.actionRequired(securityIndex, Collections.singletonMap("kibana_indices", ".kibana*")),
equalTo(UpgradeActionRequired.NOT_APPLICABLE));
}
public static IndexMetaData newTestIndexMeta(String name, Settings indexSettings) {
public void testWatcherIndexUpgradeCheck() throws Exception{
IndexUpgradeCheck check = Upgrade.getWatcherUpgradeCheckFactory(Settings.EMPTY).v2().apply(null, null);
assertThat(check.getName(), equalTo("watcher"));
IndexMetaData goodKibanaIndex = newTestIndexMeta(".kibana", Settings.EMPTY);
assertThat(check.actionRequired(goodKibanaIndex, Collections.emptyMap()),
equalTo(UpgradeActionRequired.NOT_APPLICABLE));
IndexMetaData watcherIndex = newTestIndexMeta(".watches", Settings.EMPTY);
assertThat(check.actionRequired(watcherIndex, Collections.singletonMap("kibana_indices", ".kibana*")),
equalTo(UpgradeActionRequired.UPGRADE));
IndexMetaData watcherIndexWithAlias = newTestIndexMeta("my_watches", ".watches", Settings.EMPTY, "watch");
assertThat(check.actionRequired(watcherIndexWithAlias, Collections.emptyMap()),
equalTo(UpgradeActionRequired.UPGRADE));
IndexMetaData watcherIndexWithAliasUpgraded = newTestIndexMeta("my_watches", ".watches", Settings.EMPTY, "doc");
assertThat(check.actionRequired(watcherIndexWithAliasUpgraded, Collections.emptyMap()),
equalTo(UpgradeActionRequired.UP_TO_DATE));
}
public static IndexMetaData newTestIndexMeta(String name, String alias, Settings indexSettings, String type) throws IOException {
Settings build = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
@ -66,7 +75,20 @@ public class IndexUpgradeCheckTests extends ESTestCase {
.put(IndexMetaData.SETTING_VERSION_UPGRADED, Version.V_5_0_0_beta1)
.put(indexSettings)
.build();
return IndexMetaData.builder(name).settings(build).build();
IndexMetaData.Builder builder = IndexMetaData.builder(name).settings(build);
if (alias != null) {
// Create alias
builder.putAlias(AliasMetaData.newAliasMetaDataBuilder(alias).build());
}
if (type != null) {
// Create fake type
builder.putMapping(type, "{}");
}
return builder.build();
}
public static IndexMetaData newTestIndexMeta(String name, Settings indexSettings) throws IOException {
return newTestIndexMeta(name, null, indexSettings, "foo");
}
}

View File

@ -7,15 +7,19 @@ package org.elasticsearch.xpack.upgrade;
import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.xpack.upgrade.actions.IndexUpgradeAction;
import org.elasticsearch.xpack.upgrade.actions.IndexUpgradeInfoAction;
import org.elasticsearch.xpack.upgrade.actions.IndexUpgradeInfoAction.Response;
import org.junit.Before;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;
@ -70,4 +74,43 @@ public class IndexUpgradeIT extends IndexUpgradeIntegTestCase {
assertEquals(2L, searchResponse.getHits().getTotalHits());
}
public void testInternalUpgradePrePostChecks() {
Long val = randomLong();
AtomicBoolean preUpgradeIsCalled = new AtomicBoolean();
AtomicBoolean postUpgradeIsCalled = new AtomicBoolean();
IndexUpgradeCheck check = new IndexUpgradeCheck<Long>(
"test", Settings.EMPTY,
(indexMetaData, stringStringMap) -> {
if (indexMetaData.getIndex().getName().equals("internal_index")) {
return UpgradeActionRequired.UPGRADE;
} else {
return UpgradeActionRequired.NOT_APPLICABLE;
}
},
client(), internalCluster().clusterService(internalCluster().getMasterName()), Strings.EMPTY_ARRAY, null,
listener -> {
assertFalse(preUpgradeIsCalled.getAndSet(true));
assertFalse(postUpgradeIsCalled.get());
listener.onResponse(val);
},
(aLong, listener) -> {
assertTrue(preUpgradeIsCalled.get());
assertFalse(postUpgradeIsCalled.getAndSet(true));
assertEquals(aLong, val);
listener.onResponse(TransportResponse.Empty.INSTANCE);
});
assertAcked(client().admin().indices().prepareCreate("internal_index").get());
IndexUpgradeService service = new IndexUpgradeService(Settings.EMPTY, Collections.singletonList(check));
PlainActionFuture<BulkByScrollResponse> future = PlainActionFuture.newFuture();
service.upgrade("internal_index", Collections.emptyMap(), clusterService().state(), future);
future.actionGet();
assertTrue(preUpgradeIsCalled.get());
assertTrue(postUpgradeIsCalled.get());
}
}

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.upgrade;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
@ -17,6 +18,7 @@ import org.elasticsearch.test.ESTestCase;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.function.BiFunction;
import static org.elasticsearch.xpack.upgrade.IndexUpgradeCheckTests.newTestIndexMeta;
import static org.hamcrest.core.IsEqual.equalTo;
@ -24,25 +26,33 @@ import static org.hamcrest.core.IsEqual.equalTo;
public class IndexUpgradeServiceTests extends ESTestCase {
private IndexUpgradeCheck upgradeBarCheck = new IndexUpgradeCheck("upgrade_bar", Settings.EMPTY,
indexAndParams -> "bar".equals(indexAndParams.v1().getSettings().get("test.setting")),
UpgradeActionRequired.UPGRADE, UpgradeActionRequired.NOT_APPLICABLE);
(BiFunction<IndexMetaData, Map<String, String>, UpgradeActionRequired>) (indexMetaData, stringStringMap) -> {
if ("bar".equals(indexMetaData.getSettings().get("test.setting"))) {
return UpgradeActionRequired.UPGRADE;
} else {
return UpgradeActionRequired.NOT_APPLICABLE;
}
}, null, null, null, null);
private IndexUpgradeCheck reindexFooCheck = new IndexUpgradeCheck("reindex_foo", Settings.EMPTY,
indexAndParams -> "foo".equals(indexAndParams.v1().getSettings().get("test.setting")),
UpgradeActionRequired.REINDEX, UpgradeActionRequired.NOT_APPLICABLE);
(BiFunction<IndexMetaData, Map<String, String>, UpgradeActionRequired>) (indexMetaData, stringStringMap) -> {
if ("foo".equals(indexMetaData.getSettings().get("test.setting"))) {
return UpgradeActionRequired.REINDEX;
} else {
return UpgradeActionRequired.NOT_APPLICABLE;
}
}, null, null, null, null);
private IndexUpgradeCheck everythingIsFineCheck = new IndexUpgradeCheck("everything_is_fine", Settings.EMPTY,
indexAndParams -> true,
UpgradeActionRequired.UP_TO_DATE, UpgradeActionRequired.NOT_APPLICABLE);
(indexMetaData, stringStringMap) -> UpgradeActionRequired.UP_TO_DATE, null, null, null, null);
private IndexUpgradeCheck unreachableCheck = new IndexUpgradeCheck("unreachable", Settings.EMPTY,
indexAndParams -> {
(BiFunction<IndexMetaData, Map<String, String>, UpgradeActionRequired>) (indexMetaData, stringStringMap) -> {
fail("Unreachable check is called");
return false;
}, UpgradeActionRequired.UP_TO_DATE, UpgradeActionRequired.NOT_APPLICABLE);
return null;
}, null, null, null, null);
public void testIndexUpgradeServiceMultipleCheck() {
public void testIndexUpgradeServiceMultipleCheck() throws Exception {
IndexUpgradeService service;
if (randomBoolean()) {
service = new IndexUpgradeService(Settings.EMPTY, Arrays.asList(
@ -80,7 +90,7 @@ public class IndexUpgradeServiceTests extends ESTestCase {
}
public void testNoMatchingChecks() {
public void testNoMatchingChecks() throws Exception {
IndexUpgradeService service = new IndexUpgradeService(Settings.EMPTY, Arrays.asList(
upgradeBarCheck,
reindexFooCheck
@ -100,7 +110,7 @@ public class IndexUpgradeServiceTests extends ESTestCase {
assertThat(result.get("foo"), equalTo(UpgradeActionRequired.REINDEX));
}
public void testEarlierChecksWin() {
public void testEarlierChecksWin() throws Exception {
IndexUpgradeService service = new IndexUpgradeService(Settings.EMPTY, Arrays.asList(
everythingIsFineCheck,
upgradeBarCheck,
@ -117,9 +127,29 @@ public class IndexUpgradeServiceTests extends ESTestCase {
IndicesOptions.lenientExpandOpen(), Collections.emptyMap(), clusterState);
assertThat(result.size(), equalTo(0)); // everything as the first checker should indicate that everything is fine
}
public void testGenericTest() throws Exception {
IndexUpgradeService service = new IndexUpgradeService(Settings.EMPTY, Arrays.asList(
upgradeBarCheck,
reindexFooCheck
));
IndexMetaData goodIndex = newTestIndexMeta("good", Settings.EMPTY);
IndexMetaData badIndex = newTestIndexMeta("bad",
Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.fromString("2.0.0")).build());
ClusterState clusterState = mockClusterState(goodIndex, badIndex);
Map<String, UpgradeActionRequired> result = service.upgradeInfo(new String[]{"good", "bad"},
IndicesOptions.lenientExpandOpen(), Collections.emptyMap(), clusterState);
assertThat(result.size(), equalTo(1));
assertThat(result.get("bad"), equalTo(UpgradeActionRequired.REINDEX));
}
private ClusterState mockClusterState(IndexMetaData... indices) {
MetaData.Builder metaDataBuilder = MetaData.builder();
for (IndexMetaData indexMetaData : indices) {

View File

@ -29,6 +29,7 @@ import org.elasticsearch.script.MockScriptPlugin;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.xpack.XPackPlugin;
import java.util.ArrayList;
@ -180,8 +181,10 @@ public class InternalIndexReindexerIT extends IndexUpgradeIntegTestCase {
}
private InternalIndexReindexer createIndexReindexer(int version, Script transformScript, String[] types) {
return new InternalIndexReindexer(client(), internalCluster().clusterService(internalCluster().getMasterName()),
version, transformScript, types);
return new InternalIndexReindexer<Void>(client(), internalCluster().clusterService(internalCluster().getMasterName()),
version, transformScript, types, voidActionListener -> voidActionListener.onResponse(null),
(aVoid, listener) -> listener.onResponse(TransportResponse.Empty.INSTANCE));
}
private ClusterState clusterState() {