HDFS-8964. When validating the edit log, do not read at or beyond the file offset that is being written (Zhe Zhang via Colin P. McCabe)

(cherry picked from commit 53c38cc89a)
This commit is contained in:
Colin Patrick Mccabe 2015-09-03 11:22:47 -07:00
parent e1842ce678
commit 9310745d22
11 changed files with 199 additions and 27 deletions

View File

@ -945,6 +945,9 @@ Release 2.8.0 - UNRELEASED
HDFS-9009. Send metrics logs to NullAppender by default. (Arpit Agarwal) HDFS-9009. Send metrics logs to NullAppender by default. (Arpit Agarwal)
HDFS-8964. When validating the edit log, do not read at or beyond the file
offset that is being written (Zhe Zhang via Colin P. McCabe)
Release 2.7.2 - UNRELEASED Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -33,7 +33,8 @@ public class FSEditLogTestUtil {
public static long countTransactionsInStream(EditLogInputStream in) public static long countTransactionsInStream(EditLogInputStream in)
throws IOException { throws IOException {
FSEditLogLoader.EditLogValidation validation = FSEditLogLoader.validateEditLog(in); FSEditLogLoader.EditLogValidation validation =
FSEditLogLoader.validateEditLog(in, Long.MAX_VALUE);
return (validation.getEndTxId() - in.getFirstTxId()) + 1; return (validation.getEndTxId() - in.getFirstTxId()) + 1;
} }
} }

View File

