HDFS-5099. Namenode#copyEditLogSegmentsToSharedDir should close EditLogInputStreams upon finishing. Contributed by Chuan Liu.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1514481 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Chris Nauroth 2013-08-15 20:43:46 +00:00
parent a37d2fc89d
commit 2fc7e14e39
2 changed files with 39 additions and 28 deletions

View File

@ -339,6 +339,9 @@ Release 2.1.1-beta - UNRELEASED
HDFS-5080. BootstrapStandby not working with QJM when the existing NN is
active. (jing9)
HDFS-5099. Namenode#copyEditLogSegmentsToSharedDir should close
EditLogInputStreams upon finishing. (Chuan Liu via cnauroth)
Release 2.1.0-beta - 2013-08-22
INCOMPATIBLE CHANGES

View File

@ -956,41 +956,49 @@ public class NameNode implements NameNodeStatusMXBean {
FSEditLog sourceEditLog = fsns.getFSImage().editLog;
long fromTxId = fsns.getFSImage().getMostRecentCheckpointTxId();
Collection<EditLogInputStream> streams = sourceEditLog.selectInputStreams(
fromTxId+1, 0);
// Set the nextTxid to the CheckpointTxId+1
newSharedEditLog.setNextTxId(fromTxId + 1);
// Copy all edits after last CheckpointTxId to shared edits dir
for (EditLogInputStream stream : streams) {
LOG.debug("Beginning to copy stream " + stream + " to shared edits");
FSEditLogOp op;
boolean segmentOpen = false;
while ((op = stream.readOp()) != null) {
if (LOG.isTraceEnabled()) {
LOG.trace("copying op: " + op);
}
if (!segmentOpen) {
newSharedEditLog.startLogSegment(op.txid, false);
segmentOpen = true;
}
newSharedEditLog.logEdit(op);
Collection<EditLogInputStream> streams = null;
try {
streams = sourceEditLog.selectInputStreams(fromTxId + 1, 0);
if (op.opCode == FSEditLogOpCodes.OP_END_LOG_SEGMENT) {
// Set the nextTxid to the CheckpointTxId+1
newSharedEditLog.setNextTxId(fromTxId + 1);
// Copy all edits after last CheckpointTxId to shared edits dir
for (EditLogInputStream stream : streams) {
LOG.debug("Beginning to copy stream " + stream + " to shared edits");
FSEditLogOp op;
boolean segmentOpen = false;
while ((op = stream.readOp()) != null) {
if (LOG.isTraceEnabled()) {
LOG.trace("copying op: " + op);
}
if (!segmentOpen) {
newSharedEditLog.startLogSegment(op.txid, false);
segmentOpen = true;
}
newSharedEditLog.logEdit(op);
if (op.opCode == FSEditLogOpCodes.OP_END_LOG_SEGMENT) {
newSharedEditLog.logSync();
newSharedEditLog.endCurrentLogSegment(false);
LOG.debug("ending log segment because of END_LOG_SEGMENT op in "
+ stream);
segmentOpen = false;
}
}
if (segmentOpen) {
LOG.debug("ending log segment because of end of stream in " + stream);
newSharedEditLog.logSync();
newSharedEditLog.endCurrentLogSegment(false);
LOG.debug("ending log segment because of END_LOG_SEGMENT op in " + stream);
segmentOpen = false;
}
}
if (segmentOpen) {
LOG.debug("ending log segment because of end of stream in " + stream);
newSharedEditLog.logSync();
newSharedEditLog.endCurrentLogSegment(false);
segmentOpen = false;
} finally {
if (streams != null) {
FSEditLog.closeAllStreams(streams);
}
}
}