Node repurpose tool (#39403)

When a node is repurposed to master/no-data or no-master/no-data, a v7.x
node will not start (see #37748 and #37347). The `elasticsearch-node repurpose`
tool fixes this by cleaning up the leftover shard data and index metadata.
Author: Henning Andersen, 2019-03-19 11:50:21 +01:00 (committed by Henning Andersen)
Parent: c46dd6ad08
Commit: dde41cc2dd
10 changed files with 756 additions and 36 deletions
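In brief, what the tool removes depends on the node's new roles: a repurposed master/no-data node sheds shard data but keeps index metadata, while a no-master/no-data node sheds both. A minimal sketch of that decision table (the class, method name, and strings below are invented for illustration, not part of the commit):

// Illustrative-only summary of the cleanup rules implemented by NodeRepurposeCommand below.
class RepurposeRules {
    static String cleanupFor(boolean master, boolean data) {
        if (data) {
            return "nothing to clean up";                    // data nodes keep all on-disk state
        }
        if (master) {
            return "remove shard data, keep index metadata"; // master/no-data
        }
        return "remove shard data and index metadata";       // no-master/no-data
    }

    public static void main(String[] args) {
        System.out.println(cleanupFor(true, false));
        System.out.println(cleanupFor(false, false));
    }
}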

ArchiveTestCase.java

@ -354,7 +354,7 @@ public abstract class ArchiveTestCase extends PackagingTestCase {
Platforms.PlatformAction action = () -> {
final Result result = sh.run(bin.elasticsearchNode + " -h");
assertThat(result.stdout,
containsString("A CLI tool to unsafely recover a cluster after the permanent loss of too many master-eligible nodes"));
containsString("A CLI tool to do unsafe cluster and index manipulations on current node"));
};
if (distribution().equals(Distribution.DEFAULT_LINUX) || distribution().equals(Distribution.DEFAULT_WINDOWS)) {

DetachClusterCommand.java

@ -18,7 +18,6 @@
*/
package org.elasticsearch.cluster.coordination;
- import joptsimple.OptionSet;
import org.elasticsearch.cli.Terminal;
import org.elasticsearch.cluster.metadata.Manifest;
import org.elasticsearch.cluster.metadata.MetaData;
@ -47,17 +46,9 @@ public class DetachClusterCommand extends ElasticsearchNodeCommand {
super("Detaches this node from its cluster, allowing it to unsafely join a new cluster");
}
- @Override
- protected void execute(Terminal terminal, OptionSet options, Environment env) throws Exception {
- super.execute(terminal, options, env);
- processNodePathsWithLock(terminal, options, env);
- terminal.println(NODE_DETACHED_MSG);
- }
@Override
- protected void processNodePaths(Terminal terminal, Path[] dataPaths) throws IOException {
+ protected void processNodePaths(Terminal terminal, Path[] dataPaths, Environment env) throws IOException {
final Tuple<Manifest, MetaData> manifestMetaDataTuple = loadMetaData(terminal, dataPaths);
final Manifest manifest = manifestMetaDataTuple.v1();
final MetaData metaData = manifestMetaDataTuple.v2();
@ -65,6 +56,8 @@ public class DetachClusterCommand extends ElasticsearchNodeCommand {
confirm(terminal, CONFIRMATION_MSG);
writeNewMetaData(terminal, manifest, updateCurrentTerm(), metaData, updateMetaData(metaData), dataPaths);
+ terminal.println(NODE_DETACHED_MSG);
}
// package-private for tests

ElasticsearchNodeCommand.java

@ -51,14 +51,14 @@ public abstract class ElasticsearchNodeCommand extends EnvironmentAwareCommand {
"\n" +
" WARNING: Elasticsearch MUST be stopped before running this tool." +
"\n";
- static final String FAILED_TO_OBTAIN_NODE_LOCK_MSG = "failed to lock node's directory, is Elasticsearch still running?";
+ protected static final String FAILED_TO_OBTAIN_NODE_LOCK_MSG = "failed to lock node's directory, is Elasticsearch still running?";
static final String NO_NODE_FOLDER_FOUND_MSG = "no node folder is found in data folder(s), node has not been started yet?";
static final String NO_MANIFEST_FILE_FOUND_MSG = "no manifest file is found, do you run pre 7.0 Elasticsearch?";
- static final String GLOBAL_GENERATION_MISSING_MSG = "no metadata is referenced from the manifest file, cluster has never been " +
- "bootstrapped?";
+ protected static final String GLOBAL_GENERATION_MISSING_MSG =
+ "no metadata is referenced from the manifest file, cluster has never been bootstrapped?";
static final String NO_GLOBAL_METADATA_MSG = "failed to find global metadata, metadata corrupted?";
static final String WRITE_METADATA_EXCEPTION_MSG = "exception occurred when writing new metadata to disk";
- static final String ABORTED_BY_USER_MSG = "aborted by user";
+ protected static final String ABORTED_BY_USER_MSG = "aborted by user";
final OptionSpec<Integer> nodeOrdinalOption;
public ElasticsearchNodeCommand(String description) {
@ -80,7 +80,7 @@ public abstract class ElasticsearchNodeCommand extends EnvironmentAwareCommand {
if (dataPaths.length == 0) {
throw new ElasticsearchException(NO_NODE_FOLDER_FOUND_MSG);
}
- processNodePaths(terminal, dataPaths);
+ processNodePaths(terminal, dataPaths, env);
} catch (LockObtainFailedException ex) {
throw new ElasticsearchException(
FAILED_TO_OBTAIN_NODE_LOCK_MSG + " [" + ex.getMessage() + "]");
@ -116,11 +116,31 @@ public abstract class ElasticsearchNodeCommand extends EnvironmentAwareCommand {
}
@Override
- protected void execute(Terminal terminal, OptionSet options, Environment env) throws Exception {
+ protected final void execute(Terminal terminal, OptionSet options, Environment env) throws Exception {
terminal.println(STOP_WARNING_MSG);
+ if (validateBeforeLock(terminal, env)) {
+ processNodePathsWithLock(terminal, options, env);
+ }
}
- protected abstract void processNodePaths(Terminal terminal, Path[] dataPaths) throws IOException;
+ /**
+ * Validate that the command can run before taking any locks.
+ * @param terminal the terminal to print to
+ * @param env the env to validate.
+ * @return true to continue, false to stop (must print message in validate).
+ */
+ protected boolean validateBeforeLock(Terminal terminal, Environment env) {
+ return true;
+ }
+ /**
+ * Process the paths. Locks for the paths are held during this method invocation.
+ * @param terminal the terminal to use for messages
+ * @param dataPaths the paths of the node to process
+ * @param env the env of the node to process
+ */
+ protected abstract void processNodePaths(Terminal terminal, Path[] dataPaths, Environment env) throws IOException;
protected void writeNewMetaData(Terminal terminal, Manifest oldManifest, long newCurrentTerm,
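For orientation: this refactor turns execute into a template method. Subclasses now plug into two hooks, validateBeforeLock for cheap settings checks before the node directories are locked, and processNodePaths for the real work while the lock is held. A hypothetical subclass illustrating the contract (ExampleCommand and its messages are invented for this sketch):

import java.io.IOException;
import java.nio.file.Path;
import org.elasticsearch.cli.Terminal;
import org.elasticsearch.cluster.coordination.ElasticsearchNodeCommand;
import org.elasticsearch.env.Environment;

public class ExampleCommand extends ElasticsearchNodeCommand {
    public ExampleCommand() {
        super("An example command (illustrative only)");
    }

    @Override
    protected boolean validateBeforeLock(Terminal terminal, Environment env) {
        // settings-only checks go here; returning false aborts before any lock is taken
        return true;
    }

    @Override
    protected void processNodePaths(Terminal terminal, Path[] dataPaths, Environment env) throws IOException {
        // runs while the node directory locks are held
        terminal.println("Processing " + dataPaths.length + " data path(s)");
    }
}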

NodeToolCli.java

@ -21,6 +21,7 @@ package org.elasticsearch.cluster.coordination;
import org.elasticsearch.cli.CommandLoggingConfigurator;
import org.elasticsearch.cli.MultiCommand;
import org.elasticsearch.cli.Terminal;
+ import org.elasticsearch.env.NodeRepurposeCommand;
// NodeToolCli does not extend LoggingAwareCommand, because LoggingAwareCommand performs logging initialization
// after LoggingAwareCommand instance is constructed.
@ -32,10 +33,12 @@ import org.elasticsearch.cli.Terminal;
public class NodeToolCli extends MultiCommand {
public NodeToolCli() {
super("A CLI tool to unsafely recover a cluster after the permanent loss of too many master-eligible nodes", ()->{});
super("A CLI tool to do unsafe cluster and index manipulations on current node",
()->{});
CommandLoggingConfigurator.configureLoggingWithoutConfig();
subcommands.put("unsafe-bootstrap", new UnsafeBootstrapMasterCommand());
subcommands.put("detach-cluster", new DetachClusterCommand());
subcommands.put("repurpose", new NodeRepurposeCommand());
}
public static void main(String[] args) throws Exception {
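The registration above means the first CLI argument picks the subcommand. A toy, self-contained sketch of that dispatch idea (a plain Map stands in for MultiCommand here; this is not how the real class is implemented):

import java.util.HashMap;
import java.util.Map;

class DispatchSketch {
    public static void main(String[] args) {
        Map<String, Runnable> subcommands = new HashMap<>();
        subcommands.put("unsafe-bootstrap", () -> System.out.println("would run UnsafeBootstrapMasterCommand"));
        subcommands.put("detach-cluster", () -> System.out.println("would run DetachClusterCommand"));
        subcommands.put("repurpose", () -> System.out.println("would run NodeRepurposeCommand"));
        String name = args.length > 0 ? args[0] : "repurpose";
        // route the name to the matching command, as `elasticsearch-node repurpose` does
        subcommands.getOrDefault(name, () -> System.out.println("unknown subcommand: " + name)).run();
    }
}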

UnsafeBootstrapMasterCommand.java

@ -18,7 +18,6 @@
*/
package org.elasticsearch.cluster.coordination;
- import joptsimple.OptionSet;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
@ -72,9 +71,7 @@ public class UnsafeBootstrapMasterCommand extends ElasticsearchNodeCommand {
}
@Override
- protected void execute(Terminal terminal, OptionSet options, Environment env) throws Exception {
- super.execute(terminal, options, env);
+ protected boolean validateBeforeLock(Terminal terminal, Environment env) {
Settings settings = env.settings();
terminal.println(Terminal.Verbosity.VERBOSE, "Checking node.master setting");
Boolean master = Node.NODE_MASTER_SETTING.get(settings);
@ -82,12 +79,10 @@ public class UnsafeBootstrapMasterCommand extends ElasticsearchNodeCommand {
throw new ElasticsearchException(NOT_MASTER_NODE_MSG);
}
- processNodePathsWithLock(terminal, options, env);
- terminal.println(MASTER_NODE_BOOTSTRAPPED_MSG);
+ return true;
}
- protected void processNodePaths(Terminal terminal, Path[] dataPaths) throws IOException {
+ protected void processNodePaths(Terminal terminal, Path[] dataPaths, Environment env) throws IOException {
terminal.println(Terminal.Verbosity.VERBOSE, "Loading node metadata");
final NodeMetaData nodeMetaData = NodeMetaData.FORMAT.loadLatestState(logger, namedXContentRegistry, dataPaths);
if (nodeMetaData == null) {
@ -130,5 +125,7 @@ public class UnsafeBootstrapMasterCommand extends ElasticsearchNodeCommand {
.build();
writeNewMetaData(terminal, manifest, manifest.getCurrentTerm(), metaData, newMetaData, dataPaths);
+ terminal.println(MASTER_NODE_BOOTSTRAPPED_MSG);
}
}

NodeEnvironment.java

@ -128,7 +128,11 @@ public final class NodeEnvironment implements Closeable {
* ${data.paths}/nodes/{node.id}/indices/{index.uuid}
*/
public Path resolve(Index index) {
- return indicesPath.resolve(index.getUUID());
+ return resolve(index.getUUID());
}
+ Path resolve(String uuid) {
+ return indicesPath.resolve(uuid);
+ }
@Override
@ -1050,28 +1054,48 @@ public final class NodeEnvironment implements Closeable {
}
private void ensureNoShardData(final NodePath[] nodePaths) throws IOException {
- List<Path> shardDataPaths = collectIndexSubPaths(nodePaths, this::isShardPath);
+ List<Path> shardDataPaths = collectShardDataPaths(nodePaths);
if (shardDataPaths.isEmpty() == false) {
throw new IllegalStateException("Node is started with "
+ Node.NODE_DATA_SETTING.getKey()
+ "=false, but has shard data: "
- + shardDataPaths);
+ + shardDataPaths
+ + ". Use 'elasticsearch-node repurpose' tool to clean up"
+ );
}
}
private void ensureNoIndexMetaData(final NodePath[] nodePaths) throws IOException {
- List<Path> indexMetaDataPaths = collectIndexSubPaths(nodePaths, this::isIndexMetaDataPath);
+ List<Path> indexMetaDataPaths = collectIndexMetaDataPaths(nodePaths);
if (indexMetaDataPaths.isEmpty() == false) {
throw new IllegalStateException("Node is started with "
+ Node.NODE_DATA_SETTING.getKey()
+ "=false and "
+ Node.NODE_MASTER_SETTING.getKey()
+ "=false, but has index metadata: "
- + indexMetaDataPaths);
+ + indexMetaDataPaths
+ + ". Use 'elasticsearch-node repurpose' tool to clean up"
+ );
}
}
- private List<Path> collectIndexSubPaths(NodePath[] nodePaths, Predicate<Path> subPathPredicate) throws IOException {
+ /**
+ * Collect the paths containing shard data in the indicated node paths. The returned paths will point to the shard data folder.
+ */
+ static List<Path> collectShardDataPaths(NodePath[] nodePaths) throws IOException {
+ return collectIndexSubPaths(nodePaths, NodeEnvironment::isShardPath);
+ }
+ /**
+ * Collect the paths containing index meta data in the indicated node paths. The returned paths will point to the
+ * {@link MetaDataStateFormat#STATE_DIR_NAME} folder
+ */
+ static List<Path> collectIndexMetaDataPaths(NodePath[] nodePaths) throws IOException {
+ return collectIndexSubPaths(nodePaths, NodeEnvironment::isIndexMetaDataPath);
+ }
+ private static List<Path> collectIndexSubPaths(NodePath[] nodePaths, Predicate<Path> subPathPredicate) throws IOException {
List<Path> indexSubPaths = new ArrayList<>();
for (NodePath nodePath : nodePaths) {
Path indicesPath = nodePath.indicesPath;
@ -1093,12 +1117,12 @@ public final class NodeEnvironment implements Closeable {
return indexSubPaths;
}
- private boolean isShardPath(Path path) {
+ private static boolean isShardPath(Path path) {
return Files.isDirectory(path)
&& path.getFileName().toString().chars().allMatch(Character::isDigit);
}
- private boolean isIndexMetaDataPath(Path path) {
+ private static boolean isIndexMetaDataPath(Path path) {
return Files.isDirectory(path)
&& path.getFileName().toString().equals(MetaDataStateFormat.STATE_DIR_NAME);
}
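The two predicates classify entries beneath ${data.paths}/nodes/{node.id}/indices/{index.uuid} purely by name: all-digit directories hold shard data, and the _state directory holds index metadata. A standalone illustration with a made-up index UUID (the real code also checks Files.isDirectory, omitted here because the paths are fictional):

import java.nio.file.Path;
import java.nio.file.Paths;

class PathShapes {
    public static void main(String[] args) {
        Path shard = Paths.get("nodes/0/indices/3sgOWk1mR2qVQOGs1Oz6Gg/0");
        Path state = Paths.get("nodes/0/indices/3sgOWk1mR2qVQOGs1Oz6Gg/_state");
        // all-digit final name -> shard data folder
        System.out.println(shard.getFileName().toString().chars().allMatch(Character::isDigit)); // true
        // "_state" final name -> index metadata folder
        System.out.println(state.getFileName().toString().equals("_state")); // true
    }
}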

NodeRepurposeCommand.java (new file)

@ -0,0 +1,241 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.env;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cli.Terminal;
import org.elasticsearch.cluster.coordination.ElasticsearchNodeCommand;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.Manifest;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.gateway.WriteStateException;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
public class NodeRepurposeCommand extends ElasticsearchNodeCommand {
private static final Logger logger = LogManager.getLogger(NodeRepurposeCommand.class);
static final String ABORTED_BY_USER_MSG = ElasticsearchNodeCommand.ABORTED_BY_USER_MSG;
static final String FAILED_TO_OBTAIN_NODE_LOCK_MSG = ElasticsearchNodeCommand.FAILED_TO_OBTAIN_NODE_LOCK_MSG;
static final String NO_CLEANUP = "Node has node.data=true -> no clean up necessary";
static final String NO_DATA_TO_CLEAN_UP_FOUND = "No data to clean-up found";
static final String NO_SHARD_DATA_TO_CLEAN_UP_FOUND = "No shard data to clean-up found";
static final String PRE_V7_MESSAGE =
"No manifest file found. If you were previously running this node on Elasticsearch version 6, please proceed.\n" +
"If this node was ever started on Elasticsearch version 7 or higher, it might mean metadata corruption, please abort.";
public NodeRepurposeCommand() {
super("Repurpose this node to another master/data role, cleaning up any excess persisted data");
}
void testExecute(Terminal terminal, OptionSet options, Environment env) throws Exception {
execute(terminal, options, env);
}
@Override
protected boolean validateBeforeLock(Terminal terminal, Environment env) {
Settings settings = env.settings();
if (DiscoveryNode.isDataNode(settings)) {
terminal.println(Terminal.Verbosity.NORMAL, NO_CLEANUP);
return false;
}
return true;
}
@Override
protected void processNodePaths(Terminal terminal, Path[] dataPaths, Environment env) throws IOException {
assert DiscoveryNode.isDataNode(env.settings()) == false;
if (DiscoveryNode.isMasterNode(env.settings()) == false) {
processNoMasterNoDataNode(terminal, dataPaths);
} else {
processMasterNoDataNode(terminal, dataPaths);
}
}
private void processNoMasterNoDataNode(Terminal terminal, Path[] dataPaths) throws IOException {
NodeEnvironment.NodePath[] nodePaths = toNodePaths(dataPaths);
terminal.println(Terminal.Verbosity.VERBOSE, "Collecting shard data paths");
List<Path> shardDataPaths = NodeEnvironment.collectShardDataPaths(nodePaths);
terminal.println(Terminal.Verbosity.VERBOSE, "Collecting index metadata paths");
List<Path> indexMetaDataPaths = NodeEnvironment.collectIndexMetaDataPaths(nodePaths);
Set<Path> indexPaths = uniqueParentPaths(shardDataPaths, indexMetaDataPaths);
if (indexPaths.isEmpty()) {
terminal.println(Terminal.Verbosity.NORMAL, NO_DATA_TO_CLEAN_UP_FOUND);
return;
}
Set<String> indexUUIDs = indexUUIDsFor(indexPaths);
outputVerboseInformation(terminal, nodePaths, indexPaths, indexUUIDs);
terminal.println(noMasterMessage(indexUUIDs.size(), shardDataPaths.size(), indexMetaDataPaths.size()));
outputHowToSeeVerboseInformation(terminal);
final Manifest manifest = loadManifest(terminal, dataPaths);
terminal.println("Node is being re-purposed as no-master and no-data. Clean-up of index data will be performed.");
confirm(terminal, "Do you want to proceed?");
if (manifest != null) {
rewriteManifest(terminal, manifest, dataPaths);
}
removePaths(terminal, indexPaths);
terminal.println("Node successfully repurposed to no-master and no-data.");
}
private void processMasterNoDataNode(Terminal terminal, Path[] dataPaths) throws IOException {
NodeEnvironment.NodePath[] nodePaths = toNodePaths(dataPaths);
terminal.println(Terminal.Verbosity.VERBOSE, "Collecting shard data paths");
List<Path> shardDataPaths = NodeEnvironment.collectShardDataPaths(nodePaths);
if (shardDataPaths.isEmpty()) {
terminal.println(NO_SHARD_DATA_TO_CLEAN_UP_FOUND);
return;
}
Set<Path> indexPaths = uniqueParentPaths(shardDataPaths);
Set<String> indexUUIDs = indexUUIDsFor(indexPaths);
outputVerboseInformation(terminal, nodePaths, shardDataPaths, indexUUIDs);
terminal.println(shardMessage(shardDataPaths.size(), indexUUIDs.size()));
outputHowToSeeVerboseInformation(terminal);
terminal.println("Node is being re-purposed as master and no-data. Clean-up of shard data will be performed.");
confirm(terminal, "Do you want to proceed?");
removePaths(terminal, shardDataPaths);
terminal.println("Node successfully repurposed to master and no-data.");
}
private void outputVerboseInformation(Terminal terminal, NodeEnvironment.NodePath[] nodePaths,
Collection<Path> pathsToCleanup, Set<String> indexUUIDs) {
if (terminal.isPrintable(Terminal.Verbosity.VERBOSE)) {
terminal.println(Terminal.Verbosity.VERBOSE, "Paths to clean up:");
pathsToCleanup.forEach(p -> terminal.println(Terminal.Verbosity.VERBOSE, " " + p.toString()));
terminal.println(Terminal.Verbosity.VERBOSE, "Indices affected:");
indexUUIDs.forEach(uuid -> terminal.println(Terminal.Verbosity.VERBOSE, " " + toIndexName(nodePaths, uuid)));
}
}
private void outputHowToSeeVerboseInformation(Terminal terminal) {
if (terminal.isPrintable(Terminal.Verbosity.VERBOSE) == false) {
terminal.println("Use -v to see list of paths and indices affected");
}
}
private String toIndexName(NodeEnvironment.NodePath[] nodePaths, String uuid) {
Path[] indexPaths = new Path[nodePaths.length];
for (int i = 0; i < nodePaths.length; i++) {
indexPaths[i] = nodePaths[i].resolve(uuid);
}
try {
IndexMetaData metaData = IndexMetaData.FORMAT.loadLatestState(logger, namedXContentRegistry, indexPaths);
return metaData.getIndex().getName();
} catch (Exception e) {
return "no name for uuid: " + uuid + ": " + e;
}
}
private NodeEnvironment.NodePath[] toNodePaths(Path[] dataPaths) {
return Arrays.stream(dataPaths).map(NodeRepurposeCommand::createNodePath).toArray(NodeEnvironment.NodePath[]::new);
}
private Set<String> indexUUIDsFor(Set<Path> indexPaths) {
return indexPaths.stream().map(Path::getFileName).map(Path::toString).collect(Collectors.toSet());
}
static String noMasterMessage(int indexes, int shards, int indexMetaData) {
return "Found " + indexes + " indices ("
+ shards + " shards and " + indexMetaData + " index meta data) to clean up";
}
static String shardMessage(int shards, int indices) {
return "Found " + shards + " shards in " + indices + " indices to clean up";
}
private void rewriteManifest(Terminal terminal, Manifest manifest, Path[] dataPaths) throws WriteStateException {
terminal.println(Terminal.Verbosity.VERBOSE, "Re-writing manifest");
Manifest newManifest = new Manifest(manifest.getCurrentTerm(), manifest.getClusterStateVersion(), manifest.getGlobalGeneration(),
new HashMap<>());
Manifest.FORMAT.writeAndCleanup(newManifest, dataPaths);
}
private Manifest loadManifest(Terminal terminal, Path[] dataPaths) throws IOException {
terminal.println(Terminal.Verbosity.VERBOSE, "Loading manifest");
final Manifest manifest = Manifest.FORMAT.loadLatestState(logger, namedXContentRegistry, dataPaths);
if (manifest == null) {
terminal.println(Terminal.Verbosity.SILENT, PRE_V7_MESSAGE);
}
return manifest;
}
private void removePaths(Terminal terminal, Collection<Path> paths) {
terminal.println(Terminal.Verbosity.VERBOSE, "Removing data");
paths.forEach(this::removePath);
}
private void removePath(Path path) {
try {
IOUtils.rm(path);
} catch (IOException e) {
throw new ElasticsearchException("Unable to clean up path: " + path + ": " + e.getMessage());
}
}
@SafeVarargs
@SuppressWarnings("varargs")
private final Set<Path> uniqueParentPaths(Collection<Path>... paths) {
// equals on Path is good enough here due to the way these are collected.
return Arrays.stream(paths).flatMap(Collection::stream).map(Path::getParent).collect(Collectors.toSet());
}
private static NodeEnvironment.NodePath createNodePath(Path path) {
try {
return new NodeEnvironment.NodePath(path);
} catch (IOException e) {
throw new ElasticsearchException("Unable to investigate path: " + path + ": " + e.getMessage());
}
}
//package-private for testing
OptionParser getParser() {
return parser;
}
}
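A note on uniqueParentPaths: the shard folders and the _state folder of one index share the same parent directory, so mapping every collected path to its parent and collecting into a set yields exactly one path per index. A standalone check of that idea with made-up paths:

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class UniqueParents {
    public static void main(String[] args) {
        List<Path> collected = Arrays.asList(
            Paths.get("nodes/0/indices/uuidA/0"),       // shard 0 data
            Paths.get("nodes/0/indices/uuidA/1"),       // shard 1 data
            Paths.get("nodes/0/indices/uuidA/_state")); // index metadata
        Set<Path> indexPaths = collected.stream().map(Path::getParent).collect(Collectors.toSet());
        System.out.println(indexPaths); // [nodes/0/indices/uuidA]
    }
}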

ElasticsearchNodeCommandIT.java (renamed to UnsafeBootstrapAndDetachCommandIT.java)

@ -50,7 +50,7 @@ import static org.hamcrest.Matchers.equalTo;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, autoMinMasterNodes = false)
@TestLogging("_root:DEBUG,org.elasticsearch.cluster.service:TRACE,org.elasticsearch.cluster.coordination:TRACE")
- public class ElasticsearchNodeCommandIT extends ESIntegTestCase {
+ public class UnsafeBootstrapAndDetachCommandIT extends ESIntegTestCase {
private MockTerminal executeCommand(ElasticsearchNodeCommand command, Environment environment, int nodeOrdinal, boolean abort)
throws Exception {

NodeRepurposeCommandIT.java (new file)

@ -0,0 +1,116 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.env;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.ESIntegTestCase;
import org.hamcrest.Matcher;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.not;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class NodeRepurposeCommandIT extends ESIntegTestCase {
public void testRepurpose() throws Exception {
final String indexName = "test-repurpose";
logger.info("--> starting two nodes");
internalCluster().startMasterOnlyNode();
internalCluster().startDataOnlyNode();
logger.info("--> creating index");
prepareCreate(indexName, Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
).get();
final String indexUUID = resolveIndex(indexName).getUUID();
logger.info("--> indexing a simple document");
client().prepareIndex(indexName, "type1", "1").setSource("field1", "value1").get();
ensureGreen();
assertTrue(client().prepareGet(indexName, "type1", "1").get().isExists());
final Settings noMasterNoDataSettings = Settings.builder()
.put(Node.NODE_DATA_SETTING.getKey(), false)
.put(Node.NODE_MASTER_SETTING.getKey(), false)
.build();
internalCluster().stopRandomDataNode();
// verify test setup
logger.info("--> restarting node with node.data=false and node.master=false");
IllegalStateException ex = expectThrows(IllegalStateException.class,
"Node started with node.data=false and node.master=false while having existing index metadata must fail",
() -> internalCluster().startCoordinatingOnlyNode(Settings.EMPTY)
);
logger.info("--> Repurposing node 1");
executeRepurposeCommandForOrdinal(noMasterNoDataSettings, indexUUID, 1, 1);
ElasticsearchException lockedException = expectThrows(ElasticsearchException.class,
() -> executeRepurposeCommandForOrdinal(noMasterNoDataSettings, indexUUID, 0, 1)
);
assertThat(lockedException.getMessage(), containsString(NodeRepurposeCommand.FAILED_TO_OBTAIN_NODE_LOCK_MSG));
logger.info("--> Starting node after repurpose");
internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);
assertTrue(indexExists(indexName));
expectThrows(NoShardAvailableActionException.class, () -> client().prepareGet(indexName, "type1", "1").get());
logger.info("--> Restarting and repurposing other node");
internalCluster().stopRandomNode(s -> true);
internalCluster().stopRandomNode(s -> true);
executeRepurposeCommandForOrdinal(noMasterNoDataSettings, indexUUID, 0, 0);
// by restarting as master and data node, we can check that the index definition was really deleted and also that the tool
// does not mess things up so much that the nodes cannot boot as master or data node any longer.
internalCluster().startMasterOnlyNode();
internalCluster().startDataOnlyNode();
ensureGreen();
// index is gone.
assertFalse(indexExists(indexName));
}
private void executeRepurposeCommandForOrdinal(Settings settings, String indexUUID, int ordinal,
int expectedShardCount) throws Exception {
boolean verbose = randomBoolean();
Settings settingsWithPath = Settings.builder().put(internalCluster().getDefaultSettings()).put(settings).build();
int expectedIndexCount = TestEnvironment.newEnvironment(settingsWithPath).dataFiles().length;
Matcher<String> matcher = allOf(
containsString(NodeRepurposeCommand.noMasterMessage(1, expectedShardCount, expectedIndexCount)),
not(containsString(NodeRepurposeCommand.PRE_V7_MESSAGE)),
NodeRepurposeCommandTests.conditionalNot(containsString(indexUUID), verbose == false));
NodeRepurposeCommandTests.verifySuccess(settingsWithPath, matcher,
verbose, ordinal);
}
}

NodeRepurposeCommandTests.java (new file)

@ -0,0 +1,326 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.env;
import joptsimple.OptionSet;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cli.MockTerminal;
import org.elasticsearch.cli.Terminal;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.Manifest;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.index.Index;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matcher;
import org.junit.Before;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.stream.Stream;
import static org.elasticsearch.env.NodeRepurposeCommand.NO_CLEANUP;
import static org.elasticsearch.env.NodeRepurposeCommand.NO_DATA_TO_CLEAN_UP_FOUND;
import static org.elasticsearch.env.NodeRepurposeCommand.NO_SHARD_DATA_TO_CLEAN_UP_FOUND;
import static org.elasticsearch.env.NodeRepurposeCommand.PRE_V7_MESSAGE;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.not;
public class NodeRepurposeCommandTests extends ESTestCase {
private static final Index INDEX = new Index("testIndex", "testUUID");
private Settings dataMasterSettings;
private Environment environment;
private Path[] nodePaths;
private Settings dataNoMasterSettings;
private Settings noDataNoMasterSettings;
private Settings noDataMasterSettings;
@Before
public void createNodePaths() throws IOException {
dataMasterSettings = buildEnvSettings(Settings.EMPTY);
environment = TestEnvironment.newEnvironment(dataMasterSettings);
try (NodeEnvironment nodeEnvironment = new NodeEnvironment(dataMasterSettings, environment)) {
nodePaths = nodeEnvironment.nodeDataPaths();
}
dataNoMasterSettings = Settings.builder()
.put(dataMasterSettings)
.put(Node.NODE_MASTER_SETTING.getKey(), false)
.build();
noDataNoMasterSettings = Settings.builder()
.put(dataMasterSettings)
.put(Node.NODE_DATA_SETTING.getKey(), false)
.put(Node.NODE_MASTER_SETTING.getKey(), false)
.build();
noDataMasterSettings = Settings.builder()
.put(dataMasterSettings)
.put(Node.NODE_DATA_SETTING.getKey(), false)
.put(Node.NODE_MASTER_SETTING.getKey(), true)
.build();
}
public void testEarlyExitNoCleanup() throws Exception {
createIndexDataFiles(dataMasterSettings, randomInt(10));
verifyNoQuestions(dataMasterSettings, containsString(NO_CLEANUP));
verifyNoQuestions(dataNoMasterSettings, containsString(NO_CLEANUP));
}
public void testNothingToCleanup() throws Exception {
verifyNoQuestions(noDataNoMasterSettings, allOf(containsString(NO_DATA_TO_CLEAN_UP_FOUND), not(containsString(PRE_V7_MESSAGE))));
verifyNoQuestions(noDataMasterSettings,
allOf(containsString(NO_SHARD_DATA_TO_CLEAN_UP_FOUND), not(containsString(PRE_V7_MESSAGE))));
createManifest(null);
verifyNoQuestions(noDataNoMasterSettings, allOf(containsString(NO_DATA_TO_CLEAN_UP_FOUND), not(containsString(PRE_V7_MESSAGE))));
verifyNoQuestions(noDataMasterSettings,
allOf(containsString(NO_SHARD_DATA_TO_CLEAN_UP_FOUND), not(containsString(PRE_V7_MESSAGE))));
createIndexDataFiles(dataMasterSettings, 0);
verifyNoQuestions(noDataMasterSettings,
allOf(containsString(NO_SHARD_DATA_TO_CLEAN_UP_FOUND), not(containsString(PRE_V7_MESSAGE))));
}
public void testLocked() throws IOException {
try (NodeEnvironment env = new NodeEnvironment(dataMasterSettings, TestEnvironment.newEnvironment(dataMasterSettings))) {
assertThat(expectThrows(ElasticsearchException.class,
() -> verifyNoQuestions(noDataNoMasterSettings, null)).getMessage(),
containsString(NodeRepurposeCommand.FAILED_TO_OBTAIN_NODE_LOCK_MSG));
}
}
public void testCleanupAll() throws Exception {
Manifest oldManifest = createManifest(INDEX);
checkCleanupAll(not(containsString(PRE_V7_MESSAGE)));
Manifest newManifest = loadManifest();
assertThat(newManifest.getIndexGenerations().entrySet(), hasSize(0));
assertManifestIdenticalExceptIndices(oldManifest, newManifest);
}
public void testCleanupAllPreV7() throws Exception {
checkCleanupAll(containsString(PRE_V7_MESSAGE));
}
private void checkCleanupAll(Matcher<String> additionalOutputMatcher) throws Exception {
int shardCount = randomInt(10);
boolean verbose = randomBoolean();
createIndexDataFiles(dataMasterSettings, shardCount);
String messageText = NodeRepurposeCommand.noMasterMessage(
1,
environment.dataFiles().length*shardCount,
environment.dataFiles().length);
Matcher<String> outputMatcher = allOf(
containsString(messageText),
additionalOutputMatcher,
conditionalNot(containsString("testUUID"), verbose == false),
conditionalNot(containsString("testIndex"), verbose == false)
);
verifyUnchangedOnAbort(noDataNoMasterSettings, outputMatcher, verbose);
// verify test setup
expectThrows(IllegalStateException.class, () -> new NodeEnvironment(noDataNoMasterSettings, environment).close());
verifySuccess(noDataNoMasterSettings, outputMatcher, verbose);
//verify cleaned.
new NodeEnvironment(noDataNoMasterSettings, environment).close();
}
public void testCleanupShardData() throws Exception {
int shardCount = randomIntBetween(1, 10);
boolean verbose = randomBoolean();
Manifest manifest = randomBoolean() ? createManifest(INDEX) : null;
createIndexDataFiles(dataMasterSettings, shardCount);
Matcher<String> matcher = allOf(
containsString(NodeRepurposeCommand.shardMessage(environment.dataFiles().length * shardCount, 1)),
conditionalNot(containsString("testUUID"), verbose == false),
conditionalNot(containsString("testIndex"), verbose == false)
);
verifyUnchangedOnAbort(noDataMasterSettings,
matcher, verbose);
// verify test setup
expectThrows(IllegalStateException.class, () -> new NodeEnvironment(noDataMasterSettings, environment).close());
verifySuccess(noDataMasterSettings, matcher, verbose);
//verify clean.
new NodeEnvironment(noDataMasterSettings, environment).close();
if (manifest != null) {
Manifest newManifest = loadManifest();
assertThat(newManifest.getIndexGenerations().entrySet(), hasSize(1));
assertManifestIdenticalExceptIndices(manifest, newManifest);
}
}
private static void verifySuccess(Settings settings, Matcher<String> outputMatcher, boolean verbose) throws Exception {
verifySuccess(settings, outputMatcher, verbose, 0);
}
static void verifySuccess(Settings settings, Matcher<String> outputMatcher, boolean verbose, int ordinal) throws Exception {
withTerminal(verbose, outputMatcher, terminal -> {
terminal.addTextInput(randomFrom("y", "Y"));
executeRepurposeCommand(terminal, settings, ordinal);
assertThat(terminal.getOutput(), containsString("Node successfully repurposed"));
});
}
private void verifyUnchangedOnAbort(Settings settings, Matcher<String> outputMatcher, boolean verbose) throws Exception {
withTerminal(verbose, outputMatcher, terminal -> {
terminal.addTextInput(randomFrom("yy", "Yy", "n", "yes", "true", "N", "no"));
verifyUnchangedDataFiles(() -> {
ElasticsearchException exception = expectThrows(ElasticsearchException.class,
() -> executeRepurposeCommand(terminal, settings, 0));
assertThat(exception.getMessage(), containsString(NodeRepurposeCommand.ABORTED_BY_USER_MSG));
});
});
}
private void verifyNoQuestions(Settings settings, Matcher<String> outputMatcher) throws Exception {
withTerminal(false, outputMatcher, terminal -> {
executeRepurposeCommand(terminal, settings, 0);
});
}
private static void withTerminal(boolean verbose, Matcher<String> outputMatcher,
CheckedConsumer<MockTerminal, Exception> consumer) throws Exception {
MockTerminal terminal = new MockTerminal();
if (verbose) {
terminal.setVerbosity(Terminal.Verbosity.VERBOSE);
}
consumer.accept(terminal);
assertThat(terminal.getOutput(), outputMatcher);
expectThrows(IllegalStateException.class, "Must consume input", () -> terminal.readText(""));
}
private static void executeRepurposeCommand(MockTerminal terminal, Settings settings, int ordinal) throws Exception {
NodeRepurposeCommand nodeRepurposeCommand = new NodeRepurposeCommand();
OptionSet options = nodeRepurposeCommand.getParser()
.parse(ordinal != 0 ? new String[]{"--ordinal", Integer.toString(ordinal)} : new String[0]);
Environment env = TestEnvironment.newEnvironment(settings);
nodeRepurposeCommand.testExecute(terminal, options, env);
}
private Manifest createManifest(Index index) throws org.elasticsearch.gateway.WriteStateException {
Manifest manifest = new Manifest(randomIntBetween(1,100), randomIntBetween(1,100), randomIntBetween(1,100),
index != null ? Collections.singletonMap(index, randomLongBetween(1,100)) : Collections.emptyMap());
Manifest.FORMAT.writeAndCleanup(manifest, nodePaths);
return manifest;
}
private Manifest loadManifest() throws IOException {
return Manifest.FORMAT.loadLatestState(logger, new NamedXContentRegistry(ClusterModule.getNamedXWriteables()), nodePaths);
}
private void assertManifestIdenticalExceptIndices(Manifest oldManifest, Manifest newManifest) {
assertEquals(oldManifest.getGlobalGeneration(), newManifest.getGlobalGeneration());
assertEquals(oldManifest.getClusterStateVersion(), newManifest.getClusterStateVersion());
assertEquals(oldManifest.getCurrentTerm(), newManifest.getCurrentTerm());
}
private void createIndexDataFiles(Settings settings, int shardCount) throws IOException {
int shardDataDirNumber = randomInt(10);
try (NodeEnvironment env = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings))) {
IndexMetaData.FORMAT.write(IndexMetaData.builder(INDEX.getName())
.settings(Settings.builder().put("index.version.created", Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(1)
.build(), env.indexPaths(INDEX));
for (Path path : env.indexPaths(INDEX)) {
for (int i = 0; i < shardCount; ++i) {
Files.createDirectories(path.resolve(Integer.toString(shardDataDirNumber)));
shardDataDirNumber += randomIntBetween(1,10);
}
}
}
}
private void verifyUnchangedDataFiles(CheckedRunnable<? extends Exception> runnable) throws Exception {
long before = digestPaths();
runnable.run();
long after = digestPaths();
assertEquals("Must not touch files", before, after);
}
private long digestPaths() {
// use a commutative digest to avoid dependency on file system order.
return Arrays.stream(environment.dataFiles()).mapToLong(this::digestPath).sum();
}
private long digestPath(Path path) {
try (Stream<Path> paths = Files.walk(path)) {
return paths.mapToLong(this::digestSinglePath).sum();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private long digestSinglePath(Path path) {
if (Files.isDirectory(path)) {
return path.toString().hashCode();
} else {
return path.toString().hashCode() + digest(readAllBytes(path));
}
}
private byte[] readAllBytes(Path path) {
try {
return Files.readAllBytes(path);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private long digest(byte[] bytes) {
long result = 0;
for (byte b : bytes) {
result *= 31;
result += b;
}
return result;
}
static <T> Matcher<T> conditionalNot(Matcher<T> matcher, boolean condition) {
return condition ? not(matcher) : matcher;
}
}
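A closing note on the digest helpers above: digestSinglePath hashes each file with an order-sensitive 31-based polynomial over its bytes, while digestPaths combines the per-path values with a plain sum, so the total is independent of the order in which Files.walk visits them. A toy contrast of the two combining schemes (values are arbitrary):

class DigestOrderDemo {
    // same 31-based polynomial as the digest(byte[]) helper above
    static long polynomial(byte[] bytes) {
        long result = 0;
        for (byte b : bytes) {
            result = result * 31 + b;
        }
        return result;
    }

    public static void main(String[] args) {
        byte[] ab = {1, 2};
        byte[] ba = {2, 1};
        System.out.println(polynomial(ab) == polynomial(ba)); // false: byte order matters within a file
        long fileA = polynomial(ab);
        long fileB = polynomial(ba);
        System.out.println(fileA + fileB == fileB + fileA);   // true: file visit order does not
    }
}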