Throw TranslogCorruptedException in more cases (#44217)

Today we do not throw a `TranslogCorruptedException` in certain cases of
translog corruption, such as for a corrupted checkpoint file or when an
expected file (either checkpoint or translog) is completely missing. This means
that `elasticsearch-shard` will not truncate the translog in those cases.

This commit strengthens the translog corruption tests to corrupt and/or delete
both translog and checkpoint files, and ensures that a
`TranslogCorruptedException` is thrown in all cases. It also sometimes
simulates a recovery after a crash while rolling the translog generation,
including cases where the rolled checkpoint contains incorrect data.

It also adjusts (and renames) `RemoveCorruptedShardDataCommandIT.getDirs()` to
return only a single path, since in practice this was the only thing that could
happen and yet we were relying on its callers to verify this and not all
callers were doing so.
This commit is contained in:
David Turner 2019-07-15 15:19:54 +01:00
parent 2382701547
commit e3d2af64c4
11 changed files with 244 additions and 243 deletions

View File

@ -491,8 +491,7 @@ public class RemoveCorruptedShardDataCommand extends EnvironmentAwareCommand {
: new AllocateEmptyPrimaryAllocationCommand(index, id, nodeId, false));
terminal.println("");
terminal.println("POST /_cluster/reroute'\n"
+ Strings.toString(commands, true, true) + "'");
terminal.println("POST /_cluster/reroute\n" + Strings.toString(commands, true, true));
terminal.println("");
terminal.println("You must accept the possibility of data loss by changing parameter `accept_data_loss` to `true`.");
terminal.println("");

View File

@ -20,6 +20,9 @@
package org.elasticsearch.index.translog;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.store.DataInput;
import org.apache.lucene.store.DataOutput;
import org.apache.lucene.store.Directory;
@ -33,6 +36,7 @@ import org.elasticsearch.index.seqno.SequenceNumbers;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.NoSuchFileException;
import java.nio.file.OpenOption;
import java.nio.file.Path;
@ -200,6 +204,8 @@ final class Checkpoint {
assert indexInput.length() == V3_FILE_SIZE : indexInput.length();
return Checkpoint.readCheckpointV6_4_0(indexInput);
}
} catch (CorruptIndexException | NoSuchFileException | IndexFormatTooOldException | IndexFormatTooNewException e) {
throw new TranslogCorruptedException(path.toString(), e);
}
}
}

View File

