diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index d0de5f39d2f..3f361a0e7ab 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -62,6 +62,9 @@ Trunk (unreleased changes) HDFS-3178. Add states and state handler for journal synchronization in JournalService. (szetszwo) + HDFS-3273. Refactor BackupImage and FSEditLog, and rename + JournalListener.rollLogs(..) to startLogSegment(..). (szetszwo) + OPTIMIZATIONS HDFS-2834. Add a ByteBuffer-based read API to DFSInputStream. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalListener.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalListener.java index 5d93a4cbaea..2d5ec9e9860 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalListener.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalListener.java @@ -60,5 +60,5 @@ public void journal(JournalService service, long firstTxnId, int numTxns, * Any IOException thrown from the listener is thrown back in * {@link JournalProtocol#startLogSegment} */ - public void rollLogs(JournalService service, long txid) throws IOException; + public void startLogSegment(JournalService service, long txid) throws IOException; } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java index 4e25eea3135..e8d7073670b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java @@ -256,7 +256,7 @@ public void startLogSegment(JournalInfo journalInfo, long epoch, long txid) } stateHandler.isStartLogSegmentAllowed(); verify(epoch, journalInfo); - listener.rollLogs(this, txid); + listener.startLogSegment(this, txid); stateHandler.startLogSegment(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java index 85f0245928c..a9aa20d4d2b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.Iterator; import java.util.List; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; @@ -183,21 +184,9 @@ synchronized void journal(long firstTxId, int numTxns, byte[] data) throws IOExc } // write to BN's local edit log. - logEditsLocally(firstTxId, numTxns, data); + editLog.journal(firstTxId, numTxns, data); } - /** - * Write the batch of edits to the local copy of the edit logs. - */ - private void logEditsLocally(long firstTxId, int numTxns, byte[] data) { - long expectedTxId = editLog.getLastWrittenTxId() + 1; - Preconditions.checkState(firstTxId == expectedTxId, - "received txid batch starting at %s but expected txn %s", - firstTxId, expectedTxId); - editLog.setNextTxId(firstTxId + numTxns - 1); - editLog.logEdit(data.length, data); - editLog.logSync(); - } /** * Apply the batch of edits to the local namespace. @@ -342,28 +331,9 @@ private synchronized void setState(BNState newState) { * This causes the BN to also start the new edit log in its local * directories. */ - synchronized void namenodeStartedLogSegment(long txid) - throws IOException { - LOG.info("NameNode started a new log segment at txid " + txid); - if (editLog.isSegmentOpen()) { - if (editLog.getLastWrittenTxId() == txid - 1) { - // We are in sync with the NN, so end and finalize the current segment - editLog.endCurrentLogSegment(false); - } else { - // We appear to have missed some transactions -- the NN probably - // lost contact with us temporarily. So, mark the current segment - // as aborted. - LOG.warn("NN started new log segment at txid " + txid + - ", but BN had only written up to txid " + - editLog.getLastWrittenTxId() + - "in the log segment starting at " + - editLog.getCurSegmentTxId() + ". Aborting this " + - "log segment."); - editLog.abortCurrentLogSegment(); - } - } - editLog.setNextTxId(txid); - editLog.startLogSegment(txid, false); + synchronized void namenodeStartedLogSegment(long txid) throws IOException { + editLog.startLogSegment(txid, true); + if (bnState == BNState.DROP_UNTIL_NEXT_ROLL) { setState(BNState.JOURNAL_ONLY); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java index cb826f6e089..fea05e91354 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java @@ -260,7 +260,7 @@ private void verifyJournalRequest(JournalInfo journalInfo) } ///////////////////////////////////////////////////// - // BackupNodeProtocol implementation for backup node. + // JournalProtocol implementation for backup node. ///////////////////////////////////////////////////// @Override public void startLogSegment(JournalInfo journalInfo, long epoch, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java index d96af1ee226..c90998a6924 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java @@ -18,18 +18,20 @@ package org.apache.hadoop.hdfs.server.namenode; import static org.apache.hadoop.hdfs.server.common.Util.now; -import java.net.URI; + import java.io.IOException; +import java.lang.reflect.Constructor; +import java.net.URI; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.lang.reflect.Constructor; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -37,14 +39,34 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; -import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.*; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddOp; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CancelDelegationTokenOp; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CloseOp; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ConcatDeleteOp; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.DeleteOp; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.GetDelegationTokenOp; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.LogSegmentOp; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.OpInstanceCache; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ReassignLeaseOp; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOldOp; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenewDelegationTokenOp; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampOp; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetOwnerOp; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetPermissionsOp; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetQuotaOp; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetReplicationOp; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SymlinkOp; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp; import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream; import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.security.token.delegation.DelegationKey; -import org.apache.hadoop.conf.Configuration; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -269,7 +291,7 @@ synchronized void openForWrite() throws IOException { IOUtils.closeStream(s); } - startLogSegment(segmentTxId, true); + startLogSegmentAndWriteHeaderTxn(segmentTxId); assert state == State.IN_SEGMENT : "Bad state: " + state; } @@ -864,18 +886,48 @@ synchronized long rollEditLog() throws IOException { endCurrentLogSegment(true); long nextTxId = getLastWrittenTxId() + 1; - startLogSegment(nextTxId, true); + startLogSegmentAndWriteHeaderTxn(nextTxId); assert curSegmentTxId == nextTxId; return nextTxId; } + + /** + * Remote namenode just has started a log segment, start log segment locally. + */ + public synchronized void startLogSegment(long txid, + boolean abortCurrentLogSegment) throws IOException { + LOG.info("Namenode started a new log segment at txid " + txid); + if (isSegmentOpen()) { + if (getLastWrittenTxId() == txid - 1) { + //In sync with the NN, so end and finalize the current segment` + endCurrentLogSegment(false); + } else { + //Missed some transactions: probably lost contact with NN temporarily. + final String mess = "Cannot start a new log segment at txid " + txid + + " since only up to txid " + getLastWrittenTxId() + + " have been written in the log segment starting at " + + getCurSegmentTxId() + "."; + if (abortCurrentLogSegment) { + //Mark the current segment as aborted. + LOG.warn(mess); + abortCurrentLogSegment(); + } else { + throw new IOException(mess); + } + } + } + setNextTxId(txid); + startLogSegment(txid); + } /** * Start writing to the log segment with the given txid. * Transitions from BETWEEN_LOG_SEGMENTS state to IN_LOG_SEGMENT state. */ - synchronized void startLogSegment(final long segmentTxId, - boolean writeHeaderTxn) throws IOException { + private void startLogSegment(final long segmentTxId) throws IOException { + assert Thread.holdsLock(this); + LOG.info("Starting log segment at " + segmentTxId); Preconditions.checkArgument(segmentTxId > 0, "Bad txid: %s", segmentTxId); @@ -903,12 +955,15 @@ synchronized void startLogSegment(final long segmentTxId, curSegmentTxId = segmentTxId; state = State.IN_SEGMENT; + } - if (writeHeaderTxn) { - logEdit(LogSegmentOp.getInstance(cache.get(), - FSEditLogOpCodes.OP_START_LOG_SEGMENT)); - logSync(); - } + synchronized void startLogSegmentAndWriteHeaderTxn(final long segmentTxId + ) throws IOException { + startLogSegment(segmentTxId); + + logEdit(LogSegmentOp.getInstance(cache.get(), + FSEditLogOpCodes.OP_START_LOG_SEGMENT)); + logSync(); } /** @@ -1057,6 +1112,17 @@ private synchronized BackupJournalManager findBackupJournal( return null; } + /** Write the batch of edits to edit log. */ + public synchronized void journal(long firstTxId, int numTxns, byte[] data) { + final long expectedTxId = getLastWrittenTxId() + 1; + Preconditions.checkState(firstTxId == expectedTxId, + "received txid batch starting at %s but expected txid %s", + firstTxId, expectedTxId); + setNextTxId(firstTxId + numTxns - 1); + logEdit(data.length, data); + logSync(); + } + /** * Write an operation to the edit log. Do not sync to persistent * store yet. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java index 70d184d9142..0279337b5cc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java @@ -823,7 +823,7 @@ public synchronized void saveNamespace(FSNamesystem source) throws IOException { storage.writeAll(); } finally { if (editLogWasOpen) { - editLog.startLogSegment(imageTxId + 1, true); + editLog.startLogSegmentAndWriteHeaderTxn(imageTxId + 1); // Take this opportunity to note the current transaction. // Even if the namespace save was cancelled, this marker // is only used to determine what transaction ID is required diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java index ab3ce9fee37..b2cb080066b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java @@ -43,7 +43,7 @@ public class TestJournalService { private Configuration conf = new HdfsConfiguration(); /** - * Test calls backs {@link JournalListener#rollLogs(JournalService, long)} and + * Test calls backs {@link JournalListener#startLogSegment(JournalService, long)} and * {@link JournalListener#journal(JournalService, long, int, byte[])} are * called. */ @@ -85,7 +85,7 @@ private JournalService startJournalService(JournalListener listener) */ private void verifyRollLogsCallback(JournalService s, JournalListener l) throws IOException { - Mockito.verify(l, Mockito.times(1)).rollLogs(Mockito.eq(s), Mockito.anyLong()); + Mockito.verify(l, Mockito.times(1)).startLogSegment(Mockito.eq(s), Mockito.anyLong()); } /**