Include translog path in error message when translog is corrupted (#32251)

Currently, when TranslogCorruptedException is thrown most of the times it does not contain information about the translog location on the file system. There is the translog recovery tool that accepts the translog path as an argument and users are constantly puzzled where to get the path.
This pull request adds "source" information to every TranslogCorruptedException thrown. The source could be local file, remote translog source (used for recovery), assertion (translog entry is constructed to perform some assertion) or translog constructed inside the test.
Closes #24929
This commit is contained in:
Andrey Ershov 2018-08-07 13:03:43 +02:00 committed by GitHub
parent 1f50950099
commit 6449d9bc14
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 121 additions and 115 deletions

View File

@ -79,7 +79,9 @@ public abstract class BaseTranslogReader implements Comparable<BaseTranslogReade
final int size = reusableBuffer.getInt() + 4;
final long maxSize = sizeInBytes() - position;
if (size < 0 || size > maxSize) {
throw new TranslogCorruptedException("operation size is corrupted must be [0.." + maxSize + "] but was: " + size);
throw new TranslogCorruptedException(
path.toString(),
"operation size is corrupted must be [0.." + maxSize + "] but was: " + size);
}
return size;
}
@ -103,14 +105,16 @@ public abstract class BaseTranslogReader implements Comparable<BaseTranslogReade
buffer.limit(opSize);
readBytes(buffer, position);
buffer.flip();
return new BufferedChecksumStreamInput(new ByteBufferStreamInput(buffer), reuse);
return new BufferedChecksumStreamInput(new ByteBufferStreamInput(buffer), path.toString(), reuse);
}
protected Translog.Operation read(BufferedChecksumStreamInput inStream) throws IOException {
final Translog.Operation op = Translog.readOperation(inStream);
if (op.primaryTerm() > getPrimaryTerm() && getPrimaryTerm() != TranslogHeader.UNKNOWN_PRIMARY_TERM) {
throw new TranslogCorruptedException("Operation's term is newer than translog header term; " +
"operation term[" + op.primaryTerm() + "], translog header term [" + getPrimaryTerm() + "]");
throw new TranslogCorruptedException(
path.toString(),
"operation's term is newer than translog header term; " +
"operation term[" + op.primaryTerm() + "], translog header term [" + getPrimaryTerm() + "]");
}
return op;
}

View File

