From ac53c90ff44dd0dde8b497e5875567bad4abd4cc Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Thu, 30 Jun 2016 16:40:37 -0600 Subject: [PATCH] Add 'elasticsearch-translog' CLI tool with 'translog' command MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This adds the `bin/elasticsearch-translate` bin file that will be used for CLI tasks pertaining to Elasticsearch. Currently it implements only a single sub-command, `truncate-translog`, that creates a truncated translog for a given folder. Here's what running the tool looks like: ``` λ bin/elasticsearch-translog truncate -d data/nodes/0/indices/P45vf_YQRhqjfwLMUvSqDw/0/translog/ Checking existing translog files !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! ! WARNING: Elasticsearch MUST be stopped before running this tool ! ! ! ! WARNING: Documents inside of translog files will be lost ! ! ! ! WARNING: The following files will be DELETED! ! !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! --> data/nodes/0/indices/P45vf_YQRhqjfwLMUvSqDw/0/translog/translog-10.tlog --> data/nodes/0/indices/P45vf_YQRhqjfwLMUvSqDw/0/translog/translog-18.tlog --> data/nodes/0/indices/P45vf_YQRhqjfwLMUvSqDw/0/translog/translog-21.tlog --> data/nodes/0/indices/P45vf_YQRhqjfwLMUvSqDw/0/translog/translog-12.ckp --> data/nodes/0/indices/P45vf_YQRhqjfwLMUvSqDw/0/translog/translog-25.ckp --> data/nodes/0/indices/P45vf_YQRhqjfwLMUvSqDw/0/translog/translog-29.tlog --> data/nodes/0/indices/P45vf_YQRhqjfwLMUvSqDw/0/translog/translog-2.tlog --> data/nodes/0/indices/P45vf_YQRhqjfwLMUvSqDw/0/translog/translog-5.tlog --> data/nodes/0/indices/P45vf_YQRhqjfwLMUvSqDw/0/translog/translog-41.ckp --> data/nodes/0/indices/P45vf_YQRhqjfwLMUvSqDw/0/translog/translog-6.ckp --> data/nodes/0/indices/P45vf_YQRhqjfwLMUvSqDw/0/translog/translog-37.ckp --> data/nodes/0/indices/P45vf_YQRhqjfwLMUvSqDw/0/translog/translog-24.ckp --> data/nodes/0/indices/P45vf_YQRhqjfwLMUvSqDw/0/translog/translog-11.ckp Continue and DELETE files? [y/N] y Reading translog UUID information from Lucene commit from shard at [data/nodes/0/indices/P45vf_YQRhqjfwLMUvSqDw/0/index] Translog Generation: 3 Translog UUID : AxqC4rocTC6e0fwsljAh-Q Removing existing translog files Creating new empty checkpoint at [data/nodes/0/indices/P45vf_YQRhqjfwLMUvSqDw/0/translog/translog.ckp] Creating new empty translog at [data/nodes/0/indices/P45vf_YQRhqjfwLMUvSqDw/0/translog/translog-3.tlog] Done. ``` It also includes a `-b` batch operation that can be used to skip the confirmation diaglog. Resolves #19123 --- .../index/translog/Checkpoint.java | 2 +- .../index/translog/TranslogReader.java | 4 +- .../index/translog/TranslogToolCli.java | 56 +++++ .../index/translog/TranslogWriter.java | 12 +- .../translog/TruncateTranslogCommand.java | 224 +++++++++++++++++ .../index/translog/TruncateTranslogIT.java | 236 ++++++++++++++++++ .../main/resources/bin/elasticsearch-translog | 90 +++++++ .../resources/bin/elasticsearch-translog.bat | 61 +++++ 8 files changed, 679 insertions(+), 6 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/index/translog/TranslogToolCli.java create mode 100644 core/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java create mode 100644 core/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java create mode 100755 distribution/src/main/resources/bin/elasticsearch-translog create mode 100644 distribution/src/main/resources/bin/elasticsearch-translog.bat diff --git a/core/src/main/java/org/elasticsearch/index/translog/Checkpoint.java b/core/src/main/java/org/elasticsearch/index/translog/Checkpoint.java index f630ba3faba..0fd59090944 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Checkpoint.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Checkpoint.java @@ -61,7 +61,7 @@ class Checkpoint { Channels.writeToChannel(buffer, channel); } - private void write(DataOutput out) throws IOException { + void write(DataOutput out) throws IOException { out.writeLong(offset); out.writeInt(numOps); out.writeLong(generation); diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java index fcb3daea796..581e8d6a903 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java @@ -113,7 +113,9 @@ public class TranslogReader extends BaseTranslogReader implements Closeable { headerStream.read(ref.bytes, ref.offset, ref.length); BytesRef uuidBytes = new BytesRef(translogUUID); if (uuidBytes.bytesEquals(ref) == false) { - throw new TranslogCorruptedException("expected shard UUID [" + uuidBytes + "] but got: [" + ref + "] this translog file belongs to a different translog. path:" + path); + throw new TranslogCorruptedException("expected shard UUID " + uuidBytes + "/" + uuidBytes.utf8ToString() + + " but got: " + ref + "/" + ref.utf8ToString() + + " this translog file belongs to a different translog. path:" + path); } return new TranslogReader(checkpoint.generation, channel, path, ref.length + CodecUtil.headerLength(TranslogWriter.TRANSLOG_CODEC) + Integer.BYTES, checkpoint.offset, checkpoint.numOps); default: diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogToolCli.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogToolCli.java new file mode 100644 index 00000000000..eaf50f25a01 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogToolCli.java @@ -0,0 +1,56 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.translog; + +import org.elasticsearch.cli.MultiCommand; +import org.elasticsearch.cli.Terminal; +import org.elasticsearch.common.logging.LogConfigurator; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.env.Environment; +import org.elasticsearch.index.translog.TruncateTranslogCommand; +import org.elasticsearch.node.internal.InternalSettingsPreparer; + +/** + * Class encapsulating and dispatching commands from the {@code elasticsearch-translog} command line tool + */ +public class TranslogToolCli extends MultiCommand { + + public TranslogToolCli() { + super("A CLI tool for various Elasticsearch translog actions"); + subcommands.put("truncate", new TruncateTranslogCommand()); + } + + public static void main(String[] args) throws Exception { + // initialize default for es.logger.level because we will not read the logging.yml + String loggerLevel = System.getProperty("es.logger.level", "INFO"); + String pathHome = System.getProperty("es.path.home"); + // Set the appender for all potential log files to terminal so that other components that use the logger print out the + // same terminal. + Environment loggingEnvironment = InternalSettingsPreparer.prepareEnvironment(Settings.builder() + .put("path.home", pathHome) + .put("appender.terminal.type", "terminal") + .put("rootLogger", "${logger.level}, terminal") + .put("logger.level", loggerLevel) + .build(), Terminal.DEFAULT); + LogConfigurator.configure(loggingEnvironment.settings(), false); + + exit(new TranslogToolCli().main(args, Terminal.DEFAULT)); + } +} diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index 89d12983b07..bb4a84651c5 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -76,10 +76,16 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { return getHeaderLength(new BytesRef(translogUUID).length); } - private static int getHeaderLength(int uuidLength) { + static int getHeaderLength(int uuidLength) { return CodecUtil.headerLength(TRANSLOG_CODEC) + uuidLength + Integer.BYTES; } + static void writeHeader(OutputStreamDataOutput out, BytesRef ref) throws IOException { + CodecUtil.writeHeader(out, TRANSLOG_CODEC, VERSION); + out.writeInt(ref.length); + out.writeBytes(ref.bytes, ref.offset, ref.length); + } + public static TranslogWriter create(ShardId shardId, String translogUUID, long fileGeneration, Path file, ChannelFactory channelFactory, ByteSizeValue bufferSize) throws IOException { final BytesRef ref = new BytesRef(translogUUID); final int headerLength = getHeaderLength(ref.length); @@ -88,9 +94,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { // This OutputStreamDataOutput is intentionally not closed because // closing it will close the FileChannel final OutputStreamDataOutput out = new OutputStreamDataOutput(java.nio.channels.Channels.newOutputStream(channel)); - CodecUtil.writeHeader(out, TRANSLOG_CODEC, VERSION); - out.writeInt(ref.length); - out.writeBytes(ref.bytes, ref.offset, ref.length); + writeHeader(out, ref); channel.force(true); writeCheckpoint(channelFactory, headerLength, 0, file.getParent(), fileGeneration); final TranslogWriter writer = new TranslogWriter(channelFactory, shardId, fileGeneration, channel, file, bufferSize); diff --git a/core/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java b/core/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java new file mode 100644 index 00000000000..b6b91f14ba8 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java @@ -0,0 +1,224 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.translog; + +import joptsimple.OptionParser; +import joptsimple.OptionSet; +import joptsimple.OptionSpec; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.store.Lock; +import org.apache.lucene.store.LockObtainFailedException; +import org.apache.lucene.store.NativeFSLockFactory; +import org.apache.lucene.store.OutputStreamDataOutput; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.IOUtils; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.cli.SettingCommand; +import org.elasticsearch.cli.Terminal; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.io.PathUtils; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.translog.Checkpoint; + +import java.io.IOException; +import java.nio.channels.Channels; +import java.nio.channels.FileChannel; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.nio.file.StandardOpenOption; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class TruncateTranslogCommand extends SettingCommand { + + private final OptionSpec translogFolder; + private final OptionSpec batchMode; + + public TruncateTranslogCommand() { + super("Truncates a translog to create a new, empty translog"); + this.translogFolder = parser.acceptsAll(Arrays.asList("d", "dir"), + "Translog Directory location on disk") + .withRequiredArg() + .required(); + this.batchMode = parser.acceptsAll(Arrays.asList("b", "batch"), + "Enable batch mode explicitly, automatic confirmation of warnings"); + } + + // Visible for testing + public OptionParser getParser() { + return this.parser; + } + + @Override + protected void printAdditionalHelp(Terminal terminal) { + terminal.println("This tool truncates the translog and translog"); + terminal.println("checkpoint files to create a new translog"); + } + + @SuppressForbidden(reason = "Necessary to use the path passed in") + private Path getTranslogPath(OptionSet options) { + return PathUtils.get(translogFolder.value(options), "", ""); + } + + @Override + protected void execute(Terminal terminal, OptionSet options, Map settings) throws Exception { + boolean batch = options.has(batchMode); + + Path translogPath = getTranslogPath(options); + Path idxLocation = translogPath.getParent().resolve("index"); + + if (Files.exists(translogPath) == false || Files.isDirectory(translogPath) == false) { + throw new ElasticsearchException("translog directory [" + translogPath + "], must exist and be a directory"); + } + + if (Files.exists(idxLocation) == false || Files.isDirectory(idxLocation) == false) { + throw new ElasticsearchException("unable to find a shard at [" + idxLocation + "], which must exist and be a directory"); + } + + // Hold the lock open for the duration of the tool running + try (Directory dir = FSDirectory.open(idxLocation, NativeFSLockFactory.INSTANCE); + Lock writeLock = dir.obtainLock(IndexWriter.WRITE_LOCK_NAME)) { + Set translogFiles; + try { + terminal.println("Checking existing translog files"); + translogFiles = filesInDirectory(translogPath); + } catch (IOException e) { + terminal.println("encountered IOException while listing directory, aborting..."); + throw new ElasticsearchException("failed to find existing translog files", e); + } + + // Warn about ES being stopped and files being deleted + warnAboutDeletingFiles(terminal, translogFiles, batch); + + List commits; + try { + terminal.println("Reading translog UUID information from Lucene commit from shard at [" + idxLocation + "]"); + commits = DirectoryReader.listCommits(dir); + } catch (IndexNotFoundException infe) { + throw new ElasticsearchException("unable to find a valid shard at [" + idxLocation + "]", infe); + } + + // Retrieve the generation and UUID from the existing data + Map commitData = commits.get(commits.size() - 1).getUserData(); + String translogGeneration = commitData.get(Translog.TRANSLOG_GENERATION_KEY); + String translogUUID = commitData.get(Translog.TRANSLOG_UUID_KEY); + if (translogGeneration == null || translogUUID == null) { + throw new ElasticsearchException("shard must have a valid translog generation and UUID but got: [{}] and: [{}]", + translogGeneration, translogUUID); + } + terminal.println("Translog Generation: " + translogGeneration); + terminal.println("Translog UUID : " + translogUUID); + + Path tempEmptyCheckpoint = translogPath.resolve("temp-" + Translog.CHECKPOINT_FILE_NAME); + Path realEmptyCheckpoint = translogPath.resolve(Translog.CHECKPOINT_FILE_NAME); + Path tempEmptyTranslog = translogPath.resolve("temp-" + Translog.TRANSLOG_FILE_PREFIX + + translogGeneration + Translog.TRANSLOG_FILE_SUFFIX); + Path realEmptyTranslog = translogPath.resolve(Translog.TRANSLOG_FILE_PREFIX + + translogGeneration + Translog.TRANSLOG_FILE_SUFFIX); + + // Write empty checkpoint and translog to empty files + long gen = Long.parseLong(translogGeneration); + int translogLen = writeEmptyTranslog(tempEmptyTranslog, translogUUID); + writeEmptyCheckpoint(tempEmptyCheckpoint, translogLen, gen); + + terminal.println("Removing existing translog files"); + IOUtils.rm(translogFiles.toArray(new Path[]{})); + + terminal.println("Creating new empty checkpoint at [" + realEmptyCheckpoint + "]"); + Files.move(tempEmptyCheckpoint, realEmptyCheckpoint, StandardCopyOption.ATOMIC_MOVE); + terminal.println("Creating new empty translog at [" + realEmptyTranslog + "]"); + Files.move(tempEmptyTranslog, realEmptyTranslog, StandardCopyOption.ATOMIC_MOVE); + + // Fsync the translog directory after rename + IOUtils.fsync(translogPath, true); + + } catch (LockObtainFailedException lofe) { + throw new ElasticsearchException("Failed to lock shard's directory at [" + idxLocation + "], is Elasticsearch still running?"); + } + + terminal.println("Done."); + } + + /** Write a checkpoint file to the given location with the given generation */ + public static void writeEmptyCheckpoint(Path filename, int translogLength, long translogGeneration) throws IOException { + try (FileChannel fc = FileChannel.open(filename, StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW); + OutputStreamDataOutput out = new OutputStreamDataOutput(Channels.newOutputStream(fc))) { + Checkpoint emptyCheckpoint = new Checkpoint(translogLength, 0, translogGeneration); + emptyCheckpoint.write(out); + fc.force(true); + } + } + + /** + * Write a translog containing the given translog UUID to the given location. Returns the number of bytes written. + */ + public static int writeEmptyTranslog(Path filename, String translogUUID) throws IOException { + final BytesRef translogRef = new BytesRef(translogUUID); + try (FileChannel fc = FileChannel.open(filename, StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW); + OutputStreamDataOutput out = new OutputStreamDataOutput(Channels.newOutputStream(fc))) { + TranslogWriter.writeHeader(out, translogRef); + fc.force(true); + } + return TranslogWriter.getHeaderLength(translogRef.length); + } + + /** Show a warning about deleting files, asking for a confirmation if {@code batchMode} is false */ + public static void warnAboutDeletingFiles(Terminal terminal, Set files, boolean batchMode) { + terminal.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"); + terminal.println("! WARNING: Elasticsearch MUST be stopped before running this tool !"); + terminal.println("! !"); + terminal.println("! WARNING: Documents inside of translog files will be lost !"); + terminal.println("! !"); + terminal.println("! WARNING: The following files will be DELETED! !"); + terminal.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"); + for (Path file : files) { + terminal.println("--> " + file); + } + terminal.println(""); + if (batchMode == false) { + String text = terminal.readText("Continue and DELETE files? [y/N] "); + if (!text.equalsIgnoreCase("y")) { + throw new ElasticsearchException("aborted by user"); + } + } + } + + /** Return a Set of all files in a given directory */ + public static Set filesInDirectory(Path directory) throws IOException { + Set files = new HashSet<>(); + try (DirectoryStream stream = Files.newDirectoryStream(directory)) { + for (Path file : stream) { + files.add(file); + } + } + return files; + } + +} diff --git a/core/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java b/core/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java new file mode 100644 index 00000000000..f6a28169898 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java @@ -0,0 +1,236 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.translog; + +import com.carrotsearch.randomizedtesting.generators.RandomPicks; +import joptsimple.OptionParser; +import joptsimple.OptionSet; +import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.cli.MockTerminal; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.routing.GroupShardsIterator; +import org.elasticsearch.cluster.routing.ShardIterator; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.Priority; +import org.elasticsearch.common.io.PathUtils; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.MockEngineFactoryPlugin; +import org.elasticsearch.index.translog.TruncateTranslogCommand; +import org.elasticsearch.monitor.fs.FsInfo; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.engine.MockEngineSupport; +import org.elasticsearch.test.hamcrest.ElasticsearchAssertions; +import org.elasticsearch.test.transport.MockTransportService; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; + +import static org.elasticsearch.common.util.CollectionUtils.iterableAsArrayList; +import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.notNullValue; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 0) +public class TruncateTranslogIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return pluginList(MockTransportService.TestPlugin.class, MockEngineFactoryPlugin.class); + } + + public void testCorruptTranslogTruncation() throws Exception { + internalCluster().startNodesAsync(1, Settings.EMPTY).get(); + + assertAcked(prepareCreate("test").setSettings(Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put("index.refresh_interval", "-1") + .put(MockEngineSupport.DISABLE_FLUSH_ON_CLOSE.getKey(), true) // never flush - always recover from translog + )); + ensureYellow(); + + // Index some documents + int numDocs = scaledRandomIntBetween(100, 1000); + IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs]; + for (int i = 0; i < builders.length; i++) { + builders[i] = client().prepareIndex("test", "type").setSource("foo", "bar"); + } + disableTranslogFlush("test"); + indexRandom(false, false, false, Arrays.asList(builders)); + Set translogDirs = getTranslogDirs("test"); + + TruncateTranslogCommand ttc = new TruncateTranslogCommand(); + MockTerminal t = new MockTerminal(); + OptionParser parser = ttc.getParser(); + + for (Path translogDir : translogDirs) { + OptionSet options = parser.parse("-d", translogDir.toAbsolutePath().toString(), "-b"); + // Try running it before the shard is closed, it should flip out because it can't acquire the lock + try { + logger.info("--> running truncate while index is open on [{}]", translogDir.toAbsolutePath()); + ttc.execute(t, options, new HashMap()); + fail("expected the truncate command to fail not being able to acquire the lock"); + } catch (Exception e) { + assertThat(e.getMessage(), containsString("Failed to lock shard's directory")); + } + } + + // Corrupt the translog file(s) + logger.info("--> corrupting translog"); + corruptRandomTranslogFiles("test"); + + // Restart the single node + logger.info("--> restarting node"); + internalCluster().fullRestart(); + client().admin().cluster().prepareHealth().setWaitForYellowStatus() + .setTimeout(new TimeValue(1000, TimeUnit.MILLISECONDS)) + .setWaitForEvents(Priority.LANGUID) + .get(); + + try { + client().prepareSearch("test").setQuery(matchAllQuery()).get(); + fail("all shards should be failed due to a corrupted translog"); + } catch (SearchPhaseExecutionException e) { + // Good, all shards should be failed because there is only a + // single shard and its translog is corrupt + } + + // Close the index so we can actually truncate the translog + logger.info("--> closing 'test' index"); + client().admin().indices().prepareClose("test").get(); + + for (Path translogDir : translogDirs) { + OptionSet options = parser.parse("-d", translogDir.toAbsolutePath().toString(), "-b"); + logger.info("--> running truncate translog command for [{}]", translogDir.toAbsolutePath()); + ttc.execute(t, options, new HashMap()); + logger.info("--> output:\n{}", t.getOutput()); + } + + // Re-open index + logger.info("--> opening 'test' index"); + client().admin().indices().prepareOpen("test").get(); + ensureYellow("test"); + + // Run a search and make sure it succeeds + SearchResponse resp = client().prepareSearch("test").setQuery(matchAllQuery()).get(); + ElasticsearchAssertions.assertNoFailures(resp); + } + + private Set getTranslogDirs(String indexName) throws IOException { + ClusterState state = client().admin().cluster().prepareState().get().getState(); + GroupShardsIterator shardIterators = state.getRoutingTable().activePrimaryShardsGrouped(new String[]{indexName}, false); + final Index idx = state.metaData().index(indexName).getIndex(); + List iterators = iterableAsArrayList(shardIterators); + ShardIterator shardIterator = RandomPicks.randomFrom(random(), iterators); + ShardRouting shardRouting = shardIterator.nextOrNull(); + assertNotNull(shardRouting); + assertTrue(shardRouting.primary()); + assertTrue(shardRouting.assignedToNode()); + String nodeId = shardRouting.currentNodeId(); + NodesStatsResponse nodeStatses = client().admin().cluster().prepareNodesStats(nodeId).setFs(true).get(); + Set translogDirs = new TreeSet<>(); // treeset makes sure iteration order is deterministic + for (FsInfo.Path fsPath : nodeStatses.getNodes().get(0).getFs()) { + String path = fsPath.getPath(); + final String relativeDataLocationPath = "indices/"+ idx.getUUID() +"/" + Integer.toString(shardRouting.getId()) + "/translog"; + Path translogPath = PathUtils.get(path).resolve(relativeDataLocationPath); + if (Files.isDirectory(translogPath)) { + translogDirs.add(translogPath); + } + } + return translogDirs; + } + + private void corruptRandomTranslogFiles(String indexName) throws IOException { + Set translogDirs = getTranslogDirs(indexName); + Set files = new TreeSet<>(); // treeset makes sure iteration order is deterministic + for (Path translogDir : translogDirs) { + if (Files.isDirectory(translogDir)) { + logger.info("--> path: {}", translogDir); + try (DirectoryStream stream = Files.newDirectoryStream(translogDir)) { + for (Path item : stream) { + logger.info("--> File: {}", item); + if (Files.isRegularFile(item) && item.getFileName().toString().startsWith("translog-")) { + files.add(item); + } + } + } + } + } + Path fileToCorrupt = null; + if (!files.isEmpty()) { + int corruptions = randomIntBetween(5, 20); + for (int i = 0; i < corruptions; i++) { + fileToCorrupt = RandomPicks.randomFrom(random(), files); + try (FileChannel raf = FileChannel.open(fileToCorrupt, StandardOpenOption.READ, StandardOpenOption.WRITE)) { + // read + raf.position(randomIntBetween(0, (int) Math.min(Integer.MAX_VALUE, raf.size() - 1))); + long filePointer = raf.position(); + ByteBuffer bb = ByteBuffer.wrap(new byte[1]); + raf.read(bb); + bb.flip(); + + // corrupt + byte oldValue = bb.get(0); + byte newValue = (byte) (oldValue + 1); + bb.put(0, newValue); + + // rewrite + raf.position(filePointer); + raf.write(bb); + logger.info("--> corrupting file {} -- flipping at position {} from {} to {} file: {}", + fileToCorrupt, filePointer, Integer.toHexString(oldValue), + Integer.toHexString(newValue), fileToCorrupt); + } + } + } + assertThat("no file corrupted", fileToCorrupt, notNullValue()); + } + + /** Disables translog flushing for the specified index */ + private static void disableTranslogFlush(String index) { + Settings settings = Settings.builder() + .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(1, ByteSizeUnit.PB)) + .build(); + client().admin().indices().prepareUpdateSettings(index).setSettings(settings).get(); + } + +} diff --git a/distribution/src/main/resources/bin/elasticsearch-translog b/distribution/src/main/resources/bin/elasticsearch-translog new file mode 100755 index 00000000000..47a48f02b47 --- /dev/null +++ b/distribution/src/main/resources/bin/elasticsearch-translog @@ -0,0 +1,90 @@ +#!/bin/bash + +CDPATH="" +SCRIPT="$0" + +# SCRIPT may be an arbitrarily deep series of symlinks. Loop until we have the concrete path. +while [ -h "$SCRIPT" ] ; do + ls=`ls -ld "$SCRIPT"` + # Drop everything prior to -> + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + SCRIPT="$link" + else + SCRIPT=`dirname "$SCRIPT"`/"$link" + fi +done + +# determine elasticsearch home +ES_HOME=`dirname "$SCRIPT"`/.. + +# make ELASTICSEARCH_HOME absolute +ES_HOME=`cd "$ES_HOME"; pwd` + + +# Sets the default values for elasticsearch variables used in this script +if [ -z "$CONF_DIR" ]; then + CONF_DIR="${path.conf}" +fi + +# The default env file is defined at building/packaging time. +# For a ${project.name} package, the value is "${path.env}". +ES_ENV_FILE="${path.env}" + +# If an include is specified with the ES_INCLUDE environment variable, use it +if [ -n "$ES_INCLUDE" ]; then + ES_ENV_FILE="$ES_INCLUDE" +fi + +# Source the environment file +if [ -n "$ES_ENV_FILE" ]; then + + # If the ES_ENV_FILE is not found, try to resolve the path + # against the ES_HOME directory + if [ ! -f "$ES_ENV_FILE" ]; then + ES_ENV_FILE="$ELASTIC_HOME/$ES_ENV_FILE" + fi + + . "$ES_ENV_FILE" + if [ $? -ne 0 ]; then + echo "Unable to source environment file: $ES_ENV_FILE" >&2 + exit 1 + fi +fi + +# don't let JAVA_TOOL_OPTIONS slip in (e.g. crazy agents in ubuntu) +# works around https://bugs.launchpad.net/ubuntu/+source/jayatana/+bug/1441487 +if [ "x$JAVA_TOOL_OPTIONS" != "x" ]; then + echo "Warning: Ignoring JAVA_TOOL_OPTIONS=$JAVA_TOOL_OPTIONS" + unset JAVA_TOOL_OPTIONS +fi + +# CONF_FILE setting was removed +if [ ! -z "$CONF_FILE" ]; then + echo "CONF_FILE setting is no longer supported. elasticsearch.yml must be placed in the config directory and cannot be renamed." + exit 1 +fi + +if [ -x "$JAVA_HOME/bin/java" ]; then + JAVA=$JAVA_HOME/bin/java +else + JAVA=`which java` +fi + +if [ ! -x "$JAVA" ]; then + echo "Could not find any executable java binary. Please install java in your PATH or set JAVA_HOME" + exit 1 +fi + +# full hostname passed through cut for portability on systems that do not support hostname -s +# export on separate line for shells that do not support combining definition and export +HOSTNAME=`hostname | cut -d. -f1` +export HOSTNAME + +declare -a args=("$@") + +if [ -e "$CONF_DIR" ]; then + args=("${args[@]}" -Edefault.path.conf="$CONF_DIR") +fi + +exec "$JAVA" $ES_JAVA_OPTS -Delasticsearch -Des.path.home="$ES_HOME" -cp "$ES_HOME/lib/*" org.elasticsearch.index.translog.TranslogToolCli "${args[@]}" diff --git a/distribution/src/main/resources/bin/elasticsearch-translog.bat b/distribution/src/main/resources/bin/elasticsearch-translog.bat new file mode 100644 index 00000000000..636a6665c03 --- /dev/null +++ b/distribution/src/main/resources/bin/elasticsearch-translog.bat @@ -0,0 +1,61 @@ +@echo off + +SETLOCAL enabledelayedexpansion + +IF DEFINED JAVA_HOME ( + set JAVA=%JAVA_HOME%\bin\java.exe +) ELSE ( + FOR %%I IN (java.exe) DO set JAVA=%%~$PATH:I +) +IF NOT EXIST "%JAVA%" ( + ECHO Could not find any executable java binary. Please install java in your PATH or set JAVA_HOME 1>&2 + EXIT /B 1 +) + +set SCRIPT_DIR=%~dp0 +for %%I in ("%SCRIPT_DIR%..") do set ES_HOME=%%~dpfI + +TITLE Elasticsearch Plugin Manager ${project.version} + +SET properties= +SET args= + +:loop +SET "current=%~1" +SHIFT +IF "x!current!" == "x" GOTO breakloop + +IF "!current:~0,2%!" == "-D" ( + ECHO "!current!" | FINDSTR /C:"=">nul && ( + :: current matches -D*=* + IF "x!properties!" NEQ "x" ( + SET properties=!properties! "!current!" + ) ELSE ( + SET properties="!current!" + ) + ) || ( + :: current matches -D* + IF "x!properties!" NEQ "x" ( + SET properties=!properties! "!current!=%~1" + ) ELSE ( + SET properties="!current!=%~1" + ) + SHIFT + ) +) ELSE ( + :: current matches * + IF "x!args!" NEQ "x" ( + SET args=!args! "!current!" + ) ELSE ( + SET args="!current!" + ) +) + +GOTO loop +:breakloop + +SET HOSTNAME=%COMPUTERNAME% + +"%JAVA%" %ES_JAVA_OPTS% -Des.path.home="%ES_HOME%" !properties! -cp "%ES_HOME%/lib/*;" "org.elasticsearch.index.translog.TranslogToolCli" !args! + +ENDLOCAL