HDFS-8965. Harden edit log reading code against out of memory errors (cmccabe)

Colin Patrick Mccabe 2015-08-31 17:31:29 -07:00
parent 8fa41d9dd4
commit 24f6a7c956
8 changed files with 341 additions and 105 deletions


@@ -872,6 +872,8 @@ Release 2.8.0 - UNRELEASED
HDFS-8946. Improve choosing datanode storage for block placement. (yliu)
HDFS-8965. Harden edit log reading code against out of memory errors (cmccabe)
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than


@@ -83,7 +83,7 @@ class BookKeeperEditLogInputStream extends EditLogInputStream {
tracker = new FSEditLogLoader.PositionTrackingInputStream(bin);
DataInputStream in = new DataInputStream(tracker);
reader = new FSEditLogOp.Reader(in, tracker, logVersion);
reader = FSEditLogOp.Reader.create(in, tracker, logVersion);
}
@Override


@@ -87,7 +87,7 @@ public class LayoutVersion {
FSIMAGE_COMPRESSION(-25, "Support for fsimage compression"),
FSIMAGE_CHECKSUM(-26, "Support checksum for fsimage"),
REMOVE_REL13_DISK_LAYOUT_SUPPORT(-27, "Remove support for 0.13 disk layout"),
EDITS_CHESKUM(-28, "Support checksum for editlog"),
EDITS_CHECKSUM(-28, "Support checksum for editlog"),
UNUSED(-29, "Skipped version"),
FSIMAGE_NAME_OPTIMIZATION(-30, "Store only last part of path in fsimage"),
RESERVED_REL20_203(-31, -19, "Reserved for release 0.20.203", true,


@@ -119,7 +119,7 @@ class EditLogBackupInputStream extends EditLogInputStream {
this.version = version;
reader = new FSEditLogOp.Reader(in, tracker, version);
reader = FSEditLogOp.Reader.create(in, tracker, version);
}
void clear() throws IOException {


@@ -157,7 +157,7 @@ public class EditLogFileInputStream extends EditLogInputStream {
"flags from log");
}
}
reader = new FSEditLogOp.Reader(dataIn, tracker, logVersion);
reader = FSEditLogOp.Reader.create(dataIn, tracker, logVersion);
reader.setMaxOpSize(maxOpSize);
state = State.OPEN;
} finally {
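
For context, a hedged sketch of where the maxOpSize cap normally comes from. The exact wiring sits outside this hunk, and DFS_NAMENODE_MAX_OP_SIZE_KEY is assumed to be the usual DFSConfigKeys companion of the default referenced in the Reader constructor further below.

    // Sketch only: the limit later enforced via limiter.setLimit(maxOpSize)
    // is typically read from configuration, falling back to the default.
    Configuration conf = new Configuration();
    int maxOpSize = conf.getInt(
        DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_KEY,      // assumed config key
        DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_DEFAULT);
    reader.setMaxOpSize(maxOpSize);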


@@ -4518,42 +4518,46 @@ public abstract class FSEditLogOp {
/**
* Class for reading editlog ops from a stream
*/
public static class Reader {
private final DataInputStream in;
private final StreamLimiter limiter;
private final int logVersion;
private final Checksum checksum;
private final OpInstanceCache cache;
private int maxOpSize;
private final boolean supportEditLogLength;
public abstract static class Reader {
final DataInputStream in;
final StreamLimiter limiter;
final OpInstanceCache cache;
final byte[] temp = new byte[4096];
final int logVersion;
int maxOpSize;
public static Reader create(DataInputStream in, StreamLimiter limiter,
int logVersion) {
if (logVersion < NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION) {
// Use the LengthPrefixedReader on edit logs which are newer than what
// we can parse. (Newer layout versions are represented by smaller
// negative integers, for historical reasons.) Even though we can't
// parse the Ops contained in them, we should still be able to call
// scanOp on them. This is important for the JournalNode during rolling
// upgrade.
return new LengthPrefixedReader(in, limiter, logVersion);
} else if (NameNodeLayoutVersion.supports(
NameNodeLayoutVersion.Feature.EDITLOG_LENGTH, logVersion)) {
return new LengthPrefixedReader(in, limiter, logVersion);
} else if (NameNodeLayoutVersion.supports(
LayoutVersion.Feature.EDITS_CHECKSUM, logVersion)) {
Checksum checksum = DataChecksum.newCrc32();
return new ChecksummedReader(checksum, in, limiter, logVersion);
} else {
return new LegacyReader(in, limiter, logVersion);
}
}
/**
* Construct the reader
* @param in The stream to read from.
* @param limiter The limiter for this stream.
* @param logVersion The version of the data coming from the stream.
*/
public Reader(DataInputStream in, StreamLimiter limiter, int logVersion) {
this.logVersion = logVersion;
if (NameNodeLayoutVersion.supports(
LayoutVersion.Feature.EDITS_CHESKUM, logVersion)) {
this.checksum = DataChecksum.newCrc32();
} else {
this.checksum = null;
}
// It is possible that the logVersion is actually a future layoutversion
// during the rolling upgrade (e.g., the NN gets upgraded first). We
// assume future layout will also support length of editlog op.
this.supportEditLogLength = NameNodeLayoutVersion.supports(
NameNodeLayoutVersion.Feature.EDITLOG_LENGTH, logVersion)
|| logVersion < NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION;
if (this.checksum != null) {
this.in = new DataInputStream(
new CheckedInputStream(in, this.checksum));
} else {
this.in = in;
}
Reader(DataInputStream in, StreamLimiter limiter, int logVersion) {
this.in = in;
this.limiter = limiter;
this.logVersion = logVersion;
this.cache = new OpInstanceCache();
this.maxOpSize = DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_DEFAULT;
}
@@ -4606,26 +4610,25 @@ public abstract class FSEditLogOp {
}
}
private void verifyTerminator() throws IOException {
void verifyTerminator() throws IOException {
/** The end of the edit log should contain only 0x00 or 0xff bytes.
* If it contains other bytes, the log itself may be corrupt.
* It is important to check this; if we don't, a stray OP_INVALID byte
* could make us stop reading the edit log halfway through, and we'd never
* know that we had lost data.
*/
byte[] buf = new byte[4096];
limiter.clearLimit();
int numRead = -1, idx = 0;
while (true) {
try {
numRead = -1;
idx = 0;
numRead = in.read(buf);
numRead = in.read(temp);
if (numRead == -1) {
return;
}
while (idx < numRead) {
if ((buf[idx] != (byte)0) && (buf[idx] != (byte)-1)) {
if ((temp[idx] != (byte)0) && (temp[idx] != (byte)-1)) {
throw new IOException("Read extra bytes after " +
"the terminator!");
}
@@ -4638,7 +4641,7 @@ public abstract class FSEditLogOp {
if (numRead != -1) {
in.reset();
IOUtils.skipFully(in, idx);
in.mark(buf.length + 1);
in.mark(temp.length + 1);
IOUtils.skipFully(in, 1);
}
}
@@ -4653,14 +4656,164 @@ public abstract class FSEditLogOp {
* If an exception is thrown, the stream's mark will be set to the first
* problematic byte. This usually means the beginning of the opcode.
*/
private FSEditLogOp decodeOp() throws IOException {
public abstract FSEditLogOp decodeOp() throws IOException;
/**
* Similar to decodeOp(), but only retrieves the transaction ID of the
* Op rather than deserializing the whole Op. If the edit log format
* supports length prefixing, this can be much faster than full decoding.
*
* @return the txid of the Op that was scanned, or INVALID_TXID on EOF.
*/
public abstract long scanOp() throws IOException;
}
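
For orientation, a minimal usage sketch (not part of the patch) showing how the factory and decodeOp() fit together. The rawStream and logVersion parameters are placeholders for what the EditLogInputStream implementations above already hold; PositionTrackingInputStream doubles as the StreamLimiter, as in BookKeeperEditLogInputStream.

    // Hypothetical helper: replay every op in a stream until a clean end of log.
    static void replayOps(InputStream rawStream, int logVersion) throws IOException {
      FSEditLogLoader.PositionTrackingInputStream tracker =
          new FSEditLogLoader.PositionTrackingInputStream(rawStream);
      DataInputStream in = new DataInputStream(tracker);
      FSEditLogOp.Reader reader = FSEditLogOp.Reader.create(in, tracker, logVersion);
      FSEditLogOp op;
      while ((op = reader.decodeOp()) != null) {
        // handle the op; decodeOp() returns null at EOF or at the terminator
      }
    }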
/**
* Reads edit logs which are prefixed with a length. These edit logs also
* include a checksum and transaction ID.
*/
private static class LengthPrefixedReader extends Reader {
/**
* The minimum length of a length-prefixed Op.
*
* The minimum Op has:
* 1-byte opcode
* 4-byte length
* 8-byte txid
* 0-byte body
* 4-byte checksum
*/
private static final int MIN_OP_LENGTH = 17;
/**
* The op id length.
*
* Not included in the stored length.
*/
private static final int OP_ID_LENGTH = 1;
/**
* The checksum length.
*
* Not included in the stored length.
*/
private static final int CHECKSUM_LENGTH = 4;
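// Resulting on-disk frame handled by this reader, spelled out for clarity:
//
//   [1-byte opcode][4-byte length][8-byte txid][variable body][4-byte checksum]
//
// The stored length covers the length field, txid, and body but not the
// opcode or checksum, hence opLength = storedLength + OP_ID_LENGTH +
// CHECKSUM_LENGTH, and the smallest legal frame (empty body) is
// 4 + 8 + 1 + 4 = 17 bytes, i.e. MIN_OP_LENGTH.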
private final Checksum checksum;
LengthPrefixedReader(DataInputStream in, StreamLimiter limiter,
int logVersion) {
super(in, limiter, logVersion);
this.checksum = DataChecksum.newCrc32();
}
@Override
public FSEditLogOp decodeOp() throws IOException {
long txid = decodeOpFrame();
if (txid == HdfsServerConstants.INVALID_TXID) {
return null;
}
in.reset();
in.mark(maxOpSize);
FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(in.readByte());
FSEditLogOp op = cache.get(opCode);
if (op == null) {
throw new IOException("Read invalid opcode " + opCode);
}
op.setTransactionId(txid);
IOUtils.skipFully(in, 4 + 8); // skip length and txid
op.readFields(in, logVersion);
// skip over the checksum, which we validated above.
IOUtils.skipFully(in, CHECKSUM_LENGTH);
return op;
}
@Override
public long scanOp() throws IOException {
return decodeOpFrame();
}
/**
* Decode the opcode "frame". This includes reading the opcode and
* transaction ID, and validating the checksum and length. It does not
* include reading the opcode-specific fields.
* The input stream will be advanced to the end of the op at the end of this
* function.
*
* @return the transaction ID of the op, or INVALID_TXID if we hit EOF.
*/
private long decodeOpFrame() throws IOException {
limiter.setLimit(maxOpSize);
in.mark(maxOpSize);
byte opCodeByte;
try {
opCodeByte = in.readByte();
} catch (EOFException eof) {
// EOF at an opcode boundary is expected.
return HdfsServerConstants.INVALID_TXID;
}
if (opCodeByte == FSEditLogOpCodes.OP_INVALID.getOpCode()) {
verifyTerminator();
return HdfsServerConstants.INVALID_TXID;
}
// Here, we verify that the Op size makes sense and that the
// data matches its checksum before attempting to construct an Op.
// This is important because otherwise we may encounter an
// OutOfMemoryError which could bring down the NameNode or
// JournalNode when reading garbage data.
int opLength = in.readInt() + OP_ID_LENGTH + CHECKSUM_LENGTH;
if (opLength > maxOpSize) {
throw new IOException("Op " + (int)opCodeByte + " has size " +
opLength + ", but maxOpSize = " + maxOpSize);
} else if (opLength < MIN_OP_LENGTH) {
throw new IOException("Op " + (int)opCodeByte + " has size " +
opLength + ", but the minimum op size is " + MIN_OP_LENGTH);
}
long txid = in.readLong();
// Verify checksum
in.reset();
in.mark(maxOpSize);
checksum.reset();
for (int rem = opLength - CHECKSUM_LENGTH; rem > 0;) {
int toRead = Math.min(temp.length, rem);
IOUtils.readFully(in, temp, 0, toRead);
checksum.update(temp, 0, toRead);
rem -= toRead;
}
int expectedChecksum = in.readInt();
int calculatedChecksum = (int)checksum.getValue();
if (expectedChecksum != calculatedChecksum) {
throw new ChecksumException(
"Transaction is corrupt. Calculated checksum is " +
calculatedChecksum + " but read checksum " +
expectedChecksum, txid);
}
return txid;
}
}
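
A hedged sketch of the fast path this enables: a caller such as the JournalNode can walk a finalized segment with scanOp() to find its last txid without deserializing op bodies, while decodeOpFrame() still validates each op's length and checksum.

    // Hypothetical helper built only on the Reader API shown above.
    static long findLastTxId(FSEditLogOp.Reader reader) throws IOException {
      long lastTxId = HdfsServerConstants.INVALID_TXID;
      long txid;
      while ((txid = reader.scanOp()) != HdfsServerConstants.INVALID_TXID) {
        lastTxId = txid;
      }
      return lastTxId;
    }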
/**
* Read edit logs which have a checksum and a transaction ID, but not a
* length.
*/
private static class ChecksummedReader extends Reader {
private final Checksum checksum;
ChecksummedReader(Checksum checksum, DataInputStream in,
StreamLimiter limiter, int logVersion) {
super(new DataInputStream(
new CheckedInputStream(in, checksum)), limiter, logVersion);
this.checksum = checksum;
}
@Override
public FSEditLogOp decodeOp() throws IOException {
limiter.setLimit(maxOpSize);
in.mark(maxOpSize);
// Reset the checksum. Since we are using a CheckedInputStream, each
// subsequent read from the stream will update the checksum.
checksum.reset();
byte opCodeByte;
try {
opCodeByte = in.readByte();
@@ -4668,88 +4821,89 @@ public abstract class FSEditLogOp {
// EOF at an opcode boundary is expected.
return null;
}
FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(opCodeByte);
if (opCode == OP_INVALID) {
verifyTerminator();
return null;
}
FSEditLogOp op = cache.get(opCode);
if (op == null) {
throw new IOException("Read invalid opcode " + opCode);
}
op.setTransactionId(in.readLong());
op.readFields(in, logVersion);
// Verify checksum
int calculatedChecksum = (int)checksum.getValue();
int expectedChecksum = in.readInt();
if (expectedChecksum != calculatedChecksum) {
throw new ChecksumException(
"Transaction is corrupt. Calculated checksum is " +
calculatedChecksum + " but read checksum " +
expectedChecksum, op.txid);
}
return op;
}
@Override
public long scanOp() throws IOException {
// Edit logs of this age don't have any length prefix, so we just have
// to read the entire Op.
FSEditLogOp op = decodeOp();
return op == null ?
HdfsServerConstants.INVALID_TXID : op.getTransactionId();
}
}
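
The wrapping above leans on java.util.zip.CheckedInputStream updating its Checksum as a side effect of every read, which is why decodeOp() only has to reset the checksum before each op and compare getValue() against the trailing 4-byte checksum. A standalone illustration, with plain CRC32 standing in for DataChecksum.newCrc32():

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;
    import java.util.zip.CRC32;
    import java.util.zip.CheckedInputStream;

    public class RunningChecksumDemo {
      public static void main(String[] args) throws Exception {
        byte[] frame = {0, 0, 0, 42};            // pretend op bytes
        CRC32 crc = new CRC32();
        DataInputStream in = new DataInputStream(
            new CheckedInputStream(new ByteArrayInputStream(frame), crc));
        crc.reset();                             // as ChecksummedReader does per op
        int value = in.readInt();                // reading advances the running CRC
        System.out.println(value + " crc=" + crc.getValue());
      }
    }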
/**
* Read older edit logs which may or may not have transaction IDs and other
* features. This code is used during upgrades and to allow HDFS INotify to
* read older edit log files.
*/
private static class LegacyReader extends Reader {
LegacyReader(DataInputStream in,
StreamLimiter limiter, int logVersion) {
super(in, limiter, logVersion);
}
@Override
public FSEditLogOp decodeOp() throws IOException {
limiter.setLimit(maxOpSize);
in.mark(maxOpSize);
byte opCodeByte;
try {
opCodeByte = in.readByte();
} catch (EOFException eof) {
// EOF at an opcode boundary is expected.
return null;
}
FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(opCodeByte);
if (opCode == OP_INVALID) {
verifyTerminator();
return null;
}
FSEditLogOp op = cache.get(opCode);
if (op == null) {
throw new IOException("Read invalid opcode " + opCode);
}
if (NameNodeLayoutVersion.supports(
LayoutVersion.Feature.STORED_TXIDS, logVersion)) {
op.setTransactionId(in.readLong());
} else {
op.setTransactionId(HdfsServerConstants.INVALID_TXID);
}
op.readFields(in, logVersion);
validateChecksum(in, checksum, op.txid);
return op;
}
/**
* Similar to decodeOp(), but instead of doing the real decoding, we skip
* the content of the op if the length of the editlog is supported.
* @return the last txid of the segment, or INVALID_TXID on exception
*/
@Override
public long scanOp() throws IOException {
if (supportEditLogLength) {
limiter.setLimit(maxOpSize);
in.mark(maxOpSize);
final byte opCodeByte;
try {
opCodeByte = in.readByte(); // op code
} catch (EOFException e) {
return HdfsServerConstants.INVALID_TXID;
}
FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(opCodeByte);
if (opCode == OP_INVALID) {
verifyTerminator();
return HdfsServerConstants.INVALID_TXID;
}
int length = in.readInt(); // read the length of the op
long txid = in.readLong(); // read the txid
// skip the remaining content
IOUtils.skipFully(in, length - 8);
// TODO: do we want to verify checksum for JN? For now we don't.
return txid;
} else {
FSEditLogOp op = decodeOp();
return op == null ? HdfsServerConstants.INVALID_TXID : op.getTransactionId();
}
}
/**
* Validate a transaction's checksum
*/
private void validateChecksum(DataInputStream in,
Checksum checksum,
long txid)
throws IOException {
if (checksum != null) {
int calculatedChecksum = (int)checksum.getValue();
int readChecksum = in.readInt(); // read in checksum
if (readChecksum != calculatedChecksum) {
throw new ChecksumException(
"Transaction is corrupt. Calculated checksum is " +
calculatedChecksum + " but read checksum " + readChecksum, txid);
}
if (!NameNodeLayoutVersion.supports(
LayoutVersion.Feature.STORED_TXIDS, logVersion)) {
throw new IOException("Can't scan a pre-transactional edit log.");
}
FSEditLogOp op = decodeOp();
return op == null ?
HdfsServerConstants.INVALID_TXID : op.getTransactionId();
}
}
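
To make the hardening concrete, an illustrative sketch of the failure mode the new readers defend against (pseudocode of the hazard, not code from this repository):

    // Hazard: trusting an unvalidated length read from a possibly corrupt log.
    int len = in.readInt();          // garbage data can yield a huge value
    byte[] body = new byte[len];     // may throw OutOfMemoryError and take down
                                     // the NameNode or JournalNode
    // LengthPrefixedReader.decodeOpFrame() instead rejects lengths outside
    // [MIN_OP_LENGTH, maxOpSize] and verifies the checksum before any
    // op-specific parsing allocates memory.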


@@ -875,7 +875,7 @@ public class TestEditLog {
tracker = new FSEditLogLoader.PositionTrackingInputStream(in);
in = new DataInputStream(tracker);
reader = new FSEditLogOp.Reader(in, tracker, version);
reader = FSEditLogOp.Reader.create(in, tracker, version);
}
@Override


@@ -25,19 +25,35 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.EnumMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.util.Holder;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.PathUtils;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
public class TestEditLogFileInputStream {
private static final Log LOG =
LogFactory.getLog(TestEditLogFileInputStream.class);
private static final byte[] FAKE_LOG_DATA = TestEditLog.HADOOP20_SOME_EDITS;
private final static File TEST_DIR = PathUtils
.getTestDir(TestEditLogFileInputStream.class);
@Test
public void testReadURL() throws Exception {
HttpURLConnection conn = mock(HttpURLConnection.class);
@@ -63,4 +79,68 @@ public class TestEditLogFileInputStream {
assertEquals(FAKE_LOG_DATA.length, elis.length());
elis.close();
}
/**
* Regression test for HDFS-8965, which checks that
* EditLogFileInputStream#scanOp verifies Op checksums.
*/
@Test(timeout=60000)
public void testScanCorruptEditLog() throws Exception {
Configuration conf = new Configuration();
File editLog = new File(System.getProperty(
"test.build.data", "/tmp"), "testCorruptEditLog");
LOG.debug("Creating test edit log file: " + editLog);
EditLogFileOutputStream elos = new EditLogFileOutputStream(conf,
editLog.getAbsoluteFile(), 8192);
elos.create(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
FSEditLogOp.OpInstanceCache cache = new FSEditLogOp.OpInstanceCache();
FSEditLogOp.MkdirOp mkdirOp = FSEditLogOp.MkdirOp.getInstance(cache);
mkdirOp.reset();
mkdirOp.setRpcCallId(123);
mkdirOp.setTransactionId(1);
mkdirOp.setInodeId(789L);
mkdirOp.setPath("/mydir");
PermissionStatus perms = PermissionStatus.createImmutable(
"myuser", "mygroup", FsPermission.createImmutable((short)0777));
mkdirOp.setPermissionStatus(perms);
elos.write(mkdirOp);
mkdirOp.reset();
mkdirOp.setRpcCallId(456);
mkdirOp.setTransactionId(2);
mkdirOp.setInodeId(123L);
mkdirOp.setPath("/mydir2");
perms = PermissionStatus.createImmutable(
"myuser", "mygroup", FsPermission.createImmutable((short)0666));
mkdirOp.setPermissionStatus(perms);
elos.write(mkdirOp);
elos.setReadyToFlush();
elos.flushAndSync(false);
elos.close();
long fileLen = editLog.length();
LOG.debug("Corrupting last 4 bytes of edit log file " + editLog +
", whose length is " + fileLen);
RandomAccessFile rwf = new RandomAccessFile(editLog, "rw");
rwf.seek(fileLen - 4);
int b = rwf.readInt();
rwf.seek(fileLen - 4);
rwf.writeInt(b + 1);
rwf.close();
EditLogFileInputStream elis = new EditLogFileInputStream(editLog);
Assert.assertEquals(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION,
elis.getVersion(true));
Assert.assertEquals(1, elis.scanNextOp());
LOG.debug("Read transaction 1 from " + editLog);
try {
elis.scanNextOp();
Assert.fail("Expected scanNextOp to fail when op checksum was corrupt.");
} catch (IOException e) {
LOG.debug("Caught expected checksum error when reading corrupt " +
"transaction 2", e);
GenericTestUtils.assertExceptionContains("Transaction is corrupt.", e);
}
elis.close();
}
}