@ -35,14 +35,11 @@ public final class BufferedChecksumStreamInput extends FilterStreamInput {
private static final int SKIP_BUFFER_SIZE = 1024;
private byte[] skipBuffer;
private final Checksum digest;
private final String source;
public BufferedChecksumStreamInput(StreamInput in) {
super(in);
this.digest = new BufferedChecksum(new CRC32());
}
public BufferedChecksumStreamInput(StreamInput in, BufferedChecksumStreamInput reuse) {
public BufferedChecksumStreamInput(StreamInput in, String source, BufferedChecksumStreamInput reuse) {
super(in);
this.source = source;
if (reuse == null ) {
this.digest = new BufferedChecksum(new CRC32());
} else {
@ -52,6 +49,10 @@ public final class BufferedChecksumStreamInput extends FilterStreamInput {
}
}
public BufferedChecksumStreamInput(StreamInput in, String source) {
this(in, source, null);
}
public long getChecksum() {
return this.digest.getValue();
}
@ -85,7 +86,6 @@ public final class BufferedChecksumStreamInput extends FilterStreamInput {
return delegate.markSupported();
}
@Override
public long skip(long numBytes) throws IOException {
if (numBytes < 0) {
@ -104,7 +104,6 @@ public final class BufferedChecksumStreamInput extends FilterStreamInput {
return skipped;
}
@Override
public synchronized void mark(int readlimit) {
delegate.mark(readlimit);
@ -114,4 +113,7 @@ public final class BufferedChecksumStreamInput extends FilterStreamInput {
digest.reset();
}
public String getSource(){
return source;
}
}

View File

@ -1427,7 +1427,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
long expectedChecksum = in.getChecksum();
long readChecksum = Integer.toUnsignedLong(in.readInt());
if (readChecksum != expectedChecksum) {
throw new TranslogCorruptedException("translog stream is corrupted, expected: 0x" +
throw new TranslogCorruptedException(in.getSource(), "checksum verification failed - expected: 0x" +
Long.toHexString(expectedChecksum) + ", got: 0x" + Long.toHexString(readChecksum));
}
}
@ -1435,10 +1435,10 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
/**
* Reads a list of operations written with {@link #writeOperations(StreamOutput, List)}
*/
public static List<Operation> readOperations(StreamInput input) throws IOException {
public static List<Operation> readOperations(StreamInput input, String source) throws IOException {
ArrayList<Operation> operations = new ArrayList<>();
int numOps = input.readInt();
final BufferedChecksumStreamInput checksumStreamInput = new BufferedChecksumStreamInput(input);
final BufferedChecksumStreamInput checksumStreamInput = new BufferedChecksumStreamInput(input, source);
for (int i = 0; i < numOps; i++) {
operations.add(readOperation(checksumStreamInput));
}
@ -1450,7 +1450,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
try {
final int opSize = in.readInt();
if (opSize < 4) { // 4byte for the checksum
throw new TranslogCorruptedException("operation size must be at least 4 but was: " + opSize);
throw new TranslogCorruptedException(in.getSource(), "operation size must be at least 4 but was: " + opSize);
}
in.resetDigest(); // size is not part of the checksum!
if (in.markSupported()) { // if we can we validate the checksum first
@ -1465,17 +1465,15 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
}
operation = Translog.Operation.readOperation(in);
verifyChecksum(in);
} catch (TranslogCorruptedException e) {
throw e;
} catch (EOFException e) {
throw new TruncatedTranslogException("reached premature end of file, translog is truncated", e);
throw new TruncatedTranslogException(in.getSource(), "reached premature end of file, translog is truncated", e);
}
return operation;
}
/**
* Writes all operations in the given iterable to the given output stream including the size of the array
* use {@link #readOperations(StreamInput)} to read it back.
* use {@link #readOperations(StreamInput, String)} to read it back.
*/
public static void writeOperations(StreamOutput outStream, List<Operation> toWrite) throws IOException {
final ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(BigArrays.NON_RECYCLING_INSTANCE);
@ -1716,7 +1714,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
} catch (TranslogCorruptedException ex) {
throw ex; // just bubble up.
} catch (Exception ex) {
throw new TranslogCorruptedException("Translog at [" + location + "] is corrupted", ex);
throw new TranslogCorruptedException(location.toString(), ex);
}
return checkpoint;
}

View File

@ -25,15 +25,27 @@ import org.elasticsearch.common.io.stream.StreamInput;
import java.io.IOException;
public class TranslogCorruptedException extends ElasticsearchException {
public TranslogCorruptedException(String msg) {
super(msg);
public TranslogCorruptedException(String source, String details) {
super(corruptedMessage(source, details));
}
public TranslogCorruptedException(String msg, Throwable cause) {
super(msg, cause);
public TranslogCorruptedException(String source, Throwable cause) {
this(source, null, cause);
}
public TranslogCorruptedException(StreamInput in) throws IOException{
public TranslogCorruptedException(String source, String details, Throwable cause) {
super(corruptedMessage(source, details), cause);
}
private static String corruptedMessage(String source, String details) {
String msg = "translog from source [" + source + "] is corrupted";
if (details != null) {
msg += ", " + details;
}
return msg;
}
public TranslogCorruptedException(StreamInput in) throws IOException {
super(in);
}
}

View File

@ -110,13 +110,15 @@ final class TranslogHeader {
static TranslogHeader read(final String translogUUID, final Path path, final FileChannel channel) throws IOException {
// This input is intentionally not closed because closing it will close the FileChannel.
final BufferedChecksumStreamInput in =
new BufferedChecksumStreamInput(new InputStreamStreamInput(java.nio.channels.Channels.newInputStream(channel), channel.size()));
new BufferedChecksumStreamInput(
new InputStreamStreamInput(java.nio.channels.Channels.newInputStream(channel), channel.size()),
path.toString());
final int version;
try {
version = CodecUtil.checkHeader(new InputStreamDataInput(in), TRANSLOG_CODEC, VERSION_CHECKSUMS, VERSION_PRIMARY_TERM);
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException e) {
tryReportOldVersionError(path, channel);
throw new TranslogCorruptedException("Translog header corrupted. path:" + path, e);
throw new TranslogCorruptedException(path.toString(), "translog header corrupted", e);
}
if (version == VERSION_CHECKSUMS) {
throw new IllegalStateException("pre-2.0 translog found [" + path + "]");
@ -124,15 +126,19 @@ final class TranslogHeader {
// Read the translogUUID
final int uuidLen = in.readInt();
if (uuidLen > channel.size()) {
throw new TranslogCorruptedException("uuid length can't be larger than the translog");
throw new TranslogCorruptedException(
path.toString(),
"UUID length can't be larger than the translog");
}
final BytesRef uuid = new BytesRef(uuidLen);
uuid.length = uuidLen;
in.read(uuid.bytes, uuid.offset, uuid.length);
final BytesRef expectedUUID = new BytesRef(translogUUID);
if (uuid.bytesEquals(expectedUUID) == false) {
throw new TranslogCorruptedException("expected shard UUID " + expectedUUID + " but got: " + uuid +
" this translog file belongs to a different translog. path:" + path);
throw new TranslogCorruptedException(
path.toString(),
"expected shard UUID " + expectedUUID + " but got: " + uuid +
" this translog file belongs to a different translog");
}
// Read the primary term
final long primaryTerm;
@ -164,7 +170,9 @@ final class TranslogHeader {
// 0x00 => version 0 of the translog
final byte b1 = Channels.readFromFileChannel(channel, 0, 1)[0];
if (b1 == 0x3f) { // LUCENE_CODEC_HEADER_BYTE
throw new TranslogCorruptedException("translog looks like version 1 or later, but has corrupted header. path:" + path);
throw new TranslogCorruptedException(
path.toString(),
"translog looks like version 1 or later, but has corrupted header" );
} else if (b1 == 0x00) { // UNVERSIONED_TRANSLOG_HEADER_BYTE
throw new IllegalStateException("pre-1.4 translog found [" + path + "]");
}

View File

@ -200,8 +200,10 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
} else if (seenSequenceNumbers.containsKey(seqNo)) {
final Tuple<BytesReference, Exception> previous = seenSequenceNumbers.get(seqNo);
if (previous.v1().equals(data) == false) {
Translog.Operation newOp = Translog.readOperation(new BufferedChecksumStreamInput(data.streamInput()));
Translog.Operation prvOp = Translog.readOperation(new BufferedChecksumStreamInput(previous.v1().streamInput()));
Translog.Operation newOp = Translog.readOperation(
new BufferedChecksumStreamInput(data.streamInput(), "assertion"));
Translog.Operation prvOp = Translog.readOperation(
new BufferedChecksumStreamInput(previous.v1().streamInput(), "assertion"));
if (newOp.equals(prvOp) == false) {
throw new AssertionError(
"seqNo [" + seqNo + "] was processed twice in generation [" + generation + "], with different data. " +
@ -220,7 +222,8 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
.forEach(e -> {
final Translog.Operation op;
try {
op = Translog.readOperation(new BufferedChecksumStreamInput(e.getValue().v1().streamInput()));
op = Translog.readOperation(
new BufferedChecksumStreamInput(e.getValue().v1().streamInput(), "assertion"));
} catch (IOException ex) {
throw new RuntimeException(ex);
}

View File

@ -25,11 +25,12 @@ import java.io.IOException;
public class TruncatedTranslogException extends TranslogCorruptedException {
public TruncatedTranslogException(String msg, Throwable cause) {
super(msg, cause);
}
public TruncatedTranslogException(StreamInput in) throws IOException {
super(in);
}
public TruncatedTranslogException(String source, String details, Throwable cause) {
super(source, details, cause);
}
}

View File

@ -66,7 +66,7 @@ public class RecoveryTranslogOperationsRequest extends TransportRequest {
super.readFrom(in);
recoveryId = in.readLong();
shardId = ShardId.readShardId(in);
operations = Translog.readOperations(in);
operations = Translog.readOperations(in, "recovery");
totalTranslogOps = in.readVInt();
}

View File

@ -44,6 +44,7 @@ import org.elasticsearch.test.engine.MockEngineSupport;
import org.elasticsearch.test.transport.MockTransportService;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
@ -86,7 +87,7 @@ public class CorruptedTranslogIT extends ESIntegTestCase {
indexRandom(false, false, false, Arrays.asList(builders)); // this one
// Corrupt the translog file(s)
corruptRandomTranslogFiles();
corruptRandomTranslogFile();
// Restart the single node
internalCluster().fullRestart();
@ -102,7 +103,7 @@ public class CorruptedTranslogIT extends ESIntegTestCase {
}
private void corruptRandomTranslogFiles() throws IOException {
private void corruptRandomTranslogFile() throws IOException {
ClusterState state = client().admin().cluster().prepareState().get().getState();
GroupShardsIterator shardIterators = state.getRoutingTable().activePrimaryShardsGrouped(new String[]{"test"}, false);
final Index test = state.metaData().index("test").getIndex();
@ -119,9 +120,12 @@ public class CorruptedTranslogIT extends ESIntegTestCase {
String path = fsPath.getPath();
String relativeDataLocationPath = "indices/" + test.getUUID() + "/" + Integer.toString(shardRouting.getId()) + "/translog";
Path translogDir = PathUtils.get(path).resolve(relativeDataLocationPath);
translogDirs.add(translogDir);
if (Files.isDirectory(translogDir)) {
translogDirs.add(translogDir);
}
}
TestTranslog.corruptTranslogFiles(logger, random(), translogDirs);
Path translogDir = RandomPicks.randomFrom(random(), translogDirs);
TestTranslog.corruptRandomTranslogFile(logger, random(), translogDir, TestTranslog.minTranslogGenUsedInRecovery(translogDir));
}
/** Disables translog flushing for the specified index */

View File

@ -34,8 +34,6 @@ import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
@ -45,7 +43,8 @@ import java.util.regex.Pattern;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.IsNot.not;
/**
* Helpers for testing translog.
@ -54,44 +53,33 @@ public class TestTranslog {
static final Pattern TRANSLOG_FILE_PATTERN = Pattern.compile("translog-(\\d+)\\.tlog");
/**
* Corrupts some translog files (translog-N.tlog) from the given translog directories.
* Corrupts random translog file (translog-N.tlog) from the given translog directory.
*
* @return a collection of tlog files that have been corrupted.
* @return a translog file which has been corrupted.
*/
public static Set<Path> corruptTranslogFiles(Logger logger, Random random, Collection<Path> translogDirs) throws IOException {
public static Path corruptRandomTranslogFile(Logger logger, Random random, Path translogDir, long minGeneration) throws
IOException {
Set<Path> candidates = new TreeSet<>(); // TreeSet makes sure iteration order is deterministic
for (Path translogDir : translogDirs) {
if (Files.isDirectory(translogDir)) {
final long minUsedTranslogGen = minTranslogGenUsedInRecovery(translogDir);
logger.info("--> Translog dir [{}], minUsedTranslogGen [{}]", translogDir, minUsedTranslogGen);
try (DirectoryStream<Path> stream = Files.newDirectoryStream(translogDir)) {
for (Path item : stream) {
if (Files.isRegularFile(item)) {
// Makes sure that we will corrupt tlog files that are referenced by the Checkpoint.
final Matcher matcher = TRANSLOG_FILE_PATTERN.matcher(item.getFileName().toString());
if (matcher.matches() && Long.parseLong(matcher.group(1)) >= minUsedTranslogGen) {
candidates.add(item);
}
}
logger.info("--> Translog dir [{}], minUsedTranslogGen [{}]", translogDir, minGeneration);
try (DirectoryStream<Path> stream = Files.newDirectoryStream(translogDir)) {
for (Path item : stream) {
if (Files.isRegularFile(item)) {
final Matcher matcher = TRANSLOG_FILE_PATTERN.matcher(item.getFileName().toString());
if (matcher.matches() && Long.parseLong(matcher.group(1)) >= minGeneration) {
candidates.add(item);
}
}
}
}
assertThat(candidates, is(not(empty())));
Set<Path> corruptedFiles = new HashSet<>();
if (!candidates.isEmpty()) {
int corruptions = RandomNumbers.randomIntBetween(random, 5, 20);
for (int i = 0; i < corruptions; i++) {
Path fileToCorrupt = RandomPicks.randomFrom(random, candidates);
corruptFile(logger, random, fileToCorrupt);
corruptedFiles.add(fileToCorrupt);
}
}
assertThat("no translog file corrupted", corruptedFiles, not(empty()));
return corruptedFiles;
Path corruptedFile = RandomPicks.randomFrom(random, candidates);
corruptFile(logger, random, corruptedFile);
return corruptedFile;
}
static void corruptFile(Logger logger, Random random, Path fileToCorrupt) throws IOException {
static void corruptFile(Logger logger, Random random, Path fileToCorrupt) throws IOException {
try (FileChannel raf = FileChannel.open(fileToCorrupt, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
// read
raf.position(RandomNumbers.randomLongBetween(random, 0, raf.size() - 1));
@ -117,7 +105,7 @@ public class TestTranslog {
/**
* Lists all existing commits in a given index path, then read the minimum translog generation that will be used in recoverFromTranslog.
*/
private static long minTranslogGenUsedInRecovery(Path translogPath) throws IOException {
public static long minTranslogGenUsedInRecovery(Path translogPath) throws IOException {
try (NIOFSDirectory directory = new NIOFSDirectory(translogPath.getParent().resolve("index"))) {
List<IndexCommit> commits = DirectoryReader.listCommits(directory);
final String translogUUID = commits.get(commits.size() - 1).getUserData().get(Translog.TRANSLOG_UUID_KEY);

View File

@ -748,7 +748,9 @@ public class TranslogTests extends ESTestCase {
}
public void testTranslogChecksums() throws Exception {
public void testTranslogCorruption() throws Exception {
TranslogConfig config = translog.getConfig();
String uuid = translog.getTranslogUUID();
List<Translog.Location> locations = new ArrayList<>();
int translogOperations = randomIntBetween(10, 100);
@ -756,23 +758,23 @@ public class TranslogTests extends ESTestCase {
String ascii = randomAlphaOfLengthBetween(1, 50);
locations.add(translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), ascii.getBytes("UTF-8"))));
}
translog.sync();
translog.close();
corruptTranslogs(translogDir);
TestTranslog.corruptRandomTranslogFile(logger, random(), translogDir, 0);
int corruptionsCaught = 0;
AtomicInteger corruptionsCaught = new AtomicInteger(0);
try (Translog.Snapshot snapshot = translog.newSnapshot()) {
for (Translog.Location location : locations) {
try {
Translog.Operation next = snapshot.next();
assertNotNull(next);
} catch (TranslogCorruptedException e) {
corruptionsCaught.incrementAndGet();
try (Translog translog = openTranslog(config, uuid)) {
try (Translog.Snapshot snapshot = translog.newSnapshot()) {
for (Location loc : locations) {
snapshot.next();
}
}
expectThrows(TranslogCorruptedException.class, snapshot::next);
assertThat("at least one corruption was caused and caught", corruptionsCaught.get(), greaterThanOrEqualTo(1));
} catch (TranslogCorruptedException e) {
assertThat(e.getMessage(), containsString(translogDir.toString()));
corruptionsCaught++;
}
assertThat("corruption is caught", corruptionsCaught, greaterThanOrEqualTo(1));
}
public void testTruncatedTranslogs() throws Exception {
@ -816,25 +818,6 @@ public class TranslogTests extends ESTestCase {
}
/**
* Randomly overwrite some bytes in the translog files
*/
private void corruptTranslogs(Path directory) throws Exception {
Path[] files = FileSystemUtils.files(directory, "translog-*");
for (Path file : files) {
logger.info("--> corrupting {}...", file);
FileChannel f = FileChannel.open(file, StandardOpenOption.READ, StandardOpenOption.WRITE);
int corruptions = scaledRandomIntBetween(10, 50);
for (int i = 0; i < corruptions; i++) {
// note: with the current logic, this will sometimes be a no-op
long pos = randomIntBetween(0, (int) f.size());
ByteBuffer junk = ByteBuffer.wrap(new byte[]{randomByte()});
f.write(junk, pos);
}
f.close();
}
}
private Term newUid(ParsedDocument doc) {
return new Term("_id", Uid.encodeId(doc.id()));
}
@ -1505,7 +1488,8 @@ public class TranslogTests extends ESTestCase {
ops.add(test);
}
Translog.writeOperations(out, ops);
final List<Translog.Operation> readOperations = Translog.readOperations(out.bytes().streamInput());
final List<Translog.Operation> readOperations = Translog.readOperations(
out.bytes().streamInput(), "testSnapshotFromStreamInput");
assertEquals(ops.size(), readOperations.size());
assertEquals(ops, readOperations);
}

View File

@ -153,9 +153,9 @@ public class TruncateTranslogIT extends ESIntegTestCase {
// shut down the replica node to be tested later
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(replicaNode));
// Corrupt the translog file(s)
// Corrupt the translog file
logger.info("--> corrupting translog");
corruptRandomTranslogFiles("test");
corruptRandomTranslogFile("test");
// Restart the single node
logger.info("--> restarting node");
@ -267,15 +267,16 @@ public class TruncateTranslogIT extends ESIntegTestCase {
// sample the replica node translog dirs
final ShardId shardId = new ShardId(resolveIndex("test"), 0);
Set<Path> translogDirs = getTranslogDirs(replicaNode, shardId);
Path tdir = randomFrom(translogDirs);
// stop the cluster nodes. we don't use full restart so the node start up order will be the same
// and shard roles will be maintained
internalCluster().stopRandomDataNode();
internalCluster().stopRandomDataNode();
// Corrupt the translog file(s)
// Corrupt the translog file
logger.info("--> corrupting translog");
TestTranslog.corruptTranslogFiles(logger, random(), translogDirs);
TestTranslog.corruptRandomTranslogFile(logger, random(), tdir, TestTranslog.minTranslogGenUsedInRecovery(tdir));
// Restart the single node
logger.info("--> starting node");
@ -358,9 +359,10 @@ public class TruncateTranslogIT extends ESIntegTestCase {
return translogDirs;
}
private void corruptRandomTranslogFiles(String indexName) throws IOException {
private void corruptRandomTranslogFile(String indexName) throws IOException {
Set<Path> translogDirs = getTranslogDirs(indexName);
TestTranslog.corruptTranslogFiles(logger, random(), translogDirs);
Path translogDir = randomFrom(translogDirs);
TestTranslog.corruptRandomTranslogFile(logger, random(), translogDir, TestTranslog.minTranslogGenUsedInRecovery(translogDir));
}
/** Disables translog flushing for the specified index */