diff --git a/server/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java b/server/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java index bfd9e31abcd..8f5884d0275 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java +++ b/server/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java @@ -148,7 +148,7 @@ public abstract class BaseTranslogReader implements Comparable channel.size()) { - throw new TranslogCorruptedException( - path.toString(), - "UUID length can't be larger than the translog"); - } - final BytesRef uuid = new BytesRef(uuidLen); - uuid.length = uuidLen; - in.read(uuid.bytes, uuid.offset, uuid.length); - final BytesRef expectedUUID = new BytesRef(translogUUID); - if (uuid.bytesEquals(expectedUUID) == false) { - throw new TranslogCorruptedException( + final int version; + try { + version = CodecUtil.checkHeader(new InputStreamDataInput(in), TRANSLOG_CODEC, VERSION_CHECKSUMS, VERSION_PRIMARY_TERM); + } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException e) { + tryReportOldVersionError(path, channel); + throw new TranslogCorruptedException(path.toString(), "translog header corrupted", e); + } + if (version == VERSION_CHECKSUMS) { + throw new IllegalStateException("pre-2.0 translog found [" + path + "]"); + } + // Read the translogUUID + final int uuidLen = in.readInt(); + if (uuidLen > channel.size()) { + throw new TranslogCorruptedException(path.toString(), "UUID length can't be larger than the translog"); + } + if (uuidLen <= 0) { + throw new TranslogCorruptedException(path.toString(), "UUID length must be positive"); + } + final BytesRef uuid = new BytesRef(uuidLen); + uuid.length = uuidLen; + in.read(uuid.bytes, uuid.offset, uuid.length); + final BytesRef expectedUUID = new BytesRef(translogUUID); + if (uuid.bytesEquals(expectedUUID) == false) { + throw new TranslogCorruptedException( path.toString(), "expected shard UUID " + expectedUUID + " but got: " + uuid + - " this translog file belongs to a different translog"); - } - // Read the primary term - final long primaryTerm; - if (version == VERSION_PRIMARY_TERM) { - primaryTerm = in.readLong(); - } else { - assert version == VERSION_CHECKPOINTS : "Unknown header version [" + version + "]"; - primaryTerm = UNASSIGNED_PRIMARY_TERM; - } - // Verify the checksum - if (version >= VERSION_PRIMARY_TERM) { - Translog.verifyChecksum(in); - } - assert primaryTerm >= 0 : "Primary term must be non-negative [" + primaryTerm + "]; translog path [" + path + "]"; + " this translog file belongs to a different translog"); + } + // Read the primary term + final long primaryTerm; + if (version == VERSION_PRIMARY_TERM) { + primaryTerm = in.readLong(); + } else { + assert version == VERSION_CHECKPOINTS : "Unknown header version [" + version + "]"; + primaryTerm = UNASSIGNED_PRIMARY_TERM; + } + // Verify the checksum + if (version >= VERSION_PRIMARY_TERM) { + Translog.verifyChecksum(in); + } + assert primaryTerm >= 0 : "Primary term must be non-negative [" + primaryTerm + "]; translog path [" + path + "]"; - final int headerSizeInBytes = headerSizeInBytes(version, uuid.length); - assert channel.position() == headerSizeInBytes : - "Header is not fully read; header size [" + headerSizeInBytes + "], position [" + channel.position() + "]"; - return new TranslogHeader(translogUUID, primaryTerm, headerSizeInBytes); + final int headerSizeInBytes = headerSizeInBytes(version, uuid.length); + assert channel.position() == headerSizeInBytes : + "Header is not fully read; header size [" + headerSizeInBytes + "], position [" + channel.position() + "]"; + return new TranslogHeader(translogUUID, primaryTerm, headerSizeInBytes); + } catch (EOFException e) { + throw new TranslogCorruptedException(path.toString(), "translog header truncated", e); + } } private static void tryReportOldVersionError(final Path path, final FileChannel channel) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java index bff3e4eb2f5..4cfc886219f 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java @@ -76,7 +76,7 @@ final class TranslogSnapshot extends BaseTranslogReader { return null; } - protected Translog.Operation readOperation() throws IOException { + private Translog.Operation readOperation() throws IOException { final int opSize = readSize(reusableBuffer, position); reuse = checksummedStream(reusableBuffer, position, opSize, reuse); Translog.Operation op = read(reuse); @@ -93,15 +93,19 @@ final class TranslogSnapshot extends BaseTranslogReader { * reads an operation at the given position into the given buffer. */ protected void readBytes(ByteBuffer buffer, long position) throws IOException { - if (position >= length) { - throw new EOFException("read requested past EOF. pos [" + position + "] end: [" + length + "], generation: [" + - getGeneration() + "], path: [" + path + "]"); + try { + if (position >= length) { + throw new EOFException("read requested past EOF. pos [" + position + "] end: [" + length + "], generation: [" + + getGeneration() + "], path: [" + path + "]"); + } + if (position < getFirstOperationOffset()) { + throw new IOException("read requested before position of first ops. pos [" + position + "] first op on: [" + + getFirstOperationOffset() + "], generation: [" + getGeneration() + "], path: [" + path + "]"); + } + Channels.readFromFileChannelWithEofException(channel, position, buffer); + } catch (EOFException e) { + throw new TranslogCorruptedException(path.toString(), "translog truncated", e); } - if (position < getFirstOperationOffset()) { - throw new IOException("read requested before position of first ops. pos [" + position + "] first op on: [" + - getFirstOperationOffset() + "], generation: [" + getGeneration() + "], path: [" + path + "]"); - } - Channels.readFromFileChannelWithEofException(channel, position, buffer); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogAction.java b/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogAction.java index e99128fd3e0..01a7836d813 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogAction.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogAction.java @@ -168,7 +168,6 @@ public class TruncateTranslogAction { private boolean isTranslogClean(ShardPath shardPath, String translogUUID) throws IOException { // perform clean check of translog instead of corrupted marker file - boolean clean = true; try { final Path translogPath = shardPath.resolveTranslog(); final long translogGlobalCheckpoint = Translog.readGlobalCheckpoint(translogPath, translogUUID); @@ -184,18 +183,19 @@ public class TruncateTranslogAction { try (Translog translog = new Translog(translogConfig, translogUUID, translogDeletionPolicy, () -> translogGlobalCheckpoint, () -> primaryTerm); Translog.Snapshot snapshot = translog.newSnapshot()) { + //noinspection StatementWithEmptyBody we are just checking that we can iterate through the whole snapshot while (snapshot.next() != null) { - // just iterate over snapshot } } + return true; } catch (TranslogCorruptedException e) { - clean = false; + return false; } - return clean; } /** Write a checkpoint file to the given location with the given generation */ - static void writeEmptyCheckpoint(Path filename, int translogLength, long translogGeneration, long globalCheckpoint) throws IOException { + private static void writeEmptyCheckpoint(Path filename, int translogLength, long translogGeneration, long globalCheckpoint) + throws IOException { Checkpoint emptyCheckpoint = Checkpoint.emptyTranslogCheckpoint(translogLength, translogGeneration, globalCheckpoint, translogGeneration); Checkpoint.write(FileChannel::open, filename, emptyCheckpoint, @@ -234,7 +234,7 @@ public class TruncateTranslogAction { } /** Return a Set of all files in a given directory */ - public static Set filesInDirectory(Path directory) throws IOException { + private static Set filesInDirectory(Path directory) throws IOException { Set files = new TreeSet<>(); try (DirectoryStream stream = Files.newDirectoryStream(directory)) { for (Path file : stream) { diff --git a/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java b/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java index 205ab46ffd2..ce9eca77cb3 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java @@ -52,6 +52,7 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.Arrays; +import java.util.Collections; import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -67,9 +68,7 @@ public class RemoveCorruptedShardDataCommandTests extends IndexShardTestCase { private ShardId shardId; private ShardRouting routing; - private Path dataDir; private Environment environment; - private Settings settings; private ShardPath shardPath; private IndexMetaData indexMetaData; private IndexShard indexShard; @@ -86,7 +85,7 @@ public class RemoveCorruptedShardDataCommandTests extends IndexShardTestCase { routing = TestShardRouting.newShardRouting(shardId, nodeId, true, ShardRoutingState.INITIALIZING, RecoverySource.EmptyStoreRecoverySource.INSTANCE); - dataDir = createTempDir(); + final Path dataDir = createTempDir(); environment = TestEnvironment.newEnvironment(Settings.builder() @@ -96,7 +95,7 @@ public class RemoveCorruptedShardDataCommandTests extends IndexShardTestCase { // create same directory structure as prod does final Path path = NodeEnvironment.resolveNodePath(dataDir, 0); Files.createDirectories(path); - settings = Settings.builder() + final Settings settings = Settings.builder() .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) @@ -217,7 +216,7 @@ public class RemoveCorruptedShardDataCommandTests extends IndexShardTestCase { // close shard closeShards(indexShard); - TestTranslog.corruptRandomTranslogFile(logger, random(), Arrays.asList(translogPath)); + TestTranslog.corruptRandomTranslogFile(logger, random(), Collections.singletonList(translogPath)); // test corrupted shard final IndexShard corruptedShard = reopenIndexShard(true); @@ -283,7 +282,7 @@ public class RemoveCorruptedShardDataCommandTests extends IndexShardTestCase { expectThrows(IndexShardRecoveryException.class, () -> newStartedShard(p -> corruptedShard, true)); closeShards(corruptedShard); } - TestTranslog.corruptRandomTranslogFile(logger, random(), Arrays.asList(translogPath)); + TestTranslog.corruptRandomTranslogFile(logger, random(), Collections.singletonList(translogPath)); final RemoveCorruptedShardDataCommand command = new RemoveCorruptedShardDataCommand(); final MockTerminal t = new MockTerminal(); diff --git a/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java b/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java index a3ebfff478e..af1693a45bf 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java @@ -47,6 +47,8 @@ import java.util.regex.Pattern; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.core.Is.is; import static org.hamcrest.core.IsNot.not; @@ -54,7 +56,7 @@ import static org.hamcrest.core.IsNot.not; * Helpers for testing translog. */ public class TestTranslog { - static final Pattern TRANSLOG_FILE_PATTERN = Pattern.compile("translog-(\\d+)\\.tlog"); + private static final Pattern TRANSLOG_FILE_PATTERN = Pattern.compile("translog-(\\d+)\\.tlog"); public static void corruptRandomTranslogFile(Logger logger, Random random, Collection translogDirs) throws IOException { for (Path translogDir : translogDirs) { @@ -65,12 +67,11 @@ public class TestTranslog { /** * Corrupts random translog file (translog-N.tlog) from the given translog directory. - * - * @return a translog file which has been corrupted. */ - public static Path corruptRandomTranslogFile(Logger logger, Random random, Path translogDir, long minGeneration) throws IOException { + public static void corruptRandomTranslogFile(Logger logger, Random random, Path translogDir, long minGeneration) + throws IOException { Set candidates = new TreeSet<>(); // TreeSet makes sure iteration order is deterministic - logger.info("--> Translog dir [{}], minUsedTranslogGen [{}]", translogDir, minGeneration); + logger.info("--> corruptRandomTranslogFile: translogDir [{}], minUsedTranslogGen [{}]", translogDir, minGeneration); try (DirectoryStream stream = Files.newDirectoryStream(translogDir)) { for (Path item : stream) { if (Files.isRegularFile(item)) { @@ -81,41 +82,51 @@ public class TestTranslog { } } } - assertThat(candidates, is(not(empty()))); + assertThat("no translog files found in " + translogDir, candidates, is(not(empty()))); Path corruptedFile = RandomPicks.randomFrom(random, candidates); corruptFile(logger, random, corruptedFile); - return corruptedFile; } + static void corruptFile(Logger logger, Random random, Path fileToCorrupt) throws IOException { + final long fileSize = Files.size(fileToCorrupt); + assertThat("cannot corrupt empty file " + fileToCorrupt, fileSize, greaterThan(0L)); - static void corruptFile(Logger logger, Random random, Path fileToCorrupt) throws IOException { - try (FileChannel raf = FileChannel.open(fileToCorrupt, StandardOpenOption.READ, StandardOpenOption.WRITE)) { - // read - raf.position(RandomNumbers.randomLongBetween(random, 0, raf.size() - 1)); - long filePointer = raf.position(); - ByteBuffer bb = ByteBuffer.wrap(new byte[1]); - raf.read(bb); - bb.flip(); + try (FileChannel fileChannel = FileChannel.open(fileToCorrupt, StandardOpenOption.READ, StandardOpenOption.WRITE)) { + final long corruptPosition = RandomNumbers.randomLongBetween(random, 0, fileSize - 1); - // corrupt - byte oldValue = bb.get(0); - byte newValue = (byte) (oldValue + 1); - bb.put(0, newValue); + if (random.nextBoolean()) { + // read + fileChannel.position(corruptPosition); + assertThat(fileChannel.position(), equalTo(corruptPosition)); + ByteBuffer bb = ByteBuffer.wrap(new byte[1]); + fileChannel.read(bb); + bb.flip(); - // rewrite - raf.position(filePointer); - raf.write(bb); - logger.info("--> corrupting file {} -- flipping at position {} from {} to {} file: {}", - fileToCorrupt, filePointer, Integer.toHexString(oldValue), - Integer.toHexString(newValue), fileToCorrupt); + // corrupt + byte oldValue = bb.get(0); + byte newValue; + do { + newValue = (byte) random.nextInt(0x100); + } while (newValue == oldValue); + bb.put(0, newValue); + + // rewrite + fileChannel.position(corruptPosition); + fileChannel.write(bb); + logger.info("--> corrupting file {} at position {} turning 0x{} into 0x{}", fileToCorrupt, corruptPosition, + Integer.toHexString(oldValue & 0xff), Integer.toHexString(newValue & 0xff)); + } else { + logger.info("--> truncating file {} from length {} to length {}", fileToCorrupt, fileSize, corruptPosition); + fileChannel.truncate(corruptPosition); + } } } /** * Lists all existing commits in a given index path, then read the minimum translog generation that will be used in recoverFromTranslog. */ - public static long minTranslogGenUsedInRecovery(Path translogPath) throws IOException { + private static long minTranslogGenUsedInRecovery(Path translogPath) throws IOException { try (NIOFSDirectory directory = new NIOFSDirectory(translogPath.getParent().resolve("index"))) { List commits = DirectoryReader.listCommits(directory); final String translogUUID = commits.get(commits.size() - 1).getUserData().get(Translog.TRANSLOG_UUID_KEY); diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogHeaderTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogHeaderTests.java index 7d06e25519a..0f2eed79bcb 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogHeaderTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogHeaderTests.java @@ -34,6 +34,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThan; @@ -62,12 +63,23 @@ public class TranslogHeaderTests extends ESTestCase { }); assertThat(mismatchUUID.getMessage(), containsString("this translog file belongs to a different translog")); int corruptions = between(1, 10); - for (int i = 0; i < corruptions; i++) { + for (int i = 0; i < corruptions && Files.size(translogFile) > 0; i++) { TestTranslog.corruptFile(logger, random(), translogFile); } expectThrows(TranslogCorruptedException.class, () -> { try (FileChannel channel = FileChannel.open(translogFile, StandardOpenOption.READ)) { - TranslogHeader.read(outHeader.getTranslogUUID(), translogFile, channel); + final TranslogHeader translogHeader = TranslogHeader.read(outHeader.getTranslogUUID(), translogFile, channel); + // succeeds if the corruption corrupted the version byte making this look like a v2 translog, because we don't check the + // checksum on this version + assertThat("version " + TranslogHeader.VERSION_CHECKPOINTS + " translog", + translogHeader.getPrimaryTerm(), equalTo(SequenceNumbers.UNASSIGNED_PRIMARY_TERM)); + throw new TranslogCorruptedException(translogFile.toString(), "adjusted translog version"); + } catch (IllegalStateException e) { + // corruption corrupted the version byte making this look like a v2, v1 or v0 translog + assertThat("version " + TranslogHeader.VERSION_CHECKPOINTS + "-or-earlier translog", + e.getMessage(), anyOf(containsString("pre-2.0 translog found"), containsString("pre-1.4 translog found"), + containsString("pre-6.3 translog found"))); + throw new TranslogCorruptedException(translogFile.toString(), "adjusted translog version", e); } }); } diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index ceca8a811a6..f2401505cba 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -128,6 +128,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasToString; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.isIn; import static org.hamcrest.Matchers.lessThanOrEqualTo; @@ -882,7 +883,8 @@ public class TranslogTests extends ESTestCase { for (int i = 0; i < locations.size(); i++) { try { assertNotNull(snap.next()); - } catch (EOFException e) { + } catch (TranslogCorruptedException e) { + assertThat(e.getCause(), instanceOf(EOFException.class)); truncations.incrementAndGet(); } }