HDFS-11306. Print remaining edit logs from buffer if edit log can't be rolled. Contributed by Wei-Chiu Chuang.

(cherry picked from commit 1cde954a4f)
(cherry picked from commit 914eeb997b)
This commit is contained in:
Wei-Chiu Chuang 2017-01-13 11:46:01 -08:00
parent 22f8b66137
commit 2463923492
2 changed files with 88 additions and 0 deletions

View File

@ -17,9 +17,15 @@
*/ */
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.namenode;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.Arrays;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.Writer; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.Writer;
@ -37,6 +43,7 @@ import com.google.common.base.Preconditions;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class EditsDoubleBuffer { public class EditsDoubleBuffer {
protected static final Log LOG = LogFactory.getLog(EditsDoubleBuffer.class);
private TxnBuffer bufCurrent; // current buffer for writing private TxnBuffer bufCurrent; // current buffer for writing
private TxnBuffer bufReady; // buffer ready for flushing private TxnBuffer bufReady; // buffer ready for flushing
@ -63,6 +70,7 @@ public class EditsDoubleBuffer {
int bufSize = bufCurrent.size(); int bufSize = bufCurrent.size();
if (bufSize != 0) { if (bufSize != 0) {
bufCurrent.dumpRemainingEditLogs();
throw new IOException("FSEditStream has " + bufSize throw new IOException("FSEditStream has " + bufSize
+ " bytes still to be flushed and cannot be closed."); + " bytes still to be flushed and cannot be closed.");
} }
@ -157,6 +165,32 @@ public class EditsDoubleBuffer {
numTxns = 0; numTxns = 0;
return this; return this;
} }
private void dumpRemainingEditLogs() {
byte[] buf = this.getData();
byte[] remainingRawEdits = Arrays.copyOfRange(buf, 0, this.size());
ByteArrayInputStream bis = new ByteArrayInputStream(remainingRawEdits);
DataInputStream dis = new DataInputStream(bis);
FSEditLogLoader.PositionTrackingInputStream tracker =
new FSEditLogLoader.PositionTrackingInputStream(bis);
FSEditLogOp.Reader reader = FSEditLogOp.Reader.create(dis, tracker,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
FSEditLogOp op;
LOG.warn("The edits buffer is " + size() + " bytes long with " + numTxns +
" unflushed transactions. " +
"Below is the list of unflushed transactions:");
int numTransactions = 0;
try {
while ((op = reader.readOp(false)) != null) {
LOG.warn("Unflushed op [" + numTransactions + "]: " + op);
numTransactions++;
}
} catch (IOException ioe) {
// If any exceptions, print raw bytes and stop.
LOG.warn("Unable to dump remaining ops. Remaining raw bytes: " +
Hex.encodeHexString(remainingRawEdits), ioe);
}
}
} }
} }

View File

@ -25,6 +25,8 @@ import static org.junit.Assert.fail;
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
public class TestEditsDoubleBuffer { public class TestEditsDoubleBuffer {
@ -81,4 +83,56 @@ public class TestEditsDoubleBuffer {
} }
} }
} }
@Test
public void testDumpEdits() throws IOException {
final int defaultBufferSize = 256;
EditsDoubleBuffer buffer = new EditsDoubleBuffer(defaultBufferSize);
FSEditLogOp.OpInstanceCache cache = new FSEditLogOp.OpInstanceCache();
String src = "/testdumpedits";
short replication = 1;
FSEditLogOp.SetReplicationOp op =
FSEditLogOp.SetReplicationOp.getInstance(cache.get())
.setPath(src)
.setReplication(replication);
op.setTransactionId(1);
buffer.writeOp(op);
src = "/testdumpedits2";
FSEditLogOp.DeleteOp op2 =
FSEditLogOp.DeleteOp.getInstance(cache.get())
.setPath(src)
.setTimestamp(0);
op2.setTransactionId(2);
buffer.writeOp(op2);
FSEditLogOp.AllocateBlockIdOp op3 =
FSEditLogOp.AllocateBlockIdOp.getInstance(cache.get())
.setBlockId(0);
op3.setTransactionId(3);
buffer.writeOp(op3);
GenericTestUtils.LogCapturer logs =
GenericTestUtils.LogCapturer.captureLogs(EditsDoubleBuffer.LOG);
try {
buffer.close();
fail();
} catch (IOException ioe) {
GenericTestUtils.assertExceptionContains(
"bytes still to be flushed and cannot be closed.",
ioe);
EditsDoubleBuffer.LOG.info("Exception expected: ", ioe);
}
logs.stopCapturing();
// Make sure ops are dumped into log in human readable format.
Assert.assertTrue("expected " + op.toString() + " in the log",
logs.getOutput().contains(op.toString()));
Assert.assertTrue("expected " + op2.toString() + " in the log",
logs.getOutput().contains(op2.toString()));
Assert.assertTrue("expected " + op3.toString() + " in the log",
logs.getOutput().contains(op3.toString()));
}
} }