Improve translog corruption detection (#42980)

Today we test for translog corruption by incrementing a byte by 1 somewhere in
a file, and verify that this leads to a `TranslogCorruptedException`.
However, we rely on _all_ corruptions leading to this exception in the
`RemoveCorruptedShardDataCommand`: this command fails if a translog file
corruption leads to a different kind of exception, and `EOFException` and
`NegativeArraySizeException` are both possible. This commit strengthens the
translog corruption detection tests by simulating the following:

- a random value is written
- the file is truncated

It also makes sure that we throw a `TranslogCorruptedException` in all such
cases.

Fixes #42661
Backport of #42744
This commit is contained in:
David Turner 2019-06-07 20:28:02 +01:00 committed by GitHub
parent 479a1eeff6
commit 5bc0dfce94
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 130 additions and 96 deletions

View File

@ -148,7 +148,7 @@ public abstract class BaseTranslogReader implements Comparable<BaseTranslogReade
}
/**
* Reads a single opertation from the given location.
* Reads a single operation from the given location.
*/
Translog.Operation read(Translog.Location location) throws IOException {
assert location.generation == this.generation : "generation mismatch expected: " + generation + " got: " + location.generation;

View File

@ -30,6 +30,7 @@ import org.elasticsearch.common.io.Channels;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import java.io.EOFException;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
@ -108,56 +109,61 @@ final class TranslogHeader {
* Read a translog header from the given path and file channel
*/
static TranslogHeader read(final String translogUUID, final Path path, final FileChannel channel) throws IOException {
// This input is intentionally not closed because closing it will close the FileChannel.
final BufferedChecksumStreamInput in =
new BufferedChecksumStreamInput(
try {
// This input is intentionally not closed because closing it will close the FileChannel.
final BufferedChecksumStreamInput in =
new BufferedChecksumStreamInput(
new InputStreamStreamInput(java.nio.channels.Channels.newInputStream(channel), channel.size()),
path.toString());
final int version;
try {
version = CodecUtil.checkHeader(new InputStreamDataInput(in), TRANSLOG_CODEC, VERSION_CHECKSUMS, VERSION_PRIMARY_TERM);
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException e) {
tryReportOldVersionError(path, channel);
throw new TranslogCorruptedException(path.toString(), "translog header corrupted", e);
}
if (version == VERSION_CHECKSUMS) {
throw new IllegalStateException("pre-2.0 translog found [" + path + "]");
}
// Read the translogUUID
final int uuidLen = in.readInt();
if (uuidLen > channel.size()) {
throw new TranslogCorruptedException(
path.toString(),
"UUID length can't be larger than the translog");
}
final BytesRef uuid = new BytesRef(uuidLen);
uuid.length = uuidLen;
in.read(uuid.bytes, uuid.offset, uuid.length);
final BytesRef expectedUUID = new BytesRef(translogUUID);
if (uuid.bytesEquals(expectedUUID) == false) {
throw new TranslogCorruptedException(
final int version;
try {
version = CodecUtil.checkHeader(new InputStreamDataInput(in), TRANSLOG_CODEC, VERSION_CHECKSUMS, VERSION_PRIMARY_TERM);
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException e) {
tryReportOldVersionError(path, channel);
throw new TranslogCorruptedException(path.toString(), "translog header corrupted", e);
}
if (version == VERSION_CHECKSUMS) {
throw new IllegalStateException("pre-2.0 translog found [" + path + "]");
}
// Read the translogUUID
final int uuidLen = in.readInt();
if (uuidLen > channel.size()) {
throw new TranslogCorruptedException(path.toString(), "UUID length can't be larger than the translog");
}
if (uuidLen <= 0) {
throw new TranslogCorruptedException(path.toString(), "UUID length must be positive");
}
final BytesRef uuid = new BytesRef(uuidLen);
uuid.length = uuidLen;
in.read(uuid.bytes, uuid.offset, uuid.length);
final BytesRef expectedUUID = new BytesRef(translogUUID);
if (uuid.bytesEquals(expectedUUID) == false) {
throw new TranslogCorruptedException(
path.toString(),
"expected shard UUID " + expectedUUID + " but got: " + uuid +
" this translog file belongs to a different translog");
}
// Read the primary term
final long primaryTerm;
if (version == VERSION_PRIMARY_TERM) {
primaryTerm = in.readLong();
} else {
assert version == VERSION_CHECKPOINTS : "Unknown header version [" + version + "]";
primaryTerm = UNASSIGNED_PRIMARY_TERM;
}
// Verify the checksum
if (version >= VERSION_PRIMARY_TERM) {
Translog.verifyChecksum(in);
}
assert primaryTerm >= 0 : "Primary term must be non-negative [" + primaryTerm + "]; translog path [" + path + "]";
" this translog file belongs to a different translog");
}
// Read the primary term
final long primaryTerm;
if (version == VERSION_PRIMARY_TERM) {
primaryTerm = in.readLong();
} else {
assert version == VERSION_CHECKPOINTS : "Unknown header version [" + version + "]";
primaryTerm = UNASSIGNED_PRIMARY_TERM;
}
// Verify the checksum
if (version >= VERSION_PRIMARY_TERM) {
Translog.verifyChecksum(in);
}
assert primaryTerm >= 0 : "Primary term must be non-negative [" + primaryTerm + "]; translog path [" + path + "]";
final int headerSizeInBytes = headerSizeInBytes(version, uuid.length);
assert channel.position() == headerSizeInBytes :
"Header is not fully read; header size [" + headerSizeInBytes + "], position [" + channel.position() + "]";
return new TranslogHeader(translogUUID, primaryTerm, headerSizeInBytes);
final int headerSizeInBytes = headerSizeInBytes(version, uuid.length);
assert channel.position() == headerSizeInBytes :
"Header is not fully read; header size [" + headerSizeInBytes + "], position [" + channel.position() + "]";
return new TranslogHeader(translogUUID, primaryTerm, headerSizeInBytes);
} catch (EOFException e) {
throw new TranslogCorruptedException(path.toString(), "translog header truncated", e);
}
}
private static void tryReportOldVersionError(final Path path, final FileChannel channel) throws IOException {

View File

@ -76,7 +76,7 @@ final class TranslogSnapshot extends BaseTranslogReader {
return null;
}
protected Translog.Operation readOperation() throws IOException {
private Translog.Operation readOperation() throws IOException {
final int opSize = readSize(reusableBuffer, position);
reuse = checksummedStream(reusableBuffer, position, opSize, reuse);
Translog.Operation op = read(reuse);
@ -93,15 +93,19 @@ final class TranslogSnapshot extends BaseTranslogReader {
* reads an operation at the given position into the given buffer.
*/
protected void readBytes(ByteBuffer buffer, long position) throws IOException {
if (position >= length) {
throw new EOFException("read requested past EOF. pos [" + position + "] end: [" + length + "], generation: [" +
getGeneration() + "], path: [" + path + "]");
try {
if (position >= length) {
throw new EOFException("read requested past EOF. pos [" + position + "] end: [" + length + "], generation: [" +
getGeneration() + "], path: [" + path + "]");
}
if (position < getFirstOperationOffset()) {
throw new IOException("read requested before position of first ops. pos [" + position + "] first op on: [" +
getFirstOperationOffset() + "], generation: [" + getGeneration() + "], path: [" + path + "]");
}
Channels.readFromFileChannelWithEofException(channel, position, buffer);
} catch (EOFException e) {
throw new TranslogCorruptedException(path.toString(), "translog truncated", e);
}
if (position < getFirstOperationOffset()) {
throw new IOException("read requested before position of first ops. pos [" + position + "] first op on: [" +
getFirstOperationOffset() + "], generation: [" + getGeneration() + "], path: [" + path + "]");
}
Channels.readFromFileChannelWithEofException(channel, position, buffer);
}
@Override

View File

@ -168,7 +168,6 @@ public class TruncateTranslogAction {
private boolean isTranslogClean(ShardPath shardPath, String translogUUID) throws IOException {
// perform clean check of translog instead of corrupted marker file
boolean clean = true;
try {
final Path translogPath = shardPath.resolveTranslog();
final long translogGlobalCheckpoint = Translog.readGlobalCheckpoint(translogPath, translogUUID);
@ -184,18 +183,19 @@ public class TruncateTranslogAction {
try (Translog translog = new Translog(translogConfig, translogUUID,
translogDeletionPolicy, () -> translogGlobalCheckpoint, () -> primaryTerm);
Translog.Snapshot snapshot = translog.newSnapshot()) {
//noinspection StatementWithEmptyBody we are just checking that we can iterate through the whole snapshot
while (snapshot.next() != null) {
// just iterate over snapshot
}
}
return true;
} catch (TranslogCorruptedException e) {
clean = false;
return false;
}
return clean;
}
/** Write a checkpoint file to the given location with the given generation */
static void writeEmptyCheckpoint(Path filename, int translogLength, long translogGeneration, long globalCheckpoint) throws IOException {
private static void writeEmptyCheckpoint(Path filename, int translogLength, long translogGeneration, long globalCheckpoint)
throws IOException {
Checkpoint emptyCheckpoint = Checkpoint.emptyTranslogCheckpoint(translogLength, translogGeneration,
globalCheckpoint, translogGeneration);
Checkpoint.write(FileChannel::open, filename, emptyCheckpoint,
@ -234,7 +234,7 @@ public class TruncateTranslogAction {
}
/** Return a Set of all files in a given directory */
public static Set<Path> filesInDirectory(Path directory) throws IOException {
private static Set<Path> filesInDirectory(Path directory) throws IOException {
Set<Path> files = new TreeSet<>();
try (DirectoryStream<Path> stream = Files.newDirectoryStream(directory)) {
for (Path file : stream) {

View File

@ -52,6 +52,7 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -67,9 +68,7 @@ public class RemoveCorruptedShardDataCommandTests extends IndexShardTestCase {
private ShardId shardId;
private ShardRouting routing;
private Path dataDir;
private Environment environment;
private Settings settings;
private ShardPath shardPath;
private IndexMetaData indexMetaData;
private IndexShard indexShard;
@ -86,7 +85,7 @@ public class RemoveCorruptedShardDataCommandTests extends IndexShardTestCase {
routing = TestShardRouting.newShardRouting(shardId, nodeId, true, ShardRoutingState.INITIALIZING,
RecoverySource.EmptyStoreRecoverySource.INSTANCE);
dataDir = createTempDir();
final Path dataDir = createTempDir();
environment =
TestEnvironment.newEnvironment(Settings.builder()
@ -96,7 +95,7 @@ public class RemoveCorruptedShardDataCommandTests extends IndexShardTestCase {
// create same directory structure as prod does
final Path path = NodeEnvironment.resolveNodePath(dataDir, 0);
Files.createDirectories(path);
settings = Settings.builder()
final Settings settings = Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
@ -217,7 +216,7 @@ public class RemoveCorruptedShardDataCommandTests extends IndexShardTestCase {
// close shard
closeShards(indexShard);
TestTranslog.corruptRandomTranslogFile(logger, random(), Arrays.asList(translogPath));
TestTranslog.corruptRandomTranslogFile(logger, random(), Collections.singletonList(translogPath));
// test corrupted shard
final IndexShard corruptedShard = reopenIndexShard(true);
@ -283,7 +282,7 @@ public class RemoveCorruptedShardDataCommandTests extends IndexShardTestCase {
expectThrows(IndexShardRecoveryException.class, () -> newStartedShard(p -> corruptedShard, true));
closeShards(corruptedShard);
}
TestTranslog.corruptRandomTranslogFile(logger, random(), Arrays.asList(translogPath));
TestTranslog.corruptRandomTranslogFile(logger, random(), Collections.singletonList(translogPath));
final RemoveCorruptedShardDataCommand command = new RemoveCorruptedShardDataCommand();
final MockTerminal t = new MockTerminal();

View File

@ -47,6 +47,8 @@ import java.util.regex.Pattern;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.IsNot.not;
@ -54,7 +56,7 @@ import static org.hamcrest.core.IsNot.not;
* Helpers for testing translog.
*/
public class TestTranslog {
static final Pattern TRANSLOG_FILE_PATTERN = Pattern.compile("translog-(\\d+)\\.tlog");
private static final Pattern TRANSLOG_FILE_PATTERN = Pattern.compile("translog-(\\d+)\\.tlog");
public static void corruptRandomTranslogFile(Logger logger, Random random, Collection<Path> translogDirs) throws IOException {
for (Path translogDir : translogDirs) {
@ -65,12 +67,11 @@ public class TestTranslog {
/**
* Corrupts random translog file (translog-N.tlog) from the given translog directory.
*
* @return a translog file which has been corrupted.
*/
public static Path corruptRandomTranslogFile(Logger logger, Random random, Path translogDir, long minGeneration) throws IOException {
public static void corruptRandomTranslogFile(Logger logger, Random random, Path translogDir, long minGeneration)
throws IOException {
Set<Path> candidates = new TreeSet<>(); // TreeSet makes sure iteration order is deterministic
logger.info("--> Translog dir [{}], minUsedTranslogGen [{}]", translogDir, minGeneration);
logger.info("--> corruptRandomTranslogFile: translogDir [{}], minUsedTranslogGen [{}]", translogDir, minGeneration);
try (DirectoryStream<Path> stream = Files.newDirectoryStream(translogDir)) {
for (Path item : stream) {
if (Files.isRegularFile(item)) {
@ -81,41 +82,51 @@ public class TestTranslog {
}
}
}
assertThat(candidates, is(not(empty())));
assertThat("no translog files found in " + translogDir, candidates, is(not(empty())));
Path corruptedFile = RandomPicks.randomFrom(random, candidates);
corruptFile(logger, random, corruptedFile);
return corruptedFile;
}
static void corruptFile(Logger logger, Random random, Path fileToCorrupt) throws IOException {
final long fileSize = Files.size(fileToCorrupt);
assertThat("cannot corrupt empty file " + fileToCorrupt, fileSize, greaterThan(0L));
static void corruptFile(Logger logger, Random random, Path fileToCorrupt) throws IOException {
try (FileChannel raf = FileChannel.open(fileToCorrupt, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
// read
raf.position(RandomNumbers.randomLongBetween(random, 0, raf.size() - 1));
long filePointer = raf.position();
ByteBuffer bb = ByteBuffer.wrap(new byte[1]);
raf.read(bb);
bb.flip();
try (FileChannel fileChannel = FileChannel.open(fileToCorrupt, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
final long corruptPosition = RandomNumbers.randomLongBetween(random, 0, fileSize - 1);
// corrupt
byte oldValue = bb.get(0);
byte newValue = (byte) (oldValue + 1);
bb.put(0, newValue);
if (random.nextBoolean()) {
// read
fileChannel.position(corruptPosition);
assertThat(fileChannel.position(), equalTo(corruptPosition));
ByteBuffer bb = ByteBuffer.wrap(new byte[1]);
fileChannel.read(bb);
bb.flip();
// rewrite
raf.position(filePointer);
raf.write(bb);
logger.info("--> corrupting file {} -- flipping at position {} from {} to {} file: {}",
fileToCorrupt, filePointer, Integer.toHexString(oldValue),
Integer.toHexString(newValue), fileToCorrupt);
// corrupt
byte oldValue = bb.get(0);
byte newValue;
do {
newValue = (byte) random.nextInt(0x100);
} while (newValue == oldValue);
bb.put(0, newValue);
// rewrite
fileChannel.position(corruptPosition);
fileChannel.write(bb);
logger.info("--> corrupting file {} at position {} turning 0x{} into 0x{}", fileToCorrupt, corruptPosition,
Integer.toHexString(oldValue & 0xff), Integer.toHexString(newValue & 0xff));
} else {
logger.info("--> truncating file {} from length {} to length {}", fileToCorrupt, fileSize, corruptPosition);
fileChannel.truncate(corruptPosition);
}
}
}
/**
* Lists all existing commits in a given index path, then read the minimum translog generation that will be used in recoverFromTranslog.
*/
public static long minTranslogGenUsedInRecovery(Path translogPath) throws IOException {
private static long minTranslogGenUsedInRecovery(Path translogPath) throws IOException {
try (NIOFSDirectory directory = new NIOFSDirectory(translogPath.getParent().resolve("index"))) {
List<IndexCommit> commits = DirectoryReader.listCommits(directory);
final String translogUUID = commits.get(commits.size() - 1).getUserData().get(Translog.TRANSLOG_UUID_KEY);

View File

@ -34,6 +34,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThan;
@ -62,12 +63,23 @@ public class TranslogHeaderTests extends ESTestCase {
});
assertThat(mismatchUUID.getMessage(), containsString("this translog file belongs to a different translog"));
int corruptions = between(1, 10);
for (int i = 0; i < corruptions; i++) {
for (int i = 0; i < corruptions && Files.size(translogFile) > 0; i++) {
TestTranslog.corruptFile(logger, random(), translogFile);
}
expectThrows(TranslogCorruptedException.class, () -> {
try (FileChannel channel = FileChannel.open(translogFile, StandardOpenOption.READ)) {
TranslogHeader.read(outHeader.getTranslogUUID(), translogFile, channel);
final TranslogHeader translogHeader = TranslogHeader.read(outHeader.getTranslogUUID(), translogFile, channel);
// succeeds if the corruption corrupted the version byte making this look like a v2 translog, because we don't check the
// checksum on this version
assertThat("version " + TranslogHeader.VERSION_CHECKPOINTS + " translog",
translogHeader.getPrimaryTerm(), equalTo(SequenceNumbers.UNASSIGNED_PRIMARY_TERM));
throw new TranslogCorruptedException(translogFile.toString(), "adjusted translog version");
} catch (IllegalStateException e) {
// corruption corrupted the version byte making this look like a v2, v1 or v0 translog
assertThat("version " + TranslogHeader.VERSION_CHECKPOINTS + "-or-earlier translog",
e.getMessage(), anyOf(containsString("pre-2.0 translog found"), containsString("pre-1.4 translog found"),
containsString("pre-6.3 translog found")));
throw new TranslogCorruptedException(translogFile.toString(), "adjusted translog version", e);
}
});
}

View File

@ -128,6 +128,7 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isIn;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
@ -882,7 +883,8 @@ public class TranslogTests extends ESTestCase {
for (int i = 0; i < locations.size(); i++) {
try {
assertNotNull(snap.next());
} catch (EOFException e) {
} catch (TranslogCorruptedException e) {
assertThat(e.getCause(), instanceOf(EOFException.class));
truncations.incrementAndGet();
}
}