@ -220,7 +220,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
private ArrayList<TranslogReader> recoverFromFiles(Checkpoint checkpoint) throws IOException {
boolean success = false;
ArrayList<TranslogReader> foundTranslogs = new ArrayList<>();
try (ReleasableLock lock = writeLock.acquire()) {
try (ReleasableLock ignored = writeLock.acquire()) {
logger.debug("open uncommitted translog checkpoint {}", checkpoint);
final long minGenerationToRecoverFrom;
@ -233,22 +233,22 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
minGenerationToRecoverFrom = checkpoint.minTranslogGeneration;
}
final String checkpointTranslogFile = getFilename(checkpoint.generation);
// we open files in reverse order in order to validate tranlsog uuid before we start traversing the translog based on
// we open files in reverse order in order to validate the translog uuid before we start traversing the translog based on
// the generation id we found in the lucene commit. This gives for better error messages if the wrong
// translog was found.
foundTranslogs.add(openReader(location.resolve(checkpointTranslogFile), checkpoint));
for (long i = checkpoint.generation - 1; i >= minGenerationToRecoverFrom; i--) {
for (long i = checkpoint.generation; i >= minGenerationToRecoverFrom; i--) {
Path committedTranslogFile = location.resolve(getFilename(i));
if (Files.exists(committedTranslogFile) == false) {
throw new IllegalStateException("translog file doesn't exist with generation: " + i + " recovering from: " +
minGenerationToRecoverFrom + " checkpoint: " + checkpoint.generation + " - translog ids must be consecutive");
throw new TranslogCorruptedException(committedTranslogFile.toString(),
"translog file doesn't exist with generation: " + i + " recovering from: " + minGenerationToRecoverFrom
+ " checkpoint: " + checkpoint.generation + " - translog ids must be consecutive");
}
final TranslogReader reader = openReader(committedTranslogFile,
Checkpoint.read(location.resolve(getCommitCheckpointFileName(i))));
final Checkpoint readerCheckpoint = i == checkpoint.generation ? checkpoint
: Checkpoint.read(location.resolve(getCommitCheckpointFileName(i)));
final TranslogReader reader = openReader(committedTranslogFile, readerCheckpoint);
assert reader.getPrimaryTerm() <= primaryTermSupplier.getAsLong() :
"Primary terms go backwards; current term [" + primaryTermSupplier.getAsLong() + "]" +
"translog path [ " + committedTranslogFile + ", existing term [" + reader.getPrimaryTerm() + "]";
"Primary terms go backwards; current term [" + primaryTermSupplier.getAsLong() + "] translog path [ "
+ committedTranslogFile + ", existing term [" + reader.getPrimaryTerm() + "]";
foundTranslogs.add(reader);
logger.debug("recovered local translog from checkpoint {}", checkpoint);
}
@ -263,8 +263,9 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
if (Files.exists(commitCheckpoint)) {
Checkpoint checkpointFromDisk = Checkpoint.read(commitCheckpoint);
if (checkpoint.equals(checkpointFromDisk) == false) {
throw new IllegalStateException("Checkpoint file " + commitCheckpoint.getFileName() +
" already exists but has corrupted content expected: " + checkpoint + " but got: " + checkpointFromDisk);
throw new TranslogCorruptedException(commitCheckpoint.toString(),
"checkpoint file " + commitCheckpoint.getFileName() + " already exists but has corrupted content: expected "
+ checkpoint + " but got " + checkpointFromDisk);
}
} else {
copyCheckpointTo(commitCheckpoint);

View File

@ -52,7 +52,6 @@ import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
@ -174,7 +173,7 @@ public class AllocationIdIT extends ESIntegTestCase {
assertThat(indexHealthStatus, is(healthStatus));
}
private int indexDocs(String indexName, Object ... source) throws InterruptedException, ExecutionException {
private int indexDocs(String indexName, Object ... source) throws InterruptedException {
// index some docs in several segments
int numDocs = 0;
for (int k = 0, attempts = randomIntBetween(5, 10); k < attempts; k++) {
@ -192,9 +191,7 @@ public class AllocationIdIT extends ESIntegTestCase {
}
private Path getIndexPath(String nodeName, ShardId shardId) {
final Set<Path> indexDirs = RemoveCorruptedShardDataCommandIT.getDirs(nodeName, shardId, ShardPath.INDEX_FOLDER_NAME);
assertThat(indexDirs, hasSize(1));
return indexDirs.iterator().next();
return RemoveCorruptedShardDataCommandIT.getPathToShardData(nodeName, shardId, ShardPath.INDEX_FOLDER_NAME);
}
private Set<String> getAllocationIds(String indexName) {

View File

@ -2862,19 +2862,15 @@ public class InternalEngineTests extends EngineTestCase {
// test that we can force start the engine , even if the translog is missing.
engine.close();
// fake a new translog, causing the engine to point to a missing one.
final long primaryTerm = randomNonNegativeLong();
Translog translog = createTranslog(() -> primaryTerm);
final long newPrimaryTerm = randomLongBetween(0L, primaryTerm.get());
final Translog translog = createTranslog(() -> newPrimaryTerm);
long id = translog.currentFileGeneration();
translog.close();
IOUtils.rm(translog.location().resolve(Translog.getFilename(id)));
try {
engine = createEngine(store, primaryTranslogDir);
fail("engine shouldn't start without a valid translog id");
} catch (EngineCreationFailureException ex) {
// expected
}
expectThrows(EngineCreationFailureException.class, "engine shouldn't start without a valid translog id",
() -> createEngine(store, primaryTranslogDir));
// when a new translog is created it should be ok
final String translogUUID = Translog.createEmptyTranslog(primaryTranslogDir, UNASSIGNED_SEQ_NO, shardId, primaryTerm);
final String translogUUID = Translog.createEmptyTranslog(primaryTranslogDir, UNASSIGNED_SEQ_NO, shardId, newPrimaryTerm);
store.associateIndexWithNewTranslog(translogUUID);
EngineConfig config = config(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null);
engine = new InternalEngine(config);

View File

@ -28,8 +28,8 @@ import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.NativeFSLockFactory;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplanation;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
@ -62,9 +62,9 @@ import org.elasticsearch.index.MergePolicyConfig;
import org.elasticsearch.index.MockEngineFactoryPlugin;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.translog.TestTranslog;
import org.elasticsearch.index.translog.TranslogCorruptedException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.monitor.fs.FsInfo;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.CorruptionUtils;
import org.elasticsearch.test.ESIntegTestCase;
@ -82,9 +82,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static org.elasticsearch.common.util.CollectionUtils.iterableAsArrayList;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
@ -95,7 +96,9 @@ import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.startsWith;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
@ -156,8 +159,7 @@ public class RemoveCorruptedShardDataCommandIT extends ESIntegTestCase {
containsString("is Elasticsearch still running ?")));
}
final Set<Path> indexDirs = getDirs(indexName, ShardPath.INDEX_FOLDER_NAME);
assertThat(indexDirs, hasSize(1));
final Path indexDir = getPathToShardData(indexName, ShardPath.INDEX_FOLDER_NAME);
internalCluster().restartNode(node, new InternalTestCluster.RestartCallback() {
@Override
@ -170,7 +172,7 @@ public class RemoveCorruptedShardDataCommandIT extends ESIntegTestCase {
assertThat(e.getMessage(), startsWith("Shard does not seem to be corrupted at"));
}
CorruptionUtils.corruptIndex(random(), indexDirs.iterator().next(), false);
CorruptionUtils.corruptIndex(random(), indexDir, false);
return super.onNodeStopped(nodeName);
}
});
@ -266,13 +268,11 @@ public class RemoveCorruptedShardDataCommandIT extends ESIntegTestCase {
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "-1")
.put(MockEngineSupport.DISABLE_FLUSH_ON_CLOSE.getKey(), true) // never flush - always recover from translog
.put("index.routing.allocation.exclude._name", node2)
));
.put("index.routing.allocation.exclude._name", node2)));
ensureYellow();
assertAcked(client().admin().indices().prepareUpdateSettings(indexName).setSettings(Settings.builder()
.put("index.routing.allocation.exclude._name", (String)null)
));
.putNull("index.routing.allocation.exclude._name")));
ensureGreen();
// Index some documents
@ -294,7 +294,6 @@ public class RemoveCorruptedShardDataCommandIT extends ESIntegTestCase {
builders[i] = client().prepareIndex(indexName, "type").setSource("foo", "bar");
}
indexRandom(false, false, false, Arrays.asList(builders));
Set<Path> translogDirs = getDirs(indexName, ShardPath.TRANSLOG_FOLDER_NAME);
RemoveCorruptedShardDataCommand command = new RemoveCorruptedShardDataCommand();
MockTerminal terminal = new MockTerminal();
@ -314,60 +313,57 @@ public class RemoveCorruptedShardDataCommandIT extends ESIntegTestCase {
// shut down the replica node to be tested later
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node2));
// Corrupt the translog file(s)
logger.info("--> corrupting translog");
corruptRandomTranslogFiles(indexName);
final Path translogDir = getPathToShardData(indexName, ShardPath.TRANSLOG_FOLDER_NAME);
final Path indexDir = getPathToShardData(indexName, ShardPath.INDEX_FOLDER_NAME);
// Restart the single node
logger.info("--> restarting node");
internalCluster().restartRandomDataNode();
internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
logger.info("--> corrupting translog on node {}", nodeName);
TestTranslog.corruptRandomTranslogFile(logger, random(), translogDir);
return super.onNodeStopped(nodeName);
}
});
// all shards should be failed due to a corrupted translog
assertBusy(() -> {
final ClusterAllocationExplanation explanation =
client().admin().cluster().prepareAllocationExplain()
.setIndex(indexName).setShard(0).setPrimary(true)
.get().getExplanation();
final UnassignedInfo unassignedInfo = explanation.getUnassignedInfo();
final UnassignedInfo unassignedInfo = client().admin().cluster().prepareAllocationExplain()
.setIndex(indexName).setShard(0).setPrimary(true).get().getExplanation().getUnassignedInfo();
assertThat(unassignedInfo.getReason(), equalTo(UnassignedInfo.Reason.ALLOCATION_FAILED));
assertThat(ExceptionsHelper.unwrap(unassignedInfo.getFailure(), TranslogCorruptedException.class), not(nullValue()));
});
// have to shut down primary node - otherwise node lock is present
final InternalTestCluster.RestartCallback callback =
new InternalTestCluster.RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
// and we can actually truncate the translog
for (Path translogDir : translogDirs) {
final Path idxLocation = translogDir.getParent().resolve(ShardPath.INDEX_FOLDER_NAME);
assertBusy(() -> {
logger.info("--> checking that lock has been released for {}", idxLocation);
try (Directory dir = FSDirectory.open(idxLocation, NativeFSLockFactory.INSTANCE);
Lock writeLock = dir.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
// Great, do nothing, we just wanted to obtain the lock
} catch (LockObtainFailedException lofe) {
logger.info("--> failed acquiring lock for {}", idxLocation);
fail("still waiting for lock release at [" + idxLocation + "]");
} catch (IOException ioe) {
fail("Got an IOException: " + ioe);
}
});
final Environment environment = TestEnvironment.newEnvironment(
Settings.builder().put(internalCluster().getDefaultSettings()).put(node1PathSettings).build());
terminal.addTextInput("y");
OptionSet options = parser.parse("-d", translogDir.toAbsolutePath().toString());
logger.info("--> running command for [{}]", translogDir.toAbsolutePath());
command.execute(terminal, options, environment);
logger.info("--> output:\n{}", terminal.getOutput());
internalCluster().restartNode(node1, new InternalTestCluster.RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
assertBusy(() -> {
logger.info("--> checking that lock has been released for {}", indexDir);
//noinspection EmptyTryBlock since we're just trying to obtain the lock
try (Directory dir = FSDirectory.open(indexDir, NativeFSLockFactory.INSTANCE);
Lock ignored = dir.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
} catch (LockObtainFailedException lofe) {
logger.info("--> failed acquiring lock for {}", indexDir);
throw new AssertionError("still waiting for lock release at [" + indexDir + "]", lofe);
} catch (IOException ioe) {
throw new AssertionError("unexpected IOException [" + indexDir + "]", ioe);
}
});
return super.onNodeStopped(nodeName);
}
};
internalCluster().restartNode(node1, callback);
final Environment environment = TestEnvironment.newEnvironment(
Settings.builder().put(internalCluster().getDefaultSettings()).put(node1PathSettings).build());
terminal.addTextInput("y");
OptionSet options = parser.parse("-d", translogDir.toAbsolutePath().toString());
logger.info("--> running command for [{}]", translogDir.toAbsolutePath());
command.execute(terminal, options, environment);
logger.info("--> output:\n{}", terminal.getOutput());
return super.onNodeStopped(nodeName);
}
});
String primaryNodeId = null;
final ClusterState state = client().admin().cluster().prepareState().get().getState();
@ -477,7 +473,7 @@ public class RemoveCorruptedShardDataCommandIT extends ESIntegTestCase {
// sample the replica node translog dirs
final ShardId shardId = new ShardId(resolveIndex(indexName), 0);
final Set<Path> translogDirs = getDirs(node2, shardId, ShardPath.TRANSLOG_FOLDER_NAME);
final Path translogDir = getPathToShardData(node2, shardId, ShardPath.TRANSLOG_FOLDER_NAME);
final Settings node1PathSettings = internalCluster().dataPathSettings(node1);
final Settings node2PathSettings = internalCluster().dataPathSettings(node2);
@ -488,7 +484,7 @@ public class RemoveCorruptedShardDataCommandIT extends ESIntegTestCase {
// Corrupt the translog file(s) on the replica
logger.info("--> corrupting translog");
TestTranslog.corruptRandomTranslogFile(logger, random(), translogDirs);
TestTranslog.corruptRandomTranslogFile(logger, random(), translogDir);
// Start the node with the non-corrupted data path
logger.info("--> starting node");
@ -504,15 +500,13 @@ public class RemoveCorruptedShardDataCommandIT extends ESIntegTestCase {
final MockTerminal terminal = new MockTerminal();
final OptionParser parser = command.getParser();
for (Path translogDir : translogDirs) {
final Environment environment = TestEnvironment.newEnvironment(
Settings.builder().put(internalCluster().getDefaultSettings()).put(node2PathSettings).build());
terminal.addTextInput("y");
OptionSet options = parser.parse("-d", translogDir.toAbsolutePath().toString());
logger.info("--> running command for [{}]", translogDir.toAbsolutePath());
command.execute(terminal, options, environment);
logger.info("--> output:\n{}", terminal.getOutput());
}
final Environment environment = TestEnvironment.newEnvironment(
Settings.builder().put(internalCluster().getDefaultSettings()).put(node2PathSettings).build());
terminal.addTextInput("y");
OptionSet options = parser.parse("-d", translogDir.toAbsolutePath().toString());
logger.info("--> running command for [{}]", translogDir.toAbsolutePath());
command.execute(terminal, options, environment);
logger.info("--> output:\n{}", terminal.getOutput());
logger.info("--> starting the replica node to test recovery");
internalCluster().startNode(node2PathSettings);
@ -566,9 +560,7 @@ public class RemoveCorruptedShardDataCommandIT extends ESIntegTestCase {
final Map<String, Environment> environmentByNodeName = new HashMap<>();
for (String nodeName : nodeNames) {
final String nodeId = nodeNameToNodeId.get(nodeName);
final Set<Path> indexDirs = getDirs(nodeId, shardId, ShardPath.INDEX_FOLDER_NAME);
assertThat(indexDirs, hasSize(1));
indexPathByNodeName.put(nodeName, indexDirs.iterator().next());
indexPathByNodeName.put(nodeName, getPathToShardData(nodeId, shardId, ShardPath.INDEX_FOLDER_NAME));
final Environment environment = TestEnvironment.newEnvironment(
Settings.builder().put(internalCluster().getDefaultSettings()).put(internalCluster().dataPathSettings(nodeName)).build());
@ -586,7 +578,7 @@ public class RemoveCorruptedShardDataCommandIT extends ESIntegTestCase {
}
}
private Set<Path> getDirs(String indexName, String dirSuffix) {
private Path getPathToShardData(String indexName, String dirSuffix) {
ClusterState state = client().admin().cluster().prepareState().get().getState();
GroupShardsIterator shardIterators = state.getRoutingTable().activePrimaryShardsGrouped(new String[]{indexName}, false);
List<ShardIterator> iterators = iterableAsArrayList(shardIterators);
@ -597,30 +589,21 @@ public class RemoveCorruptedShardDataCommandIT extends ESIntegTestCase {
assertTrue(shardRouting.assignedToNode());
String nodeId = shardRouting.currentNodeId();
ShardId shardId = shardRouting.shardId();
return getDirs(nodeId, shardId, dirSuffix);
return getPathToShardData(nodeId, shardId, dirSuffix);
}
public static Set<Path> getDirs(String nodeId, ShardId shardId, String dirSuffix) {
final NodesStatsResponse nodeStatses = client().admin().cluster().prepareNodesStats(nodeId).setFs(true).get();
final Set<Path> translogDirs = new TreeSet<>();
final NodeStats nodeStats = nodeStatses.getNodes().get(0);
for (FsInfo.Path fsPath : nodeStats.getFs()) {
final String path = fsPath.getPath();
final Path p = PathUtils.get(path)
public static Path getPathToShardData(String nodeId, ShardId shardId, String shardPathSubdirectory) {
final NodesStatsResponse nodeStatsResponse = client().admin().cluster().prepareNodesStats(nodeId).setFs(true).get();
final Set<Path> paths = StreamSupport.stream(nodeStatsResponse.getNodes().get(0).getFs().spliterator(), false)
.map(nodePath -> PathUtils.get(nodePath.getPath())
.resolve(NodeEnvironment.INDICES_FOLDER)
.resolve(shardId.getIndex().getUUID())
.resolve(Integer.toString(shardId.getId()))
.resolve(dirSuffix);
if (Files.isDirectory(p)) {
translogDirs.add(p);
}
}
return translogDirs;
}
private void corruptRandomTranslogFiles(String indexName) throws IOException {
Set<Path> translogDirs = getDirs(indexName, ShardPath.TRANSLOG_FOLDER_NAME);
TestTranslog.corruptRandomTranslogFile(logger, random(), translogDirs);
.resolve(shardPathSubdirectory))
.filter(Files::isDirectory)
.collect(Collectors.toSet());
assertThat(paths, hasSize(1));
return paths.iterator().next();
}
/** Disables translog flushing for the specified index */

View File

@ -52,7 +52,6 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -216,7 +215,7 @@ public class RemoveCorruptedShardDataCommandTests extends IndexShardTestCase {
// close shard
closeShards(indexShard);
TestTranslog.corruptRandomTranslogFile(logger, random(), Collections.singletonList(translogPath));
TestTranslog.corruptRandomTranslogFile(logger, random(), translogPath);
// test corrupted shard
final IndexShard corruptedShard = reopenIndexShard(true);
@ -282,7 +281,7 @@ public class RemoveCorruptedShardDataCommandTests extends IndexShardTestCase {
expectThrows(IndexShardRecoveryException.class, () -> newStartedShard(p -> corruptedShard, true));
closeShards(corruptedShard);
}
TestTranslog.corruptRandomTranslogFile(logger, random(), Collections.singletonList(translogPath));
TestTranslog.corruptRandomTranslogFile(logger, random(), translogPath);
final RemoveCorruptedShardDataCommand command = new RemoveCorruptedShardDataCommand();
final MockTerminal t = new MockTerminal();

View File

@ -19,48 +19,39 @@
package org.elasticsearch.index.store;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.io.PathUtils;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.MockEngineFactoryPlugin;
import org.elasticsearch.index.translog.TestTranslog;
import org.elasticsearch.monitor.fs.FsInfo;
import org.elasticsearch.index.translog.TranslogCorruptedException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.engine.MockEngineSupport;
import org.elasticsearch.test.transport.MockTransportService;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.util.CollectionUtils.iterableAsArrayList;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
/**
* Integration test for corrupted translog files
*/
@ESIntegTestCase.ClusterScope(scope= ESIntegTestCase.Scope.SUITE, numDataNodes = 0)
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 0)
public class CorruptedTranslogIT extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
@ -68,78 +59,46 @@ public class CorruptedTranslogIT extends ESIntegTestCase {
}
public void testCorruptTranslogFiles() throws Exception {
internalCluster().startNodes(1, Settings.EMPTY);
internalCluster().startNode(Settings.EMPTY);
assertAcked(prepareCreate("test").setSettings(Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.put("index.refresh_interval", "-1")
.put(MockEngineSupport.DISABLE_FLUSH_ON_CLOSE.getKey(), true) // never flush - always recover from translog
));
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.put("index.refresh_interval", "-1")
.put(MockEngineSupport.DISABLE_FLUSH_ON_CLOSE.getKey(), true) // never flush - always recover from translog
.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(1, ByteSizeUnit.PB))));
// Index some documents
int numDocs = scaledRandomIntBetween(100, 1000);
IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];
IndexRequestBuilder[] builders = new IndexRequestBuilder[scaledRandomIntBetween(100, 1000)];
for (int i = 0; i < builders.length; i++) {
builders[i] = client().prepareIndex("test", "type").setSource("foo", "bar");
}
disableTranslogFlush("test");
indexRandom(false, false, false, Arrays.asList(builders)); // this one
// Corrupt the translog file(s)
corruptRandomTranslogFile();
indexRandom(false, false, false, Arrays.asList(builders));
// Restart the single node
internalCluster().fullRestart();
client().admin().cluster().prepareHealth().setWaitForYellowStatus().
setTimeout(new TimeValue(1000, TimeUnit.MILLISECONDS)).setWaitForEvents(Priority.LANGUID).get();
final Path translogPath = internalCluster().getInstance(IndicesService.class)
.indexService(resolveIndex("test")).getShard(0).shardPath().resolveTranslog();
try {
client().prepareSearch("test").setQuery(matchAllQuery()).get();
fail("all shards should be failed due to a corrupted translog");
} catch (SearchPhaseExecutionException e) {
// Good, all shards should be failed because there is only a
// single shard and its translog is corrupt
}
}
private void corruptRandomTranslogFile() throws IOException {
ClusterState state = client().admin().cluster().prepareState().get().getState();
GroupShardsIterator shardIterators = state.getRoutingTable().activePrimaryShardsGrouped(new String[]{"test"}, false);
final Index test = state.metaData().index("test").getIndex();
List<ShardIterator> iterators = iterableAsArrayList(shardIterators);
ShardIterator shardIterator = RandomPicks.randomFrom(random(), iterators);
ShardRouting shardRouting = shardIterator.nextOrNull();
assertNotNull(shardRouting);
assertTrue(shardRouting.primary());
assertTrue(shardRouting.assignedToNode());
String nodeId = shardRouting.currentNodeId();
NodesStatsResponse nodeStatses = client().admin().cluster().prepareNodesStats(nodeId).setFs(true).get();
Set<Path> translogDirs = new HashSet<>();
for (FsInfo.Path fsPath : nodeStatses.getNodes().get(0).getFs()) {
String path = fsPath.getPath();
String relativeDataLocationPath = "indices/" + test.getUUID() + "/" + Integer.toString(shardRouting.getId()) + "/translog";
Path translogDir = PathUtils.get(path).resolve(relativeDataLocationPath);
if (Files.isDirectory(translogDir)) {
translogDirs.add(translogDir);
internalCluster().fullRestart(new InternalTestCluster.RestartCallback(){
@Override
public void onAllNodesStopped() throws Exception {
TestTranslog.corruptRandomTranslogFile(logger, random(), translogPath);
}
}
Path translogDir = RandomPicks.randomFrom(random(), translogDirs);
TestTranslog.corruptRandomTranslogFile(logger, random(), Arrays.asList(translogDir));
});
assertBusy(() -> {
final ClusterAllocationExplainResponse allocationExplainResponse
= client().admin().cluster().prepareAllocationExplain().setIndex("test").setShard(0).setPrimary(true).get();
final UnassignedInfo unassignedInfo = allocationExplainResponse.getExplanation().getUnassignedInfo();
assertThat(unassignedInfo, not(nullValue()));
final Throwable cause = ExceptionsHelper.unwrap(unassignedInfo.getFailure(), TranslogCorruptedException.class);
assertThat(cause, not(nullValue()));
assertThat(cause.getMessage(), containsString(translogPath.toString()));
});
assertThat(expectThrows(SearchPhaseExecutionException.class, () -> client().prepareSearch("test").setQuery(matchAllQuery()).get())
.getMessage(), containsString("all shards failed"));
}
/** Disables translog flushing for the specified index */
private static void disableTranslogFlush(String index) {
Settings settings = Settings.builder()
.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(1, ByteSizeUnit.PB)).build();
client().admin().indices().prepareUpdateSettings(index).setSettings(settings).get();
}
/** Enables translog flushing for the specified index */
private static void enableTranslogFlush(String index) {
Settings settings = Settings.builder()
.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(512, ByteSizeUnit.MB)).build();
client().admin().indices().prepareUpdateSettings(index).setSettings(settings).get();
}
}

View File

@ -25,6 +25,8 @@ import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.store.NIOFSDirectory;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.engine.CombinedDeletionPolicy;
import java.io.IOException;
@ -35,7 +37,6 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
@ -45,6 +46,7 @@ import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static org.elasticsearch.index.translog.Translog.CHECKPOINT_FILE_NAME;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
@ -56,41 +58,94 @@ import static org.hamcrest.core.IsNot.not;
* Helpers for testing translog.
*/
public class TestTranslog {
private static final Pattern TRANSLOG_FILE_PATTERN = Pattern.compile("translog-(\\d+)\\.tlog");
private static final Pattern TRANSLOG_FILE_PATTERN = Pattern.compile("^translog-(\\d+)\\.(tlog|ckp)$");
public static void corruptRandomTranslogFile(Logger logger, Random random, Collection<Path> translogDirs) throws IOException {
for (Path translogDir : translogDirs) {
final long minTranslogGen = minTranslogGenUsedInRecovery(translogDir);
corruptRandomTranslogFile(logger, random, translogDir, minTranslogGen);
}
/**
* Corrupts random translog file (translog-N.tlog or translog-N.ckp or translog.ckp) from the given translog directory, ignoring
* translogs and checkpoints with generations below the generation recorded in the latest index commit found in translogDir/../index/,
* or writes a corrupted translog-N.ckp file as if from a crash while rolling a generation.
*
* <p>
* See {@link TestTranslog#corruptFile(Logger, Random, Path, boolean)} for details of the corruption applied.
*/
public static void corruptRandomTranslogFile(Logger logger, Random random, Path translogDir) throws IOException {
corruptRandomTranslogFile(logger, random, translogDir, minTranslogGenUsedInRecovery(translogDir));
}
/**
* Corrupts random translog file (translog-N.tlog) from the given translog directory.
* Corrupts random translog file (translog-N.tlog or translog-N.ckp or translog.ckp) from the given translog directory, or writes a
* corrupted translog-N.ckp file as if from a crash while rolling a generation.
* <p>
* See {@link TestTranslog#corruptFile(Logger, Random, Path, boolean)} for details of the corruption applied.
*
* @param minGeneration the minimum generation (N) to corrupt. Translogs and checkpoints with lower generation numbers are ignored.
*/
public static void corruptRandomTranslogFile(Logger logger, Random random, Path translogDir, long minGeneration)
throws IOException {
Set<Path> candidates = new TreeSet<>(); // TreeSet makes sure iteration order is deterministic
static void corruptRandomTranslogFile(Logger logger, Random random, Path translogDir, long minGeneration) throws IOException {
logger.info("--> corruptRandomTranslogFile: translogDir [{}], minUsedTranslogGen [{}]", translogDir, minGeneration);
Path unnecessaryCheckpointCopyPath = null;
try {
final Path checkpointPath = translogDir.resolve(CHECKPOINT_FILE_NAME);
final Checkpoint checkpoint = Checkpoint.read(checkpointPath);
unnecessaryCheckpointCopyPath = translogDir.resolve(Translog.getCommitCheckpointFileName(checkpoint.generation));
if (LuceneTestCase.rarely(random) && Files.exists(unnecessaryCheckpointCopyPath) == false) {
// if we crashed while rolling a generation then we might have copied `translog.ckp` to its numbered generation file but
// have not yet written a new `translog.ckp`. During recovery we must also verify that this file is intact, so it's ok to
// corrupt this file too (either by writing the wrong information, correctly formatted, or by properly corrupting it)
final Checkpoint checkpointCopy = LuceneTestCase.usually(random) ? checkpoint
: new Checkpoint(checkpoint.offset + random.nextInt(2), checkpoint.numOps + random.nextInt(2),
checkpoint.generation + random.nextInt(2), checkpoint.minSeqNo + random.nextInt(2),
checkpoint.maxSeqNo + random.nextInt(2), checkpoint.globalCheckpoint + random.nextInt(2),
checkpoint.minTranslogGeneration + random.nextInt(2), checkpoint.trimmedAboveSeqNo + random.nextInt(2));
Checkpoint.write(FileChannel::open, unnecessaryCheckpointCopyPath, checkpointCopy,
StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
if (checkpointCopy.equals(checkpoint) == false) {
logger.info("corruptRandomTranslogFile: created [{}] containing [{}] instead of [{}]", unnecessaryCheckpointCopyPath,
checkpointCopy, checkpoint);
return;
} // else checkpoint copy has the correct content so it's now a candidate for the usual kinds of corruption
}
} catch (TranslogCorruptedException e) {
// missing or corrupt checkpoint already, find something else to break...
}
Set<Path> candidates = new TreeSet<>(); // TreeSet makes sure iteration order is deterministic
try (DirectoryStream<Path> stream = Files.newDirectoryStream(translogDir)) {
for (Path item : stream) {
if (Files.isRegularFile(item)) {
final Matcher matcher = TRANSLOG_FILE_PATTERN.matcher(item.getFileName().toString());
if (matcher.matches() && Long.parseLong(matcher.group(1)) >= minGeneration) {
if (Files.isRegularFile(item) && Files.size(item) > 0) {
final String filename = item.getFileName().toString();
final Matcher matcher = TRANSLOG_FILE_PATTERN.matcher(filename);
if (filename.equals("translog.ckp") || (matcher.matches() && Long.parseLong(matcher.group(1)) >= minGeneration)) {
candidates.add(item);
}
}
}
}
assertThat("no translog files found in " + translogDir, candidates, is(not(empty())));
assertThat("no corruption candidates found in " + translogDir, candidates, is(not(empty())));
Path corruptedFile = RandomPicks.randomFrom(random, candidates);
corruptFile(logger, random, corruptedFile);
final Path fileToCorrupt = RandomPicks.randomFrom(random, candidates);
// deleting the unnecessary checkpoint file doesn't count as a corruption
final boolean maybeDelete = fileToCorrupt.equals(unnecessaryCheckpointCopyPath) == false;
corruptFile(logger, random, fileToCorrupt, maybeDelete);
}
static void corruptFile(Logger logger, Random random, Path fileToCorrupt) throws IOException {
/**
* Corrupt an (existing and nonempty) file by replacing any byte in the file with a random (different) byte, or by truncating the file
* to a random (strictly shorter) length, or by deleting the file.
*/
static void corruptFile(Logger logger, Random random, Path fileToCorrupt, boolean maybeDelete) throws IOException {
assertThat(fileToCorrupt + " should be a regular file", Files.isRegularFile(fileToCorrupt));
final long fileSize = Files.size(fileToCorrupt);
assertThat("cannot corrupt empty file " + fileToCorrupt, fileSize, greaterThan(0L));
assertThat(fileToCorrupt + " should not be an empty file", fileSize, greaterThan(0L));
if (maybeDelete && random.nextBoolean() && random.nextBoolean()) {
logger.info("corruptFile: deleting file {}", fileToCorrupt);
IOUtils.rm(fileToCorrupt);
return;
}
try (FileChannel fileChannel = FileChannel.open(fileToCorrupt, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
final long corruptPosition = RandomNumbers.randomLongBetween(random, 0, fileSize - 1);
@ -114,10 +169,10 @@ public class TestTranslog {
// rewrite
fileChannel.position(corruptPosition);
fileChannel.write(bb);
logger.info("--> corrupting file {} at position {} turning 0x{} into 0x{}", fileToCorrupt, corruptPosition,
logger.info("corruptFile: corrupting file {} at position {} turning 0x{} into 0x{}", fileToCorrupt, corruptPosition,
Integer.toHexString(oldValue & 0xff), Integer.toHexString(newValue & 0xff));
} else {
logger.info("--> truncating file {} from length {} to length {}", fileToCorrupt, fileSize, corruptPosition);
logger.info("corruptFile: truncating file {} from length {} to length {}", fileToCorrupt, fileSize, corruptPosition);
fileChannel.truncate(corruptPosition);
}
}

View File

@ -64,7 +64,7 @@ public class TranslogHeaderTests extends ESTestCase {
assertThat(mismatchUUID.getMessage(), containsString("this translog file belongs to a different translog"));
int corruptions = between(1, 10);
for (int i = 0; i < corruptions && Files.size(translogFile) > 0; i++) {
TestTranslog.corruptFile(logger, random(), translogFile);
TestTranslog.corruptFile(logger, random(), translogFile, false);
}
expectThrows(TranslogCorruptedException.class, () -> {
try (FileChannel channel = FileChannel.open(translogFile, StandardOpenOption.READ)) {

View File

@ -20,7 +20,6 @@
package org.elasticsearch.index.translog;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.document.Field;
@ -84,6 +83,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.CopyOption;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
@ -125,6 +125,7 @@ import static org.elasticsearch.index.translog.SnapshotMatchers.containsOperatio
import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@ -149,6 +150,12 @@ public class TranslogTests extends ESTestCase {
// A default primary term is used by translog instances created in this test.
private final AtomicLong primaryTerm = new AtomicLong();
private final AtomicReference<LongConsumer> persistedSeqNoConsumer = new AtomicReference<>();
private boolean expectIntactTranslog;
@Before
public void expectIntactTranslogByDefault() {
expectIntactTranslog = true;
}
@Override
protected void afterIfSuccessful() throws Exception {
@ -162,7 +169,9 @@ public class TranslogTests extends ESTestCase {
}
translog.close();
}
assertFileIsPresent(translog, translog.currentFileGeneration());
if (expectIntactTranslog) {
assertFileIsPresent(translog, translog.currentFileGeneration());
}
IOUtils.rm(translog.location()); // delete all the locations
}
@ -850,30 +859,31 @@ public class TranslogTests extends ESTestCase {
String uuid = translog.getTranslogUUID();
List<Translog.Location> locations = new ArrayList<>();
int translogOperations = randomIntBetween(10, 100);
int translogOperations = randomIntBetween(10, 1000);
for (int op = 0; op < translogOperations; op++) {
String ascii = randomAlphaOfLengthBetween(1, 50);
locations.add(
translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), ascii.getBytes("UTF-8")))
);
if (rarely()) {
translog.rollGeneration();
}
}
translog.close();
TestTranslog.corruptRandomTranslogFile(logger, random(), translogDir, 0);
int corruptionsCaught = 0;
try (Translog translog = openTranslog(config, uuid)) {
try (Translog.Snapshot snapshot = translog.newSnapshot()) {
assertThat(expectThrows(TranslogCorruptedException.class, () -> {
try (Translog translog = openTranslog(config, uuid);
Translog.Snapshot snapshot = translog.newSnapshot()) {
for (int i = 0; i < locations.size(); i++) {
snapshot.next();
}
}
} catch (TranslogCorruptedException e) {
assertThat(e.getMessage(), containsString(translogDir.toString()));
corruptionsCaught++;
}
}).getMessage(), containsString(translogDir.toString()));
assertThat("corruption is caught", corruptionsCaught, greaterThanOrEqualTo(1));
expectIntactTranslog = false;
}
public void testTruncatedTranslogs() throws Exception {
@ -1561,14 +1571,12 @@ public class TranslogTests extends ESTestCase {
}
public void testRecoveryUncommittedCorruptedCheckpoint() throws IOException {
List<Translog.Location> locations = new ArrayList<>();
int translogOperations = 100;
final int prepareOp = 44;
Translog.TranslogGeneration translogGeneration = null;
final boolean sync = randomBoolean();
for (int op = 0; op < translogOperations; op++) {
locations.add(translog.add(new Translog.Index("test", "" + op, op,
primaryTerm.get(), Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), Integer.toString(op).getBytes(StandardCharsets.UTF_8)));
if (op == prepareOp) {
translogGeneration = translog.getGeneration();
translog.rollGeneration();
@ -1589,15 +1597,13 @@ public class TranslogTests extends ESTestCase {
corrupted, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
final String translogUUID = translog.getTranslogUUID();
final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy();
try (Translog ignored = new Translog(config, translogUUID, deletionPolicy,
() -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get, seqNo -> {})) {
fail("corrupted");
} catch (IllegalStateException ex) {
assertEquals("Checkpoint file translog-3.ckp already exists but has corrupted content expected: Checkpoint{offset=3025, " +
final TranslogCorruptedException translogCorruptedException = expectThrows(TranslogCorruptedException.class, () ->
new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get, seqNo -> { }));
assertThat(translogCorruptedException.getMessage(), endsWith(
"] is corrupted, checkpoint file translog-3.ckp already exists but has corrupted content: expected Checkpoint{offset=3025, " +
"numOps=55, generation=3, minSeqNo=45, maxSeqNo=99, globalCheckpoint=-1, minTranslogGeneration=1, trimmedAboveSeqNo=-2} " +
"but got: Checkpoint{offset=0, numOps=0, generation=0, minSeqNo=-1, maxSeqNo=-1, globalCheckpoint=-1, " +
"minTranslogGeneration=0, trimmedAboveSeqNo=-2}", ex.getMessage());
}
"but got Checkpoint{offset=0, numOps=0, generation=0, minSeqNo=-1, maxSeqNo=-1, globalCheckpoint=-1, " +
"minTranslogGeneration=0, trimmedAboveSeqNo=-2}"));
Checkpoint.write(FileChannel::open, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)),
read, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
try (Translog translog = new Translog(config, translogUUID, deletionPolicy,