@ -151,7 +151,7 @@ public class Journal implements Closeable {
EditLogFile latest = scanStorageForLatestEdits(); EditLogFile latest = scanStorageForLatestEdits();
if (latest != null) { if (latest != null) {
highestWrittenTxId = latest.getLastTxId(); updateHighestWrittenTxId(latest.getLastTxId());
} }
} }
@ -266,7 +266,17 @@ public class Journal implements Closeable {
synchronized long getHighestWrittenTxId() { synchronized long getHighestWrittenTxId() {
return highestWrittenTxId; return highestWrittenTxId;
} }
/**
 * Record a new highest transaction ID written to this journal and mirror
 * the same value into the underlying file journal manager via
 * {@code fjm.setLastReadableTxId(val)}, so both are updated together.
 *
 * The value is stored as given; this method does not compare it with the
 * previous value, so callers that need a monotonic maximum must compute it
 * before calling.
 *
 * @param val The new value
 */
private void updateHighestWrittenTxId(long val) {
highestWrittenTxId = val;
fjm.setLastReadableTxId(val);
}
@VisibleForTesting @VisibleForTesting
JournalMetrics getMetricsForTests() { JournalMetrics getMetricsForTests() {
return metrics; return metrics;
@ -399,7 +409,7 @@ public class Journal implements Closeable {
metrics.bytesWritten.incr(records.length); metrics.bytesWritten.incr(records.length);
metrics.txnsWritten.incr(numTxns); metrics.txnsWritten.incr(numTxns);
highestWrittenTxId = lastTxnId; updateHighestWrittenTxId(lastTxnId);
nextTxId = lastTxnId + 1; nextTxId = lastTxnId + 1;
} }
@ -782,8 +792,8 @@ public class Journal implements Closeable {
": no current segment in place"); ": no current segment in place");
// Update the highest txid for lag metrics // Update the highest txid for lag metrics
highestWrittenTxId = Math.max(segment.getEndTxId(), updateHighestWrittenTxId(Math.max(segment.getEndTxId(),
highestWrittenTxId); highestWrittenTxId));
} else { } else {
LOG.info("Synchronizing log " + TextFormat.shortDebugString(segment) + LOG.info("Synchronizing log " + TextFormat.shortDebugString(segment) +
": old segment " + TextFormat.shortDebugString(currentSegment) + ": old segment " + TextFormat.shortDebugString(currentSegment) +
@ -812,7 +822,7 @@ public class Journal implements Closeable {
// If we're shortening the log, update our highest txid // If we're shortening the log, update our highest txid
// used for lag metrics. // used for lag metrics.
if (txnRange(currentSegment).containsLong(highestWrittenTxId)) { if (txnRange(currentSegment).containsLong(highestWrittenTxId)) {
highestWrittenTxId = segment.getEndTxId(); updateHighestWrittenTxId(segment.getEndTxId());
} }
} }
syncedFile = syncLog(reqInfo, segment, fromUrl); syncedFile = syncLog(reqInfo, segment, fromUrl);

View File

@ -301,8 +301,17 @@ public class EditLogFileInputStream extends EditLogInputStream {
return getName(); return getName();
} }
static FSEditLogLoader.EditLogValidation validateEditLog(File file) /**
throws IOException { * @param file File being validated.
* @param maxTxIdToValidate Maximum Tx ID to try to validate. Validation
* returns after reading this or a higher ID.
* The file portion beyond this ID is potentially
* being updated.
* @return Result of the validation
* @throws IOException
*/
static FSEditLogLoader.EditLogValidation validateEditLog(File file,
long maxTxIdToValidate) throws IOException {
EditLogFileInputStream in; EditLogFileInputStream in;
try { try {
in = new EditLogFileInputStream(file); in = new EditLogFileInputStream(file);
@ -315,7 +324,7 @@ public class EditLogFileInputStream extends EditLogInputStream {
} }
try { try {
return FSEditLogLoader.validateEditLog(in); return FSEditLogLoader.validateEditLog(in, maxTxIdToValidate);
} finally { } finally {
IOUtils.closeStream(in); IOUtils.closeStream(in);
} }

View File

@ -675,6 +675,16 @@ public class FSEditLog implements LogsPurgeable {
synchronized (this) { synchronized (this) {
if (sync) { if (sync) {
synctxid = syncStart; synctxid = syncStart;
for (JournalManager jm : journalSet.getJournalManagers()) {
/*
 * FileJournalManager#lastReadableTxId is only meaningful for
 * file-based journals, so the setter is not part of the
 * JournalManager interface and is applied only to
 * FileJournalManager instances.
 */
if (jm instanceof FileJournalManager) {
((FileJournalManager)jm).setLastReadableTxId(syncStart);
}
}
isSyncRunning = false; isSyncRunning = false;
} }
this.notifyAll(); this.notifyAll();

View File

@ -1115,8 +1115,14 @@ public class FSEditLogLoader {
* If there are invalid or corrupt transactions in the middle of the stream, * If there are invalid or corrupt transactions in the middle of the stream,
* validateEditLog will skip over them. * validateEditLog will skip over them.
* This reads through the stream but does not close it. * This reads through the stream but does not close it.
*
* @param maxTxIdToValidate Maximum Tx ID to try to validate. Validation
* returns after reading this or a higher ID.
* The file portion beyond this ID is potentially
* being updated.
*/ */
static EditLogValidation validateEditLog(EditLogInputStream in) { static EditLogValidation validateEditLog(EditLogInputStream in,
long maxTxIdToValidate) {
long lastPos = 0; long lastPos = 0;
long lastTxId = HdfsServerConstants.INVALID_TXID; long lastTxId = HdfsServerConstants.INVALID_TXID;
long numValid = 0; long numValid = 0;
@ -1139,6 +1145,10 @@ public class FSEditLogLoader {
|| op.getTransactionId() > lastTxId) { || op.getTransactionId() > lastTxId) {
lastTxId = op.getTransactionId(); lastTxId = op.getTransactionId();
} }
if (lastTxId >= maxTxIdToValidate) {
break;
}
numValid++; numValid++;
} }
return new EditLogValidation(lastPos, lastTxId, false); return new EditLogValidation(lastPos, lastTxId, false);

View File

@ -77,6 +77,15 @@ public class FileJournalManager implements JournalManager {
private File currentInProgress = null; private File currentInProgress = null;
/**
 * The largest transaction ID that has been safely written to this
 * manager's edit log files.
 * Readers should not read beyond this ID, to avoid racing with writers
 * that may still be updating the rest of the file.
 * The initial value, {@code Long.MAX_VALUE}, indicates that all
 * transactions can be read.
 */
private long lastReadableTxId = Long.MAX_VALUE;
@VisibleForTesting @VisibleForTesting
StoragePurger purger StoragePurger purger
= new NNStorageRetentionManager.DeletionStoragePurger(); = new NNStorageRetentionManager.DeletionStoragePurger();
@ -160,6 +169,15 @@ public class FileJournalManager implements JournalManager {
this.outputBufferCapacity = size; this.outputBufferCapacity = size;
} }
/**
 * Returns the current upper bound on readable transaction IDs. Within this
 * class the value is passed as the {@code maxTxIdToValidate} argument when
 * validating edit log files.
 *
 * @return the last readable transaction ID; {@code Long.MAX_VALUE} until a
 *         caller sets a lower bound
 */
public long getLastReadableTxId() {
return lastReadableTxId;
}
/**
 * Sets the upper bound on readable transaction IDs. The value is stored as
 * given, with no comparison against the previous bound.
 * NOTE(review): this is a plain field write with no synchronization visible
 * here; confirm that callers provide whatever locking is required.
 *
 * @param id the highest transaction ID that readers may read up to
 */
public void setLastReadableTxId(long id) {
this.lastReadableTxId = id;
}
@Override @Override
public void purgeLogsOlderThan(long minTxIdToKeep) public void purgeLogsOlderThan(long minTxIdToKeep)
throws IOException { throws IOException {
@ -194,7 +212,7 @@ public class FileJournalManager implements JournalManager {
} }
if (elf.isInProgress()) { if (elf.isInProgress()) {
try { try {
elf.validateLog(); elf.validateLog(getLastReadableTxId());
} catch (IOException e) { } catch (IOException e) {
LOG.error("got IOException while trying to validate header of " + LOG.error("got IOException while trying to validate header of " +
elf + ". Skipping.", e); elf + ". Skipping.", e);
@ -326,11 +344,13 @@ public class FileJournalManager implements JournalManager {
(inProgressOk ? " (inProgress ok) " : " (excluding inProgress) ") + (inProgressOk ? " (inProgress ok) " : " (excluding inProgress) ") +
"from among " + elfs.size() + " candidate file(s)"); "from among " + elfs.size() + " candidate file(s)");
} }
addStreamsToCollectionFromFiles(elfs, streams, fromTxId, inProgressOk); addStreamsToCollectionFromFiles(elfs, streams, fromTxId,
getLastReadableTxId(), inProgressOk);
} }
static void addStreamsToCollectionFromFiles(Collection<EditLogFile> elfs, static void addStreamsToCollectionFromFiles(Collection<EditLogFile> elfs,
Collection<EditLogInputStream> streams, long fromTxId, boolean inProgressOk) { Collection<EditLogInputStream> streams, long fromTxId, long maxTxIdToValidate,
boolean inProgressOk) {
for (EditLogFile elf : elfs) { for (EditLogFile elf : elfs) {
if (elf.isInProgress()) { if (elf.isInProgress()) {
if (!inProgressOk) { if (!inProgressOk) {
@ -341,7 +361,7 @@ public class FileJournalManager implements JournalManager {
continue; continue;
} }
try { try {
elf.validateLog(); elf.validateLog(maxTxIdToValidate);
} catch (IOException e) { } catch (IOException e) {
LOG.error("got IOException while trying to validate header of " + LOG.error("got IOException while trying to validate header of " +
elf + ". Skipping.", e); elf + ". Skipping.", e);
@ -385,7 +405,7 @@ public class FileJournalManager implements JournalManager {
continue; continue;
} }
elf.validateLog(); elf.validateLog(getLastReadableTxId());
if (elf.hasCorruptHeader()) { if (elf.hasCorruptHeader()) {
elf.moveAsideCorruptFile(); elf.moveAsideCorruptFile();
@ -517,9 +537,14 @@ public class FileJournalManager implements JournalManager {
* Find out where the edit log ends. * Find out where the edit log ends.
* This will update the lastTxId of the EditLogFile or * This will update the lastTxId of the EditLogFile or
* mark it as corrupt if it is. * mark it as corrupt if it is.
* @param maxTxIdToValidate Maximum Tx ID to try to validate. Validation
* returns after reading this or a higher ID.
* The file portion beyond this ID is potentially
* being updated.
*/ */
public void validateLog() throws IOException { public void validateLog(long maxTxIdToValidate) throws IOException {
EditLogValidation val = EditLogFileInputStream.validateEditLog(file); EditLogValidation val = EditLogFileInputStream.validateEditLog(file,
maxTxIdToValidate);
this.lastTxId = val.getEndTxId(); this.lastTxId = val.getEndTxId();
this.hasCorruptHeader = val.hasCorruptHeader(); this.hasCorruptHeader = val.hasCorruptHeader();
} }

View File

@ -907,7 +907,7 @@ public class SecondaryNameNode implements Runnable,
throw new RuntimeException(ioe); throw new RuntimeException(ioe);
} }
FileJournalManager.addStreamsToCollectionFromFiles(editFiles, streams, FileJournalManager.addStreamsToCollectionFromFiles(editFiles, streams,
fromTxId, inProgressOk); fromTxId, Long.MAX_VALUE, inProgressOk);
} }
} }

View File

@ -88,7 +88,7 @@ public class TestCheckPointForSecurityTokens {
for (StorageDirectory sd : nn.getFSImage().getStorage().dirIterable(null)) { for (StorageDirectory sd : nn.getFSImage().getStorage().dirIterable(null)) {
EditLogFile log = FSImageTestUtil.findLatestEditsLog(sd); EditLogFile log = FSImageTestUtil.findLatestEditsLog(sd);
assertTrue(log.isInProgress()); assertTrue(log.isInProgress());
log.validateLog(); log.validateLog(Long.MAX_VALUE);
long numTransactions = (log.getLastTxId() - log.getFirstTxId()) + 1; long numTransactions = (log.getLastTxId() - log.getFirstTxId()) + 1;
assertEquals("In-progress log " + log + " should have 5 transactions", assertEquals("In-progress log " + log + " should have 5 transactions",
5, numTransactions);; 5, numTransactions);;
@ -105,7 +105,7 @@ public class TestCheckPointForSecurityTokens {
for (StorageDirectory sd : nn.getFSImage().getStorage().dirIterable(null)) { for (StorageDirectory sd : nn.getFSImage().getStorage().dirIterable(null)) {
EditLogFile log = FSImageTestUtil.findLatestEditsLog(sd); EditLogFile log = FSImageTestUtil.findLatestEditsLog(sd);
assertTrue(log.isInProgress()); assertTrue(log.isInProgress());
log.validateLog(); log.validateLog(Long.MAX_VALUE);
long numTransactions = (log.getLastTxId() - log.getFirstTxId()) + 1; long numTransactions = (log.getLastTxId() - log.getFirstTxId()) + 1;
assertEquals("In-progress log " + log + " should only have START txn", assertEquals("In-progress log " + log + " should only have START txn",
1, numTransactions); 1, numTransactions);

View File

@ -66,6 +66,8 @@ import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@ -83,6 +85,9 @@ import org.apache.hadoop.test.PathUtils;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.LogManager;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.xml.sax.ContentHandler; import org.xml.sax.ContentHandler;
@ -1223,7 +1228,8 @@ public class TestEditLog {
TXNS_PER_ROLL*11); TXNS_PER_ROLL*11);
for (EditLogInputStream edits : editStreams) { for (EditLogInputStream edits : editStreams) {
FSEditLogLoader.EditLogValidation val = FSEditLogLoader.validateEditLog(edits); FSEditLogLoader.EditLogValidation val =
FSEditLogLoader.validateEditLog(edits, Long.MAX_VALUE);
long read = (val.getEndTxId() - edits.getFirstTxId()) + 1; long read = (val.getEndTxId() - edits.getFirstTxId()) + 1;
LOG.info("Loading edits " + edits + " read " + read); LOG.info("Loading edits " + edits + " read " + read);
assertEquals(startTxId, edits.getFirstTxId()); assertEquals(startTxId, edits.getFirstTxId());
@ -1571,4 +1577,99 @@ public class TestEditLog {
} }
} }
} }
class TestAppender extends AppenderSkeleton {
private final List<LoggingEvent> log = new ArrayList<>();
@Override
public boolean requiresLayout() {
return false;
}
@Override
protected void append(final LoggingEvent loggingEvent) {
log.add(loggingEvent);
}
@Override
public void close() {
}
public List<LoggingEvent> getLog() {
return new ArrayList<>(log);
}
}
/**
 * Checks that reading an in-progress edit log that is still being updated
 * does not go past the latest synced operation.
 *
 * The test writes a few operations, locates the end of the last valid
 * operation in the in-progress edits file, overwrites a byte after that
 * position, and then polls the inotify event stream. It fails if any
 * captured log message matches "Caught exception after reading (.*) ops".
 *
 * @throws Exception if the mini cluster or any file operation fails
 */
@Test
public void testReadActivelyUpdatedLog() throws Exception {
  final TestAppender appender = new TestAppender();
  LogManager.getRootLogger().addAppender(appender);
  Configuration conf = new HdfsConfiguration();
  conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
  // Set single handler thread, so all transactions hit same thread-local ops.
  conf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, 1);
  MiniDFSCluster cluster = null;
  try {
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    cluster.waitActive();
    FSImage fsimage = cluster.getNamesystem().getFSImage();
    StorageDirectory sd = fsimage.getStorage().getStorageDir(0);

    final DistributedFileSystem fileSys = cluster.getFileSystem();
    DFSInotifyEventInputStream events = fileSys.getInotifyEventStream();
    fileSys.mkdirs(new Path("/test"));
    fileSys.mkdirs(new Path("/test/dir1"));
    fileSys.delete(new Path("/test/dir1"), true);
    fsimage.getEditLog().logSync();
    fileSys.mkdirs(new Path("/test/dir2"));

    final File inProgressEdit = NNStorage.getInProgressEditsFile(sd, 1);
    assertTrue(inProgressEdit.exists());

    // Scan the file to find the position just after the last valid op.
    // The stream is closed even if reading fails part-way through.
    long pos = 0;
    EditLogFileInputStream elis = new EditLogFileInputStream(inProgressEdit);
    try {
      FSEditLogOp op;
      while (true) {
        op = elis.readOp();
        if (op != null && op.opCode != FSEditLogOpCodes.OP_INVALID) {
          pos = elis.getPosition();
        } else {
          break;
        }
      }
    } finally {
      elis.close();
    }
    assertTrue(pos > 0);

    // Tamper with the region after the last valid op. try-with-resources
    // guarantees the file handle is released even if the assertion fails.
    try (RandomAccessFile rwf = new RandomAccessFile(inProgressEdit, "rw")) {
      rwf.seek(pos);
      // Expected value first, so a failure message reports it correctly.
      // NOTE(review): -1 is presumably the padding/OP_INVALID byte - confirm.
      assertEquals((byte) -1, rwf.readByte());

      rwf.seek(pos + 1);
      rwf.writeByte(2);
    }

    events.poll();
    String pattern = "Caught exception after reading (.*) ops";
    Pattern r = Pattern.compile(pattern);
    final List<LoggingEvent> log = appender.getLog();
    for (LoggingEvent event : log) {
      Matcher m = r.matcher(event.getRenderedMessage());
      if (m.find()) {
        fail("Should not try to read past latest syned edit log op");
      }
    }
  } finally {
    if (cluster != null) {
      cluster.shutdown();
    }
    LogManager.getRootLogger().removeAppender(appender);
  }
}
} }

View File

@ -318,7 +318,8 @@ public class TestFSEditLogLoader {
} finally { } finally {
rwf.close(); rwf.close();
} }
EditLogValidation validation = EditLogFileInputStream.validateEditLog(logFile); EditLogValidation validation =
EditLogFileInputStream.validateEditLog(logFile, Long.MAX_VALUE);
assertTrue(validation.hasCorruptHeader()); assertTrue(validation.hasCorruptHeader());
} }
@ -333,7 +334,7 @@ public class TestFSEditLogLoader {
File logFileBak = new File(testDir, logFile.getName() + ".bak"); File logFileBak = new File(testDir, logFile.getName() + ".bak");
Files.copy(logFile, logFileBak); Files.copy(logFile, logFileBak);
EditLogValidation validation = EditLogValidation validation =
EditLogFileInputStream.validateEditLog(logFile); EditLogFileInputStream.validateEditLog(logFile, Long.MAX_VALUE);
assertTrue(!validation.hasCorruptHeader()); assertTrue(!validation.hasCorruptHeader());
// We expect that there will be an OP_START_LOG_SEGMENT, followed by // We expect that there will be an OP_START_LOG_SEGMENT, followed by
// NUM_TXNS opcodes, followed by an OP_END_LOG_SEGMENT. // NUM_TXNS opcodes, followed by an OP_END_LOG_SEGMENT.
@ -346,7 +347,8 @@ public class TestFSEditLogLoader {
// Restore backup, corrupt the txn opcode // Restore backup, corrupt the txn opcode
Files.copy(logFileBak, logFile); Files.copy(logFileBak, logFile);
corruptByteInFile(logFile, txOffset); corruptByteInFile(logFile, txOffset);
validation = EditLogFileInputStream.validateEditLog(logFile); validation = EditLogFileInputStream.validateEditLog(logFile,
Long.MAX_VALUE);
long expectedEndTxId = (txId == (NUM_TXNS + 1)) ? long expectedEndTxId = (txId == (NUM_TXNS + 1)) ?
NUM_TXNS : (NUM_TXNS + 1); NUM_TXNS : (NUM_TXNS + 1);
assertEquals("Failed when corrupting txn opcode at " + txOffset, assertEquals("Failed when corrupting txn opcode at " + txOffset,
@ -363,7 +365,8 @@ public class TestFSEditLogLoader {
// Restore backup, corrupt the txn opcode // Restore backup, corrupt the txn opcode
Files.copy(logFileBak, logFile); Files.copy(logFileBak, logFile);
truncateFile(logFile, txOffset); truncateFile(logFile, txOffset);
validation = EditLogFileInputStream.validateEditLog(logFile); validation = EditLogFileInputStream.validateEditLog(logFile,
Long.MAX_VALUE);
long expectedEndTxId = (txId == 0) ? long expectedEndTxId = (txId == 0) ?
HdfsServerConstants.INVALID_TXID : (txId - 1); HdfsServerConstants.INVALID_TXID : (txId - 1);
assertEquals("Failed when corrupting txid " + txId + " txn opcode " + assertEquals("Failed when corrupting txid " + txId + " txn opcode " +
@ -381,7 +384,7 @@ public class TestFSEditLogLoader {
// layout flags section. // layout flags section.
truncateFile(logFile, 8); truncateFile(logFile, 8);
EditLogValidation validation = EditLogValidation validation =
EditLogFileInputStream.validateEditLog(logFile); EditLogFileInputStream.validateEditLog(logFile, Long.MAX_VALUE);
assertTrue(!validation.hasCorruptHeader()); assertTrue(!validation.hasCorruptHeader());
assertEquals(HdfsServerConstants.INVALID_TXID, validation.getEndTxId()); assertEquals(HdfsServerConstants.INVALID_TXID, validation.getEndTxId());
} }