diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java index 76a9eacc551..4e1dab069b0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java @@ -17,9 +17,15 @@ */ package org.apache.hadoop.hdfs.server.namenode; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; import java.io.IOException; import java.io.OutputStream; +import java.util.Arrays; +import org.apache.commons.codec.binary.Hex; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.Writer; @@ -37,6 +43,7 @@ import com.google.common.base.Preconditions; */ @InterfaceAudience.Private public class EditsDoubleBuffer { + protected static final Log LOG = LogFactory.getLog(EditsDoubleBuffer.class); private TxnBuffer bufCurrent; // current buffer for writing private TxnBuffer bufReady; // buffer ready for flushing @@ -63,6 +70,7 @@ public class EditsDoubleBuffer { int bufSize = bufCurrent.size(); if (bufSize != 0) { + bufCurrent.dumpRemainingEditLogs(); throw new IOException("FSEditStream has " + bufSize + " bytes still to be flushed and cannot be closed."); } @@ -157,6 +165,32 @@ public class EditsDoubleBuffer { numTxns = 0; return this; } + + private void dumpRemainingEditLogs() { + byte[] buf = this.getData(); + byte[] remainingRawEdits = Arrays.copyOfRange(buf, 0, this.size()); + ByteArrayInputStream bis = new ByteArrayInputStream(remainingRawEdits); + DataInputStream dis = new DataInputStream(bis); + FSEditLogLoader.PositionTrackingInputStream tracker = + new FSEditLogLoader.PositionTrackingInputStream(bis); + FSEditLogOp.Reader reader = FSEditLogOp.Reader.create(dis, tracker, + NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); + FSEditLogOp op; + LOG.warn("The edits buffer is " + size() + " bytes long with " + numTxns + + " unflushed transactions. " + + "Below is the list of unflushed transactions:"); + int numTransactions = 0; + try { + while ((op = reader.readOp(false)) != null) { + LOG.warn("Unflushed op [" + numTransactions + "]: " + op); + numTransactions++; + } + } catch (IOException ioe) { + // If any exceptions, print raw bytes and stop. + LOG.warn("Unable to dump remaining ops. Remaining raw bytes: " + + Hex.encodeHexString(remainingRawEdits), ioe); + } + } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditsDoubleBuffer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditsDoubleBuffer.java index 9feeada276a..b75309e64cf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditsDoubleBuffer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditsDoubleBuffer.java @@ -25,6 +25,8 @@ import static org.junit.Assert.fail; import java.io.IOException; import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.Assert; import org.junit.Test; public class TestEditsDoubleBuffer { @@ -81,4 +83,56 @@ public class TestEditsDoubleBuffer { } } } + + @Test + public void testDumpEdits() throws IOException { + final int defaultBufferSize = 256; + EditsDoubleBuffer buffer = new EditsDoubleBuffer(defaultBufferSize); + FSEditLogOp.OpInstanceCache cache = new FSEditLogOp.OpInstanceCache(); + + String src = "/testdumpedits"; + short replication = 1; + + FSEditLogOp.SetReplicationOp op = + FSEditLogOp.SetReplicationOp.getInstance(cache.get()) + .setPath(src) + .setReplication(replication); + op.setTransactionId(1); + buffer.writeOp(op); + + src = "/testdumpedits2"; + + FSEditLogOp.DeleteOp op2 = + FSEditLogOp.DeleteOp.getInstance(cache.get()) + .setPath(src) + .setTimestamp(0); + op2.setTransactionId(2); + buffer.writeOp(op2); + + FSEditLogOp.AllocateBlockIdOp op3 = + FSEditLogOp.AllocateBlockIdOp.getInstance(cache.get()) + .setBlockId(0); + op3.setTransactionId(3); + buffer.writeOp(op3); + + GenericTestUtils.LogCapturer logs = + GenericTestUtils.LogCapturer.captureLogs(EditsDoubleBuffer.LOG); + try { + buffer.close(); + fail(); + } catch (IOException ioe) { + GenericTestUtils.assertExceptionContains( + "bytes still to be flushed and cannot be closed.", + ioe); + EditsDoubleBuffer.LOG.info("Exception expected: ", ioe); + } + logs.stopCapturing(); + // Make sure ops are dumped into log in human readable format. + Assert.assertTrue("expected " + op.toString() + " in the log", + logs.getOutput().contains(op.toString())); + Assert.assertTrue("expected " + op2.toString() + " in the log", + logs.getOutput().contains(op2.toString())); + Assert.assertTrue("expected " + op3.toString() + " in the log", + logs.getOutput().contains(op3.toString())); + } }