HDFS-3510. Editlog pre-allocation is performed prior to writing edits to avoid partial edits case disk out of space. Contributed by Collin McCabe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1355189 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Suresh Srinivas 2012-06-28 23:00:10 +00:00
parent 1d54e2b331
commit 19dd66a3f6
6 changed files with 215 additions and 159 deletions

View File

@ -99,6 +99,9 @@ Trunk (unreleased changes)
HDFS-3571. Allow EditLogFileInputStream to read from a remote URL (todd)
HDFS-3510. Editlog pre-allocation is performed prior to writing edits
to avoid partial edits case disk out of space.(Collin McCabe via suresh)
OPTIMIZATIONS
BUG FIXES

View File

@ -41,13 +41,13 @@
@InterfaceAudience.Private
public class EditLogFileOutputStream extends EditLogOutputStream {
private static Log LOG = LogFactory.getLog(EditLogFileOutputStream.class);
public static final int PREALLOCATION_LENGTH = 1024 * 1024;
public static final int MIN_PREALLOCATION_LENGTH = 1024 * 1024;
private File file;
private FileOutputStream fp; // file stream for storing edit logs
private FileChannel fc; // channel of the file stream for sync
private EditsDoubleBuffer doubleBuf;
static ByteBuffer fill = ByteBuffer.allocateDirect(PREALLOCATION_LENGTH);
static ByteBuffer fill = ByteBuffer.allocateDirect(MIN_PREALLOCATION_LENGTH);
static {
fill.position(0);
@ -132,7 +132,7 @@ public void close() throws IOException {
doubleBuf = null;
}
// remove the last INVALID marker from transaction log.
// remove any preallocated padding bytes from the transaction log.
if (fc != null && fc.isOpen()) {
fc.truncate(fc.position());
fc.close();
@ -166,7 +166,6 @@ public void abort() throws IOException {
*/
@Override
public void setReadyToFlush() throws IOException {
doubleBuf.getCurrentBuf().write(FSEditLogOpCodes.OP_INVALID.getOpCode()); // insert eof marker
doubleBuf.setReadyToFlush();
}
@ -183,10 +182,9 @@ public void flushAndSync() throws IOException {
LOG.info("Nothing to flush");
return;
}
preallocate(); // preallocate file if necessay
doubleBuf.flushTo(fp);
fc.force(false); // metadata updates not needed
fc.position(fc.position() - 1); // skip back the end-of-file marker
preallocate(); // preallocate file if necessary
}
/**
@ -197,20 +195,27 @@ public boolean shouldForceSync() {
return doubleBuf.shouldForceSync();
}
// allocate a big chunk of data
private void preallocate() throws IOException {
long position = fc.position();
if (position + 4096 >= fc.size()) {
if(FSNamesystem.LOG.isDebugEnabled()) {
FSNamesystem.LOG.debug("Preallocating Edit log, current size "
+ fc.size());
}
long size = fc.size();
int bufSize = doubleBuf.getReadyBuf().getLength();
long need = bufSize - (size - position);
if (need <= 0) {
return;
}
long oldSize = size;
long total = 0;
long fillCapacity = fill.capacity();
while (need > 0) {
fill.position(0);
IOUtils.writeFully(fc, fill, position);
if(FSNamesystem.LOG.isDebugEnabled()) {
FSNamesystem.LOG.debug("Edit log size is now " + fc.size() +
" written " + fill.capacity() + " bytes " + " at offset " + position);
}
IOUtils.writeFully(fc, fill, size);
need -= fillCapacity;
size += fillCapacity;
total += fillCapacity;
}
if(FSNamesystem.LOG.isDebugEnabled()) {
FSNamesystem.LOG.debug("Preallocated " + total + " bytes at the end of " +
"the edit log (offset " + oldSize + ")");
}
}

View File

@ -89,6 +89,10 @@ boolean shouldForceSync() {
return bufCurrent.size() >= initBufferSize;
}
DataOutputBuffer getReadyBuf() {
return bufReady;
}
DataOutputBuffer getCurrentBuf() {
return bufCurrent;
}

View File

@ -2278,25 +2278,7 @@ public Reader(DataInputStream in, StreamLimiter limiter, int logVersion) {
public FSEditLogOp readOp(boolean skipBrokenEdits) throws IOException {
while (true) {
try {
limiter.setLimit(MAX_OP_SIZE);
in.mark(MAX_OP_SIZE);
return decodeOp();
} catch (GarbageAfterTerminatorException e) {
in.reset();
if (!skipBrokenEdits) {
throw e;
}
// If we saw a terminator opcode followed by a long region of 0x00 or
// 0xff, we want to skip over that region, because there's nothing
// interesting there.
long numSkip = e.getNumAfterTerminator();
try {
IOUtils.skipFully(in, numSkip);
} catch (Throwable t) {
FSImage.LOG.error("Failed to skip " + numSkip + " bytes of " +
"garbage after an OP_INVALID.", t);
return null;
}
} catch (IOException e) {
in.reset();
if (!skipBrokenEdits) {
@ -2325,7 +2307,6 @@ public FSEditLogOp readOp(boolean skipBrokenEdits) throws IOException {
}
private void verifyTerminator() throws IOException {
long off = 0;
/** The end of the edit log should contain only 0x00 or 0xff bytes.
* If it contains other bytes, the log itself may be corrupt.
* It is important to check this; if we don't, a stray OP_INVALID byte
@ -2333,21 +2314,49 @@ private void verifyTerminator() throws IOException {
* know that we had lost data.
*/
byte[] buf = new byte[4096];
limiter.clearLimit();
int numRead = -1, idx = 0;
while (true) {
int numRead = in.read(buf);
if (numRead == -1) {
return;
}
for (int i = 0; i < numRead; i++, off++) {
if ((buf[i] != (byte)0) && (buf[i] != (byte)-1)) {
throw new GarbageAfterTerminatorException("Read garbage after " +
"the terminator!", off);
try {
numRead = -1;
idx = 0;
numRead = in.read(buf);
if (numRead == -1) {
return;
}
while (idx < numRead) {
if ((buf[idx] != (byte)0) && (buf[idx] != (byte)-1)) {
throw new IOException("Read extra bytes after " +
"the terminator!");
}
idx++;
}
} finally {
// After reading each group of bytes, we reposition the mark one
// byte before the next group. Similarly, if there is an error, we
// want to reposition the mark one byte before the error
if (numRead != -1) {
in.reset();
IOUtils.skipFully(in, idx);
in.mark(buf.length + 1);
IOUtils.skipFully(in, 1);
}
}
}
}
/**
* Read an opcode from the input stream.
*
* @return the opcode, or null on EOF.
*
* If an exception is thrown, the stream's mark will be set to the first
* problematic byte. This usually means the beginning of the opcode.
*/
private FSEditLogOp decodeOp() throws IOException {
limiter.setLimit(MAX_OP_SIZE);
in.mark(MAX_OP_SIZE);
if (checksum != null) {
checksum.reset();
}
@ -2534,35 +2543,4 @@ public static PermissionStatus permissionStatusFromXml(Stanza st)
short mode = Short.valueOf(st.getValue("MODE"));
return new PermissionStatus(username, groupname, new FsPermission(mode));
}
/**
* Exception indicating that we found an OP_INVALID followed by some
* garbage. An OP_INVALID should signify the end of the file... if there
* is additional content after that, then the edit log is corrupt.
*/
static class GarbageAfterTerminatorException extends IOException {
private static final long serialVersionUID = 1L;
private final long numAfterTerminator;
public GarbageAfterTerminatorException(String str,
long numAfterTerminator) {
super(str);
this.numAfterTerminator = numAfterTerminator;
}
/**
* Get the number of bytes after the terminator at which the garbage
* appeared.
*
* So if you had an OP_INVALID followed immediately by another valid opcode,
* this would be 0.
* If you had an OP_INVALID followed by some padding bytes, followed by a
* stray byte at the end, this would be the number of padding bytes.
*
* @return numAfterTerminator
*/
public long getNumAfterTerminator() {
return numAfterTerminator;
}
}
}

View File

@ -20,105 +20,74 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import java.nio.channels.FileChannel;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DU;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
/**
* Test the EditLogFileOutputStream
*/
public class TestEditLogFileOutputStream {
private final static int HEADER_LEN = 17;
private final static File TEST_DIR =
new File(System.getProperty("test.build.data", "/tmp"));
private static final File TEST_EDITS =
new File(System.getProperty("test.build.data","/tmp"),
"editLogStream.dat");
new File(TEST_DIR, "testEditLogFileOutput.log");
final static int MIN_PREALLOCATION_LENGTH =
EditLogFileOutputStream.MIN_PREALLOCATION_LENGTH;
@Before
@After
public void deleteEditsFile() {
TEST_EDITS.delete();
if (TEST_EDITS.exists()) TEST_EDITS.delete();
}
@Test
public void testConstants() {
// Each call to FSEditLogOp#Reader#readOp can read at most MAX_OP_SIZE bytes
// before getting an exception. So we don't want to preallocate a longer
// region than MAX_OP_SIZE, because then we'd get an IOException when reading
// through the padding at the end of the file.
assertTrue(EditLogFileOutputStream.PREALLOCATION_LENGTH <
FSEditLogOp.MAX_OP_SIZE);
}
@Test
public void testPreallocation() throws IOException {
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
.build();
final long START_TXID = 1;
StorageDirectory sd = cluster.getNameNode().getFSImage()
.getStorage().getStorageDir(0);
File editLog = NNStorage.getInProgressEditsFile(sd, START_TXID);
EditLogValidation validation = EditLogFileInputStream.validateEditLog(editLog);
assertEquals("Edit log should contain a header as valid length",
HEADER_LEN, validation.getValidLength());
assertEquals(validation.getEndTxId(), START_TXID);
assertEquals("Edit log should have 1MB pre-allocated, plus 4 bytes " +
"for the version number",
EditLogFileOutputStream.PREALLOCATION_LENGTH + 4, editLog.length());
cluster.getFileSystem().mkdirs(new Path("/tmp"),
new FsPermission((short)777));
long oldLength = validation.getValidLength();
validation = EditLogFileInputStream.validateEditLog(editLog);
assertTrue("Edit log should have more valid data after writing a txn " +
"(was: " + oldLength + " now: " + validation.getValidLength() + ")",
validation.getValidLength() > oldLength);
assertEquals(1, validation.getEndTxId() - START_TXID);
assertEquals("Edit log should be 1MB long, plus 4 bytes for the version number",
EditLogFileOutputStream.PREALLOCATION_LENGTH + 4, editLog.length());
// 256 blocks for the 1MB of preallocation space
assertTrue("Edit log disk space used should be at least 257 blocks",
256 * 4096 <= new DU(editLog, conf).getUsed());
static void flushAndCheckLength(EditLogFileOutputStream elos,
long expectedLength) throws IOException {
elos.setReadyToFlush();
elos.flushAndSync();
assertEquals(expectedLength, elos.getFile().length());
}
/**
* Tests writing to the EditLogFileOutputStream. Due to preallocation, the
* length of the edit log will usually be longer than its valid contents.
*/
@Test
public void testClose() throws IOException {
String errorMessage = "TESTING: fc.truncate() threw IOE";
File testDir = new File(System.getProperty("test.build.data", "/tmp"));
assertTrue("could not create test directory", testDir.exists() || testDir.mkdirs());
File f = new File(testDir, "edits");
assertTrue("could not create test file", f.createNewFile());
EditLogFileOutputStream elos = new EditLogFileOutputStream(f, 0);
FileChannel mockFc = Mockito.spy(elos.getFileChannelForTesting());
Mockito.doThrow(new IOException(errorMessage)).when(mockFc).truncate(Mockito.anyLong());
elos.setFileChannelForTesting(mockFc);
public void testRawWrites() throws IOException {
EditLogFileOutputStream elos = new EditLogFileOutputStream(TEST_EDITS, 0);
try {
elos.close();
fail("elos.close() succeeded, but should have thrown");
} catch (IOException e) {
assertEquals("wrong IOE thrown from elos.close()", e.getMessage(), errorMessage);
byte[] small = new byte[] {1,2,3,4,5,8,7};
elos.create();
// The first (small) write we make extends the file by 1 MB due to
// preallocation.
elos.writeRaw(small, 0, small.length);
flushAndCheckLength(elos, MIN_PREALLOCATION_LENGTH);
// The next small write we make goes into the area that was already
// preallocated.
elos.writeRaw(small, 0, small.length);
flushAndCheckLength(elos, MIN_PREALLOCATION_LENGTH);
// Now we write enough bytes so that we exceed the minimum preallocated
// length.
final int BIG_WRITE_LENGTH = 3 * MIN_PREALLOCATION_LENGTH;
byte[] buf = new byte[4096];
for (int i = 0; i < buf.length; i++) {
buf[i] = 0;
}
int total = BIG_WRITE_LENGTH;
while (total > 0) {
int toWrite = (total > buf.length) ? buf.length : total;
elos.writeRaw(buf, 0, toWrite);
total -= toWrite;
}
flushAndCheckLength(elos, 4 * MIN_PREALLOCATION_LENGTH);
} finally {
if (elos != null) elos.close();
}
assertEquals("fc was not nulled when elos.close() failed", elos.getFileChannelForTesting(), null);
}
/**
@ -164,5 +133,4 @@ public void testEditLogFileOutputStreamAbortAbort() throws IOException {
editLogStream.abort();
editLogStream.abort();
}
}

View File

@ -135,6 +135,9 @@ static void runEditLogTest(EditLogTestSetup elts) throws IOException {
}
}
/**
* A test scenario for the edit log
*/
private interface EditLogTestSetup {
/**
* Set up the edit log.
@ -156,10 +159,50 @@ abstract public void addTransactionsToLog(EditLogOutputStream elos,
abstract public Set<Long> getValidTxIds();
}
private class EltsTestEmptyLog implements EditLogTestSetup {
static void padEditLog(EditLogOutputStream elos, int paddingLength)
throws IOException {
if (paddingLength <= 0) {
return;
}
byte buf[] = new byte[4096];
for (int i = 0; i < buf.length; i++) {
buf[i] = (byte)-1;
}
int pad = paddingLength;
while (pad > 0) {
int toWrite = pad > buf.length ? buf.length : pad;
elos.writeRaw(buf, 0, toWrite);
pad -= toWrite;
}
}
static void addDeleteOpcode(EditLogOutputStream elos,
OpInstanceCache cache) throws IOException {
DeleteOp op = DeleteOp.getInstance(cache);
op.setTransactionId(0x0);
op.setPath("/foo");
op.setTimestamp(0);
elos.write(op);
}
/**
* Test the scenario where we have an empty edit log.
*
* This class is also useful in testing whether we can correctly handle
* various amounts of padding bytes at the end of the log. We should be
* able to handle any amount of padding (including no padding) without
* throwing an exception.
*/
private static class EltsTestEmptyLog implements EditLogTestSetup {
private int paddingLength;
public EltsTestEmptyLog(int paddingLength) {
this.paddingLength = paddingLength;
}
public void addTransactionsToLog(EditLogOutputStream elos,
OpInstanceCache cache) throws IOException {
// do nothing
padEditLog(elos, paddingLength);
}
public long getLastValidTxId() {
@ -174,10 +217,65 @@ public Set<Long> getValidTxIds() {
/** Test an empty edit log */
@Test(timeout=180000)
public void testEmptyLog() throws IOException {
runEditLogTest(new EltsTestEmptyLog());
runEditLogTest(new EltsTestEmptyLog(0));
}
/** Test an empty edit log with padding */
@Test(timeout=180000)
public void testEmptyPaddedLog() throws IOException {
runEditLogTest(new EltsTestEmptyLog(
EditLogFileOutputStream.MIN_PREALLOCATION_LENGTH));
}
private class EltsTestGarbageInEditLog implements EditLogTestSetup {
/** Test an empty edit log with extra-long padding */
@Test(timeout=180000)
public void testEmptyExtraPaddedLog() throws IOException {
runEditLogTest(new EltsTestEmptyLog(
3 * EditLogFileOutputStream.MIN_PREALLOCATION_LENGTH));
}
/**
* Test the scenario where an edit log contains some padding (0xff) bytes
* followed by valid opcode data.
*
* These edit logs are corrupt, but all the opcodes should be recoverable
* with recovery mode.
*/
private static class EltsTestOpcodesAfterPadding implements EditLogTestSetup {
private int paddingLength;
public EltsTestOpcodesAfterPadding(int paddingLength) {
this.paddingLength = paddingLength;
}
public void addTransactionsToLog(EditLogOutputStream elos,
OpInstanceCache cache) throws IOException {
padEditLog(elos, paddingLength);
addDeleteOpcode(elos, cache);
}
public long getLastValidTxId() {
return 0;
}
public Set<Long> getValidTxIds() {
return Sets.newHashSet(0L);
}
}
@Test(timeout=180000)
public void testOpcodesAfterPadding() throws IOException {
runEditLogTest(new EltsTestOpcodesAfterPadding(
EditLogFileOutputStream.MIN_PREALLOCATION_LENGTH));
}
@Test(timeout=180000)
public void testOpcodesAfterExtraPadding() throws IOException {
runEditLogTest(new EltsTestOpcodesAfterPadding(
3 * EditLogFileOutputStream.MIN_PREALLOCATION_LENGTH));
}
private static class EltsTestGarbageInEditLog implements EditLogTestSetup {
final private long BAD_TXID = 4;
final private long MAX_TXID = 10;