HDFS-3510. Editlog pre-allocation is performed prior to writing edits, to avoid partial edits in case the disk runs out of space. Contributed by Colin Patrick McCabe.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1383098 13f79535-47bb-0310-9956-ffa450edef68
parent 7296dee763
commit ee6f83bc89
@@ -267,6 +267,9 @@ Release 2.0.2-alpha - 2012-09-07
 
     HDFS-3907. Allow multiple users for local block readers. (eli)
 
+    HDFS-3510. Editlog pre-allocation is performed prior to writing edits
+    to avoid partial edits case disk out of space. (Colin McCabe via todd)
+
   OPTIMIZATIONS
 
     HDFS-2982. Startup performance suffers when there are many edit log
@@ -41,13 +41,13 @@ import com.google.common.annotations.VisibleForTesting;
 @InterfaceAudience.Private
 public class EditLogFileOutputStream extends EditLogOutputStream {
   private static Log LOG = LogFactory.getLog(EditLogFileOutputStream.class);
-  public static final int PREALLOCATION_LENGTH = 1024 * 1024;
+  public static final int MIN_PREALLOCATION_LENGTH = 1024 * 1024;
 
   private File file;
   private FileOutputStream fp; // file stream for storing edit logs
   private FileChannel fc; // channel of the file stream for sync
   private EditsDoubleBuffer doubleBuf;
-  static ByteBuffer fill = ByteBuffer.allocateDirect(PREALLOCATION_LENGTH);
+  static ByteBuffer fill = ByteBuffer.allocateDirect(MIN_PREALLOCATION_LENGTH);
 
   private static boolean shouldSkipFsyncForTests = false;
 
@@ -134,7 +134,7 @@ public class EditLogFileOutputStream extends EditLogOutputStream {
       doubleBuf = null;
     }
 
-    // remove the last INVALID marker from transaction log.
+    // remove any preallocated padding bytes from the transaction log.
     if (fc != null && fc.isOpen()) {
       fc.truncate(fc.position());
       fc.close();
@@ -168,7 +168,6 @@ public class EditLogFileOutputStream extends EditLogOutputStream {
    */
   @Override
   public void setReadyToFlush() throws IOException {
-    doubleBuf.getCurrentBuf().write(FSEditLogOpCodes.OP_INVALID.getOpCode()); // insert eof marker
     doubleBuf.setReadyToFlush();
   }
 
@@ -185,12 +184,11 @@ public class EditLogFileOutputStream extends EditLogOutputStream {
       LOG.info("Nothing to flush");
       return;
     }
+    preallocate(); // preallocate file if necessay
     doubleBuf.flushTo(fp);
     if (!shouldSkipFsyncForTests) {
       fc.force(false); // metadata updates not needed
     }
-    fc.position(fc.position() - 1); // skip back the end-of-file marker
-    preallocate(); // preallocate file if necessary
   }
 
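The hunk above moves preallocate() ahead of doubleBuf.flushTo(fp), so that when the volume is nearly full the operation that fails is the padding write, not an edit record. Below is a toy, self-contained simulation of why that ordering matters; FakeDisk and the byte counts are invented for illustration and are not HDFS code.

    import java.io.IOException;

    public class PreallocateFirstSketch {
      /** Invented stand-in for a nearly full volume; not HDFS code. */
      static class FakeDisk {
        private final int capacity;
        private int used;
        FakeDisk(int capacity) { this.capacity = capacity; }
        void write(int bytes, String what) throws IOException {
          if (used + bytes > capacity) {
            throw new IOException("out of space while writing " + what);
          }
          used += bytes;
        }
      }

      public static void main(String[] args) {
        final int edits = 600;       // serialized edit records waiting to flush
        final int padding = 1024;    // preallocation chunk

        // Old order: the failure can land in the middle of the edit records,
        // which on a real disk can leave a torn, half-written transaction.
        try {
          FakeDisk disk = new FakeDisk(512);
          disk.write(edits, "edit records");
          disk.write(padding, "preallocated padding");
        } catch (IOException e) {
          System.out.println("old order: " + e.getMessage());
        }

        // New order: preallocation fails first, before any edit bytes are risked.
        try {
          FakeDisk disk = new FakeDisk(512);
          disk.write(padding, "preallocated padding");
          disk.write(edits, "edit records");
        } catch (IOException e) {
          System.out.println("new order: " + e.getMessage());
        }
      }
    }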
@@ -201,20 +199,27 @@ public class EditLogFileOutputStream extends EditLogOutputStream {
     return doubleBuf.shouldForceSync();
   }
 
-  // allocate a big chunk of data
   private void preallocate() throws IOException {
     long position = fc.position();
-    if (position + 4096 >= fc.size()) {
-      if(FSNamesystem.LOG.isDebugEnabled()) {
-        FSNamesystem.LOG.debug("Preallocating Edit log, current size "
-            + fc.size());
+    long size = fc.size();
+    int bufSize = doubleBuf.getReadyBuf().getLength();
+    long need = bufSize - (size - position);
+    if (need <= 0) {
+      return;
+    }
+    long oldSize = size;
+    long total = 0;
+    long fillCapacity = fill.capacity();
+    while (need > 0) {
       fill.position(0);
-      IOUtils.writeFully(fc, fill, position);
-      if(FSNamesystem.LOG.isDebugEnabled()) {
-        FSNamesystem.LOG.debug("Edit log size is now " + fc.size() +
-            " written " + fill.capacity() + " bytes " + " at offset " + position);
-      }
+      IOUtils.writeFully(fc, fill, size);
+      need -= fillCapacity;
+      size += fillCapacity;
+      total += fillCapacity;
+    }
+    if(FSNamesystem.LOG.isDebugEnabled()) {
+      FSNamesystem.LOG.debug("Preallocated " + total + " bytes at the end of " +
+          "the edit log (offset " + oldSize + ")");
     }
   }
 
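The rewritten preallocate() sizes the padding from the ready buffer (need = bufSize - (size - position)) and extends the file in fill-buffer-sized chunks written at the current end of the file. A standalone sketch of the same technique against a plain FileChannel follows; the class name, demo file name, and the use of absolute-offset FileChannel.write in place of Hadoop's IOUtils.writeFully are assumptions made for illustration.

    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;

    public class PreallocateSketch {
      private static final int CHUNK = 1024 * 1024; // mirrors MIN_PREALLOCATION_LENGTH
      private static final ByteBuffer FILL = ByteBuffer.allocateDirect(CHUNK);

      /** Extend the file so that pendingWriteBytes fit between position and EOF. */
      static void preallocate(FileChannel fc, long pendingWriteBytes)
          throws IOException {
        long position = fc.position();
        long size = fc.size();
        long need = pendingWriteBytes - (size - position);
        while (need > 0) {
          FILL.position(0);
          long off = size;
          // Absolute-offset writes grow the file without disturbing fc.position(),
          // the same effect IOUtils.writeFully(fc, fill, size) has in the patch.
          while (FILL.hasRemaining()) {
            off += fc.write(FILL, off);
          }
          need -= CHUNK;
          size += CHUNK;
        }
      }

      public static void main(String[] args) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile("prealloc-demo.bin", "rw");
             FileChannel fc = raf.getChannel()) {
          preallocate(fc, 10);           // a tiny pending write...
          System.out.println(fc.size()); // ...still reserves one full 1 MB chunk
        }
      }
    }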
@@ -89,6 +89,10 @@ class EditsDoubleBuffer {
     return bufCurrent.size() >= initBufferSize;
   }
 
+  DataOutputBuffer getReadyBuf() {
+    return bufReady;
+  }
+
   DataOutputBuffer getCurrentBuf() {
     return bufCurrent;
   }
@@ -2287,25 +2287,7 @@ public abstract class FSEditLogOp {
     public FSEditLogOp readOp(boolean skipBrokenEdits) throws IOException {
       while (true) {
         try {
-          limiter.setLimit(MAX_OP_SIZE);
-          in.mark(MAX_OP_SIZE);
           return decodeOp();
-        } catch (GarbageAfterTerminatorException e) {
-          in.reset();
-          if (!skipBrokenEdits) {
-            throw e;
-          }
-          // If we saw a terminator opcode followed by a long region of 0x00 or
-          // 0xff, we want to skip over that region, because there's nothing
-          // interesting there.
-          long numSkip = e.getNumAfterTerminator();
-          try {
-            IOUtils.skipFully(in, numSkip);
-          } catch (Throwable t) {
-            FSImage.LOG.error("Failed to skip " + numSkip + " bytes of " +
-              "garbage after an OP_INVALID.", t);
-            return null;
-          }
         } catch (IOException e) {
           in.reset();
           if (!skipBrokenEdits) {
@@ -2334,7 +2316,6 @@ public abstract class FSEditLogOp {
     }
 
     private void verifyTerminator() throws IOException {
-      long off = 0;
      /** The end of the edit log should contain only 0x00 or 0xff bytes.
       * If it contains other bytes, the log itself may be corrupt.
       * It is important to check this; if we don't, a stray OP_INVALID byte
@@ -2342,21 +2323,49 @@ public abstract class FSEditLogOp {
       * know that we had lost data.
       */
       byte[] buf = new byte[4096];
+      limiter.clearLimit();
+      int numRead = -1, idx = 0;
       while (true) {
-        int numRead = in.read(buf);
+        try {
+          numRead = -1;
+          idx = 0;
+          numRead = in.read(buf);
           if (numRead == -1) {
             return;
           }
-        for (int i = 0; i < numRead; i++, off++) {
-          if ((buf[i] != (byte)0) && (buf[i] != (byte)-1)) {
-            throw new GarbageAfterTerminatorException("Read garbage after " +
-              "the terminator!", off);
+          while (idx < numRead) {
+            if ((buf[idx] != (byte)0) && (buf[idx] != (byte)-1)) {
+              throw new IOException("Read extra bytes after " +
+                "the terminator!");
+            }
+            idx++;
+          }
+        } finally {
+          // After reading each group of bytes, we reposition the mark one
+          // byte before the next group. Similarly, if there is an error, we
+          // want to reposition the mark one byte before the error
+          if (numRead != -1) {
+            in.reset();
+            IOUtils.skipFully(in, idx);
+            in.mark(buf.length + 1);
+            IOUtils.skipFully(in, 1);
          }
        }
      }
    }
 
+    /**
+     * Read an opcode from the input stream.
+     *
+     * @return the opcode, or null on EOF.
+     *
+     * If an exception is thrown, the stream's mark will be set to the first
+     * problematic byte. This usually means the beginning of the opcode.
+     */
     private FSEditLogOp decodeOp() throws IOException {
+      limiter.setLimit(MAX_OP_SIZE);
+      in.mark(MAX_OP_SIZE);
+
       if (checksum != null) {
         checksum.reset();
       }
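The new decodeOp() marks the stream before decoding, and verifyTerminator() re-marks as it scans, so that when garbage follows the terminator the stream is left positioned at the offending bytes for recovery mode to inspect. Below is a minimal standalone sketch of that mark()/reset() contract, scanning one byte at a time for clarity (the patch scans 4 KB chunks and re-marks after each chunk); the class and the sample trailer bytes are invented.

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    public class MarkResetSketch {
      /** Accept only 0x00/0xff trailer bytes; on garbage, reset to the bad byte and throw. */
      static void verifyTrailer(InputStream in) throws IOException {
        while (true) {
          in.mark(1);          // remember the spot just before this byte
          int b = in.read();
          if (b == -1) {
            return;            // clean EOF: the trailer was all padding
          }
          if (b != 0x00 && b != 0xff) {
            in.reset();        // reposition at the offending byte
            throw new IOException("Read extra bytes after the terminator!");
          }
        }
      }

      public static void main(String[] args) throws IOException {
        byte[] trailer = { 0x00, (byte) 0xff, 0x00, 0x42, 0x00 };
        InputStream in = new ByteArrayInputStream(trailer);
        try {
          verifyTrailer(in);
        } catch (IOException e) {
          // Recovery code can now re-read starting from the stray byte.
          System.out.println("stopped at 0x" + Integer.toHexString(in.read()));
        }
      }
    }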
@@ -2543,35 +2552,4 @@ public abstract class FSEditLogOp {
     short mode = Short.valueOf(st.getValue("MODE"));
     return new PermissionStatus(username, groupname, new FsPermission(mode));
   }
-
-  /**
-   * Exception indicating that we found an OP_INVALID followed by some
-   * garbage. An OP_INVALID should signify the end of the file... if there
-   * is additional content after that, then the edit log is corrupt.
-   */
-  static class GarbageAfterTerminatorException extends IOException {
-    private static final long serialVersionUID = 1L;
-    private final long numAfterTerminator;
-
-    public GarbageAfterTerminatorException(String str,
-        long numAfterTerminator) {
-      super(str);
-      this.numAfterTerminator = numAfterTerminator;
-    }
-
-    /**
-     * Get the number of bytes after the terminator at which the garbage
-     * appeared.
-     *
-     * So if you had an OP_INVALID followed immediately by another valid opcode,
-     * this would be 0.
-     * If you had an OP_INVALID followed by some padding bytes, followed by a
-     * stray byte at the end, this would be the number of padding bytes.
-     *
-     * @return numAfterTerminator
-     */
-    public long getNumAfterTerminator() {
-      return numAfterTerminator;
-    }
-  }
 }
@@ -20,30 +20,25 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.channels.FileChannel;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.DU;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
 import org.apache.hadoop.util.StringUtils;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.Mockito;
 
+/**
+ * Test the EditLogFileOutputStream
+ */
 public class TestEditLogFileOutputStream {
-  private final static int HEADER_LEN = 17;
+  private final static File TEST_DIR =
+      new File(System.getProperty("test.build.data", "/tmp"));
   private static final File TEST_EDITS =
-      new File(System.getProperty("test.build.data","/tmp"),
-      "editLogStream.dat");
+      new File(TEST_DIR, "testEditLogFileOutput.log");
+  final static int MIN_PREALLOCATION_LENGTH =
+      EditLogFileOutputStream.MIN_PREALLOCATION_LENGTH;
 
   static {
     // No need to fsync for the purposes of tests. This makes
@@ -52,69 +47,54 @@ public class TestEditLogFileOutputStream {
   }
 
   @Before
+  @After
   public void deleteEditsFile() {
-    TEST_EDITS.delete();
+    if (TEST_EDITS.exists()) TEST_EDITS.delete();
   }
 
-  @Test
-  public void testPreallocation() throws IOException {
-    Configuration conf = new HdfsConfiguration();
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
-        .build();
-
-    final long START_TXID = 1;
-    StorageDirectory sd = cluster.getNameNode().getFSImage()
-      .getStorage().getStorageDir(0);
-    File editLog = NNStorage.getInProgressEditsFile(sd, START_TXID);
-
-    EditLogValidation validation = EditLogFileInputStream.validateEditLog(editLog);
-    assertEquals("Edit log should contain a header as valid length",
-        HEADER_LEN, validation.getValidLength());
-    assertEquals(validation.getEndTxId(), START_TXID);
-    assertEquals("Edit log should have 1MB pre-allocated, plus 4 bytes " +
-        "for the version number",
-        EditLogFileOutputStream.PREALLOCATION_LENGTH + 4, editLog.length());
-
-    cluster.getFileSystem().mkdirs(new Path("/tmp"),
-        new FsPermission((short)777));
-
-    long oldLength = validation.getValidLength();
-    validation = EditLogFileInputStream.validateEditLog(editLog);
-    assertTrue("Edit log should have more valid data after writing a txn " +
-        "(was: " + oldLength + " now: " + validation.getValidLength() + ")",
-        validation.getValidLength() > oldLength);
-    assertEquals(1, validation.getEndTxId() - START_TXID);
-
-    assertEquals("Edit log should be 1MB long, plus 4 bytes for the version number",
-        EditLogFileOutputStream.PREALLOCATION_LENGTH + 4, editLog.length());
-    // 256 blocks for the 1MB of preallocation space
-    assertTrue("Edit log disk space used should be at least 257 blocks",
-        256 * 4096 <= new DU(editLog, conf).getUsed());
+  static void flushAndCheckLength(EditLogFileOutputStream elos,
+      long expectedLength) throws IOException {
+    elos.setReadyToFlush();
+    elos.flushAndSync();
+    assertEquals(expectedLength, elos.getFile().length());
   }
 
+  /**
+   * Tests writing to the EditLogFileOutputStream. Due to preallocation, the
+   * length of the edit log will usually be longer than its valid contents.
+   */
   @Test
-  public void testClose() throws IOException {
-    String errorMessage = "TESTING: fc.truncate() threw IOE";
-
-    File testDir = new File(System.getProperty("test.build.data", "/tmp"));
-    assertTrue("could not create test directory", testDir.exists() || testDir.mkdirs());
-    File f = new File(testDir, "edits");
-    assertTrue("could not create test file", f.createNewFile());
-    EditLogFileOutputStream elos = new EditLogFileOutputStream(f, 0);
-
-    FileChannel mockFc = Mockito.spy(elos.getFileChannelForTesting());
-    Mockito.doThrow(new IOException(errorMessage)).when(mockFc).truncate(Mockito.anyLong());
-    elos.setFileChannelForTesting(mockFc);
-
+  public void testRawWrites() throws IOException {
+    EditLogFileOutputStream elos = new EditLogFileOutputStream(TEST_EDITS, 0);
     try {
-      elos.close();
-      fail("elos.close() succeeded, but should have thrown");
-    } catch (IOException e) {
-      assertEquals("wrong IOE thrown from elos.close()", e.getMessage(), errorMessage);
+      byte[] small = new byte[] {1,2,3,4,5,8,7};
+      elos.create();
+      // The first (small) write we make extends the file by 1 MB due to
+      // preallocation.
+      elos.writeRaw(small, 0, small.length);
+      flushAndCheckLength(elos, MIN_PREALLOCATION_LENGTH);
+      // The next small write we make goes into the area that was already
+      // preallocated.
+      elos.writeRaw(small, 0, small.length);
+      flushAndCheckLength(elos, MIN_PREALLOCATION_LENGTH);
+      // Now we write enough bytes so that we exceed the minimum preallocated
+      // length.
+      final int BIG_WRITE_LENGTH = 3 * MIN_PREALLOCATION_LENGTH;
+      byte[] buf = new byte[4096];
+      for (int i = 0; i < buf.length; i++) {
+        buf[i] = 0;
+      }
+      int total = BIG_WRITE_LENGTH;
+      while (total > 0) {
+        int toWrite = (total > buf.length) ? buf.length : total;
+        elos.writeRaw(buf, 0, toWrite);
+        total -= toWrite;
+      }
+      flushAndCheckLength(elos, 4 * MIN_PREALLOCATION_LENGTH);
+    } finally {
+      if (elos != null) elos.close();
     }
-
-    assertEquals("fc was not nulled when elos.close() failed", elos.getFileChannelForTesting(), null);
   }
 
   /**
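The lengths asserted by testRawWrites follow from the rule in the new preallocate(): round the shortfall up to whole MIN_PREALLOCATION_LENGTH chunks. The sketch below replays that arithmetic (ignoring the few header bytes written by create(), which do not change the chunk counts here); the helper class and its names are hypothetical.

    public class ExpectedLengthSketch {
      static final long CHUNK = 1024 * 1024; // MIN_PREALLOCATION_LENGTH

      /** File size after flushing writeLen more bytes at offset validBytes. */
      static long afterWrite(long fileSize, long validBytes, long writeLen) {
        long need = writeLen - (fileSize - validBytes);
        while (need > 0) {        // extend in whole chunks, as preallocate() does
          fileSize += CHUNK;
          need -= CHUNK;
        }
        return fileSize;
      }

      public static void main(String[] args) {
        long size = 0, valid = 0;
        size = afterWrite(size, valid, 7); valid += 7;  // first small write -> 1 chunk
        size = afterWrite(size, valid, 7); valid += 7;  // fits in existing padding
        size = afterWrite(size, valid, 3 * CHUNK);      // big write -> 4 chunks total
        System.out.println(size / CHUNK);               // prints 4
      }
    }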
@@ -160,5 +140,4 @@ public class TestEditLogFileOutputStream {
     editLogStream.abort();
     editLogStream.abort();
   }
-
 }
@@ -136,6 +136,9 @@ public class TestNameNodeRecovery {
     }
   }
 
+  /**
+   * A test scenario for the edit log
+   */
   private interface EditLogTestSetup {
     /**
      * Set up the edit log.
@@ -157,10 +160,50 @@ public class TestNameNodeRecovery {
     abstract public Set<Long> getValidTxIds();
   }
 
-  private class EltsTestEmptyLog implements EditLogTestSetup {
+  static void padEditLog(EditLogOutputStream elos, int paddingLength)
+      throws IOException {
+    if (paddingLength <= 0) {
+      return;
+    }
+    byte buf[] = new byte[4096];
+    for (int i = 0; i < buf.length; i++) {
+      buf[i] = (byte)-1;
+    }
+    int pad = paddingLength;
+    while (pad > 0) {
+      int toWrite = pad > buf.length ? buf.length : pad;
+      elos.writeRaw(buf, 0, toWrite);
+      pad -= toWrite;
+    }
+  }
+
+  static void addDeleteOpcode(EditLogOutputStream elos,
+      OpInstanceCache cache) throws IOException {
+    DeleteOp op = DeleteOp.getInstance(cache);
+    op.setTransactionId(0x0);
+    op.setPath("/foo");
+    op.setTimestamp(0);
+    elos.write(op);
+  }
+
+  /**
+   * Test the scenario where we have an empty edit log.
+   *
+   * This class is also useful in testing whether we can correctly handle
+   * various amounts of padding bytes at the end of the log. We should be
+   * able to handle any amount of padding (including no padding) without
+   * throwing an exception.
+   */
+  private static class EltsTestEmptyLog implements EditLogTestSetup {
+    private int paddingLength;
+
+    public EltsTestEmptyLog(int paddingLength) {
+      this.paddingLength = paddingLength;
+    }
+
     public void addTransactionsToLog(EditLogOutputStream elos,
         OpInstanceCache cache) throws IOException {
-      // do nothing
+      padEditLog(elos, paddingLength);
     }
 
     public long getLastValidTxId() {
@@ -175,10 +218,65 @@ public class TestNameNodeRecovery {
   /** Test an empty edit log */
   @Test(timeout=180000)
   public void testEmptyLog() throws IOException {
-    runEditLogTest(new EltsTestEmptyLog());
+    runEditLogTest(new EltsTestEmptyLog(0));
   }
 
-  private class EltsTestGarbageInEditLog implements EditLogTestSetup {
+  /** Test an empty edit log with padding */
+  @Test(timeout=180000)
+  public void testEmptyPaddedLog() throws IOException {
+    runEditLogTest(new EltsTestEmptyLog(
+        EditLogFileOutputStream.MIN_PREALLOCATION_LENGTH));
+  }
+
+  /** Test an empty edit log with extra-long padding */
+  @Test(timeout=180000)
+  public void testEmptyExtraPaddedLog() throws IOException {
+    runEditLogTest(new EltsTestEmptyLog(
+        3 * EditLogFileOutputStream.MIN_PREALLOCATION_LENGTH));
+  }
+
+  /**
+   * Test the scenario where an edit log contains some padding (0xff) bytes
+   * followed by valid opcode data.
+   *
+   * These edit logs are corrupt, but all the opcodes should be recoverable
+   * with recovery mode.
+   */
+  private static class EltsTestOpcodesAfterPadding implements EditLogTestSetup {
+    private int paddingLength;
+
+    public EltsTestOpcodesAfterPadding(int paddingLength) {
+      this.paddingLength = paddingLength;
+    }
+
+    public void addTransactionsToLog(EditLogOutputStream elos,
+        OpInstanceCache cache) throws IOException {
+      padEditLog(elos, paddingLength);
+      addDeleteOpcode(elos, cache);
+    }
+
+    public long getLastValidTxId() {
+      return 0;
+    }
+
+    public Set<Long> getValidTxIds() {
+      return Sets.newHashSet(0L);
+    }
+  }
+
+  @Test(timeout=180000)
+  public void testOpcodesAfterPadding() throws IOException {
+    runEditLogTest(new EltsTestOpcodesAfterPadding(
+        EditLogFileOutputStream.MIN_PREALLOCATION_LENGTH));
+  }
+
+  @Test(timeout=180000)
+  public void testOpcodesAfterExtraPadding() throws IOException {
+    runEditLogTest(new EltsTestOpcodesAfterPadding(
+        3 * EditLogFileOutputStream.MIN_PREALLOCATION_LENGTH));
+  }
+
+  private static class EltsTestGarbageInEditLog implements EditLogTestSetup {
     final private long BAD_TXID = 4;
     final private long MAX_TXID = 10;
 