diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt index 23b0eb7ea0b..795e65876f2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt @@ -83,3 +83,5 @@ HDFS-2692. Fix bugs related to failover from/into safe mode. (todd) HDFS-2716. Configuration needs to allow different dfs.http.addresses for each HA NN (todd) HDFS-2720. Fix MiniDFSCluster HA support to work properly on Windows. (Uma Maheswara Rao G via todd) + +HDFS-2291. Allow the StandbyNode to make checkpoints in an HA setup. (todd) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index a5debe0a29d..b3fee6fc511 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -331,4 +331,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys { // HA related configuration public static final String DFS_HA_NAMENODES_KEY = "dfs.ha.namenodes"; public static final String DFS_HA_NAMENODE_ID_KEY = "dfs.ha.namenode.id"; + public static final String DFS_HA_STANDBY_CHECKPOINTS_KEY = "dfs.ha.standby.checkpoints"; + public static final boolean DFS_HA_STANDBY_CHECKPOINTS_DEFAULT = true; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointConf.java new file mode 100644 index 00000000000..8b3cf04d741 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointConf.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.namenode; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.*; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; + +import com.google.common.collect.ImmutableList; + +@InterfaceAudience.Private +public class CheckpointConf { + private static final Log LOG = LogFactory.getLog(CheckpointConf.class); + + /** How often to checkpoint regardless of number of txns */ + private final long checkpointPeriod; // in seconds + + /** How often to poll the NN to check checkpointTxnCount */ + private final long checkpointCheckPeriod; // in seconds + + /** checkpoint once every this many transactions, regardless of time */ + private final long checkpointTxnCount; + + + public CheckpointConf(Configuration conf) { + checkpointCheckPeriod = conf.getLong( + DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY, + DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_DEFAULT); + + checkpointPeriod = conf.getLong(DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, + DFS_NAMENODE_CHECKPOINT_PERIOD_DEFAULT); + checkpointTxnCount = conf.getLong(DFS_NAMENODE_CHECKPOINT_TXNS_KEY, + DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT); + warnForDeprecatedConfigs(conf); + } + + private static void warnForDeprecatedConfigs(Configuration conf) { + for (String key : ImmutableList.of( + "fs.checkpoint.size", + "dfs.namenode.checkpoint.size")) { + if (conf.get(key) != null) { + LOG.warn("Configuration key " + key + " is deprecated! Ignoring..." + + " Instead please specify a value for " + + DFS_NAMENODE_CHECKPOINT_TXNS_KEY); + } + } + } + + public long getPeriod() { + return checkpointPeriod; + } + + public long getCheckPeriod() { + return Math.min(checkpointCheckPeriod, checkpointPeriod); + } + + public long getTxnCount() { + return checkpointTxnCount; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java index 4f485916b5d..6ae931fd44f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java @@ -29,7 +29,6 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand; import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand; @@ -58,17 +57,16 @@ class Checkpointer extends Daemon { private BackupNode backupNode; volatile boolean shouldRun; - private long checkpointPeriod; // in seconds - // Transactions count to trigger the checkpoint - private long checkpointTxnCount; private String infoBindAddress; + private CheckpointConf checkpointConf; + private BackupImage getFSImage() { return (BackupImage)backupNode.getFSImage(); } - private NamenodeProtocol getNamenode(){ + private NamenodeProtocol getRemoteNamenodeProxy(){ return backupNode.namenode; } @@ -89,26 +87,24 @@ class Checkpointer extends Daemon { /** * Initialize checkpoint. */ - @SuppressWarnings("deprecation") private void initialize(Configuration conf) throws IOException { // Create connection to the namenode. 
shouldRun = true; // Initialize other scheduling parameters from the configuration - checkpointPeriod = conf.getLong(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, - DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_DEFAULT); - checkpointTxnCount = conf.getLong(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, - DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT); - SecondaryNameNode.warnForDeprecatedConfigs(conf); + checkpointConf = new CheckpointConf(conf); // Pull out exact http address for posting url to avoid ip aliasing issues String fullInfoAddr = conf.get(DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY, DFS_NAMENODE_BACKUP_HTTP_ADDRESS_DEFAULT); infoBindAddress = fullInfoAddr.substring(0, fullInfoAddr.indexOf(":")); - LOG.info("Checkpoint Period : " + checkpointPeriod + " secs " + - "(" + checkpointPeriod/60 + " min)"); - LOG.info("Transactions count is : " + checkpointTxnCount + ", to trigger checkpoint"); + LOG.info("Checkpoint Period : " + + checkpointConf.getPeriod() + " secs " + + "(" + checkpointConf.getPeriod()/60 + " min)"); + LOG.info("Transactions count is : " + + checkpointConf.getTxnCount() + + ", to trigger checkpoint"); } /** @@ -125,8 +121,8 @@ class Checkpointer extends Daemon { public void run() { // Check the size of the edit log once every 5 minutes. long periodMSec = 5 * 60; // 5 minutes - if(checkpointPeriod < periodMSec) { - periodMSec = checkpointPeriod; + if(checkpointConf.getPeriod() < periodMSec) { + periodMSec = checkpointConf.getPeriod(); } periodMSec *= 1000; @@ -142,7 +138,7 @@ class Checkpointer extends Daemon { shouldCheckpoint = true; } else { long txns = countUncheckpointedTxns(); - if(txns >= checkpointTxnCount) + if(txns >= checkpointConf.getTxnCount()) shouldCheckpoint = true; } if(shouldCheckpoint) { @@ -165,7 +161,7 @@ class Checkpointer extends Daemon { } private long countUncheckpointedTxns() throws IOException { - long curTxId = getNamenode().getTransactionID(); + long curTxId = getRemoteNamenodeProxy().getTransactionID(); long uncheckpointedTxns = curTxId - getFSImage().getStorage().getMostRecentCheckpointTxId(); assert uncheckpointedTxns >= 0; @@ -183,7 +179,7 @@ class Checkpointer extends Daemon { bnImage.freezeNamespaceAtNextRoll(); NamenodeCommand cmd = - getNamenode().startCheckpoint(backupNode.getRegistration()); + getRemoteNamenodeProxy().startCheckpoint(backupNode.getRegistration()); CheckpointCommand cpCmd = null; switch(cmd.getAction()) { case NamenodeProtocol.ACT_SHUTDOWN: @@ -207,7 +203,7 @@ class Checkpointer extends Daemon { long lastApplied = bnImage.getLastAppliedTxId(); LOG.debug("Doing checkpoint. 
Last applied: " + lastApplied); RemoteEditLogManifest manifest = - getNamenode().getEditLogManifest(bnImage.getLastAppliedTxId() + 1); + getRemoteNamenodeProxy().getEditLogManifest(bnImage.getLastAppliedTxId() + 1); if (!manifest.getLogs().isEmpty()) { RemoteEditLog firstRemoteLog = manifest.getLogs().get(0); @@ -260,7 +256,7 @@ class Checkpointer extends Daemon { bnStorage, txid); } - getNamenode().endCheckpoint(backupNode.getRegistration(), sig); + getRemoteNamenodeProxy().endCheckpoint(backupNode.getRegistration(), sig); if (backupNode.getRole() == NamenodeRole.BACKUP) { bnImage.convergeJournalSpool(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java index f1f163eed2e..19f9f5117aa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java @@ -285,7 +285,7 @@ public class FSEditLog { /** * @return true if the log is open in read mode. */ - synchronized boolean isOpenForRead() { + public synchronized boolean isOpenForRead() { return state == State.OPEN_FOR_READING; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java index 54c5cf8e109..b92a37eae8c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java @@ -808,7 +808,7 @@ public class FSImage implements Closeable { * Save the contents of the FS image to a new image file in each of the * current storage directories. 
*/ - synchronized void saveNamespace(FSNamesystem source) throws IOException { + public synchronized void saveNamespace(FSNamesystem source) throws IOException { assert editLog != null : "editLog must be initialized"; storage.attemptRestoreRemovedStorage(); @@ -817,7 +817,7 @@ public class FSImage implements Closeable { if (editLogWasOpen) { editLog.endCurrentLogSegment(true); } - long imageTxId = editLog.getLastWrittenTxId(); + long imageTxId = getLastAppliedOrWrittenTxId(); try { saveFSImageInAllDirs(source, imageTxId); storage.writeAll(); @@ -834,7 +834,7 @@ public class FSImage implements Closeable { } - void cancelSaveNamespace(String reason) + public void cancelSaveNamespace(String reason) throws InterruptedException { SaveNamespaceContext ctx = curSaveNamespaceContext; if (ctx != null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index ac20b0ee4d8..b4f522e00b3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -47,6 +47,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DAT import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_UPGRADE_PERMISSION_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_UPGRADE_PERMISSION_KEY; @@ -112,6 +114,7 @@ import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.PermissionStatus; +import org.apache.hadoop.ha.ServiceFailedException; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; @@ -159,6 +162,7 @@ import org.apache.hadoop.hdfs.server.namenode.ha.ActiveState; import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer; import org.apache.hadoop.hdfs.server.namenode.ha.HAContext; import org.apache.hadoop.hdfs.server.namenode.ha.HAState; +import org.apache.hadoop.hdfs.server.namenode.ha.StandbyCheckpointer; import org.apache.hadoop.hdfs.server.namenode.ha.StandbyState; import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; @@ -261,6 +265,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, private UserGroupInformation fsOwner; private String supergroup; private PermissionStatus defaultPermission; + private boolean standbyShouldCheckpoint; // Scan interval is not configurable. 
private static final long DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL = @@ -321,11 +326,18 @@ public class FSNamesystem implements Namesystem, FSClusterStats, */ private EditLogTailer editLogTailer = null; + /** + * Used when this NN is in standby state to perform checkpoints. + */ + private StandbyCheckpointer standbyCheckpointer; + /** * Reference to the NN's HAContext object. This is only set once * {@link #startCommonServices(Configuration, HAContext)} is called. */ private HAContext haContext; + + private final Configuration conf; PendingDataNodeMessages getPendingDataNodeMessages() { return pendingDatanodeMessages; @@ -381,6 +393,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, * @throws IOException on bad configuration */ FSNamesystem(Configuration conf, FSImage fsImage) throws IOException { + this.conf = conf; try { initialize(conf, fsImage); } catch(IOException e) { @@ -568,11 +581,30 @@ public class FSNamesystem implements Namesystem, FSClusterStats, } editLogTailer = new EditLogTailer(this); editLogTailer.start(); + if (standbyShouldCheckpoint) { + standbyCheckpointer = new StandbyCheckpointer(conf, this); + standbyCheckpointer.start(); + } + } + + + /** + * Called while the NN is in Standby state, but just about to be + * asked to enter Active state. This cancels any checkpoints + * currently being taken. + */ + void prepareToStopStandbyServices() throws ServiceFailedException { + if (standbyCheckpointer != null) { + standbyCheckpointer.cancelAndPreventCheckpoints(); + } } /** Stop services required in standby state */ void stopStandbyServices() throws IOException { LOG.info("Stopping services started for standby state"); + if (standbyCheckpointer != null) { + standbyCheckpointer.stop(); + } if (editLogTailer != null) { editLogTailer.stop(); } @@ -728,6 +760,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats, DFS_SUPPORT_APPEND_DEFAULT); this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf); + + this.standbyShouldCheckpoint = conf.getBoolean( + DFS_HA_STANDBY_CHECKPOINTS_KEY, + DFS_HA_STANDBY_CHECKPOINTS_DEFAULT); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java index 8753b270f1d..b9860032e6f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java @@ -124,16 +124,18 @@ public class GetImageServlet extends HttpServlet { final long txid = parsedParams.getTxId(); if (! 
currentlyDownloadingCheckpoints.add(txid)) { - throw new IOException( + response.sendError(HttpServletResponse.SC_CONFLICT, "Another checkpointer is already in the process of uploading a" + " checkpoint made at transaction ID " + txid); + return null; } try { if (nnImage.getStorage().findImageFile(txid) != null) { - throw new IOException( + response.sendError(HttpServletResponse.SC_CONFLICT, "Another checkpointer already uploaded an checkpoint " + "for txid " + txid); + return null; } // issue a HTTP get request to download the new fsimage diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java index e763f6f6828..0dcf1e6f629 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java @@ -463,7 +463,7 @@ public class NNStorage extends Storage implements Closeable { /** * Return the transaction ID of the last checkpoint. */ - long getMostRecentCheckpointTxId() { + public long getMostRecentCheckpointTxId() { return mostRecentCheckpointTxId; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java index fc0c22eeeae..c9af0ba05bf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java @@ -543,6 +543,7 @@ public class NameNode { } else { state = STANDBY_STATE;; } + state.prepareToEnterState(haContext); state.enterState(haContext); } catch (IOException e) { this.stop(); @@ -965,6 +966,11 @@ public class NameNode { namesystem.startStandbyServices(); } + @Override + public void prepareToStopStandbyServices() throws ServiceFailedException { + namesystem.prepareToStopStandbyServices(); + } + @Override public void stopStandbyServices() throws IOException { // TODO(HA): Are we guaranteed to be the only active here? 
@@ -992,6 +998,7 @@ public class NameNode { public boolean allowStaleReads() { return allowStaleStandbyReads; } + } public boolean isStandbyState() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SaveNamespaceCancelledException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SaveNamespaceCancelledException.java index 2731275f261..5b49f0ee47c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SaveNamespaceCancelledException.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SaveNamespaceCancelledException.java @@ -18,8 +18,10 @@ package org.apache.hadoop.hdfs.server.namenode; import java.io.IOException; +import org.apache.hadoop.classification.InterfaceAudience; -class SaveNamespaceCancelledException extends IOException { +@InterfaceAudience.Private +public class SaveNamespaceCancelledException extends IOException { private static final long serialVersionUID = 1L; SaveNamespaceCancelledException(String cancelReason) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java index 9231f11d8b8..da41917ff99 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java @@ -45,6 +45,7 @@ import org.apache.hadoop.fs.FileSystem; import static org.apache.hadoop.hdfs.DFSConfigKeys.*; import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.DFSUtil.ErrorSimulator; import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; @@ -117,16 +118,8 @@ public class SecondaryNameNode implements Runnable { private Collection<URI> checkpointDirs; private Collection<URI> checkpointEditsDirs; - - /** How often to checkpoint regardless of number of txns */ - private long checkpointPeriod; // in seconds - - /** How often to poll the NN to check checkpointTxnCount */ - private long checkpointCheckPeriod; // in seconds - - /** checkpoint once every this many transactions, regardless of time */ - private long checkpointTxnCount; + private CheckpointConf checkpointConf; + private FSNamesystem namesystem; @@ -136,9 +129,9 @@ public class SecondaryNameNode implements Runnable { + "\nName Node Address : " + nameNodeAddr + "\nStart Time : " + new Date(starttime) + "\nLast Checkpoint Time : " + (lastCheckpointTime == 0?
"--": new Date(lastCheckpointTime)) - + "\nCheckpoint Period : " + checkpointPeriod + " seconds" - + "\nCheckpoint Size : " + StringUtils.byteDesc(checkpointTxnCount) - + " (= " + checkpointTxnCount + " bytes)" + + "\nCheckpoint Period : " + checkpointConf.getPeriod() + " seconds" + + "\nCheckpoint Size : " + StringUtils.byteDesc(checkpointConf.getTxnCount()) + + " (= " + checkpointConf.getTxnCount() + " bytes)" + "\nCheckpoint Dirs : " + checkpointDirs + "\nCheckpoint Edits Dirs: " + checkpointEditsDirs; } @@ -243,16 +236,8 @@ public class SecondaryNameNode implements Runnable { namesystem = new FSNamesystem(conf, checkpointImage); // Initialize other scheduling parameters from the configuration - checkpointCheckPeriod = conf.getLong( - DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY, - DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_DEFAULT); - - checkpointPeriod = conf.getLong(DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, - DFS_NAMENODE_CHECKPOINT_PERIOD_DEFAULT); - checkpointTxnCount = conf.getLong(DFS_NAMENODE_CHECKPOINT_TXNS_KEY, - DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT); - warnForDeprecatedConfigs(conf); - + checkpointConf = new CheckpointConf(conf); + // initialize the webserver for uploading files. // Kerberized SSL servers must be run from the host principal... UserGroupInformation httpUGI = @@ -307,21 +292,9 @@ public class SecondaryNameNode implements Runnable { conf.set(DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY, infoBindAddress + ":" +infoPort); LOG.info("Secondary Web-server up at: " + infoBindAddress + ":" +infoPort); LOG.info("Secondary image servlet up at: " + infoBindAddress + ":" + imagePort); - LOG.info("Checkpoint Period :" + checkpointPeriod + " secs " + - "(" + checkpointPeriod/60 + " min)"); - LOG.info("Log Size Trigger :" + checkpointTxnCount + " txns"); - } - - static void warnForDeprecatedConfigs(Configuration conf) { - for (String key : ImmutableList.of( - "fs.checkpoint.size", - "dfs.namenode.checkpoint.size")) { - if (conf.get(key) != null) { - LOG.warn("Configuration key " + key + " is deprecated! Ignoring..." + - " Instead please specify a value for " + - DFS_NAMENODE_CHECKPOINT_TXNS_KEY); - } - } + LOG.info("Checkpoint Period :" + checkpointConf.getPeriod() + " secs " + + "(" + checkpointConf.getPeriod()/60 + " min)"); + LOG.info("Log Size Trigger :" + checkpointConf.getTxnCount() + " txns"); } /** @@ -372,7 +345,7 @@ public class SecondaryNameNode implements Runnable { // Poll the Namenode (once every checkpointCheckPeriod seconds) to find the // number of transactions in the edit log that haven't yet been checkpointed. 
// - long period = Math.min(checkpointCheckPeriod, checkpointPeriod); + long period = checkpointConf.getCheckPeriod(); while (shouldRun) { try { @@ -391,7 +364,7 @@ public class SecondaryNameNode implements Runnable { long now = System.currentTimeMillis(); if (shouldCheckpointBasedOnCount() || - now >= lastCheckpointTime + 1000 * checkpointPeriod) { + now >= lastCheckpointTime + 1000 * checkpointConf.getPeriod()) { doCheckpoint(); lastCheckpointTime = now; } @@ -585,13 +558,13 @@ public class SecondaryNameNode implements Runnable { switch (opts.getCommand()) { case CHECKPOINT: long count = countUncheckpointedTxns(); - if (count > checkpointTxnCount || + if (count > checkpointConf.getTxnCount() || opts.shouldForceCheckpoint()) { doCheckpoint(); } else { System.err.println("EditLog size " + count + " transactions is " + "smaller than configured checkpoint " + - "interval " + checkpointTxnCount + " transactions."); + "interval " + checkpointConf.getTxnCount() + " transactions."); System.err.println("Skipping checkpoint."); } break; @@ -637,7 +610,7 @@ public class SecondaryNameNode implements Runnable { } boolean shouldCheckpointBasedOnCount() throws IOException { - return countUncheckpointedTxns() >= checkpointTxnCount; + return countUncheckpointedTxns() >= checkpointConf.getTxnCount(); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java index cc8dccaf1ae..985d85ba981 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java @@ -24,8 +24,11 @@ import java.security.MessageDigest; import java.util.List; import java.lang.Math; +import javax.servlet.http.HttpServletResponse; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; @@ -41,7 +44,8 @@ import com.google.common.collect.Lists; /** * This class provides fetching a specified file from the NameNode. */ -class TransferFsImage { +@InterfaceAudience.Private +public class TransferFsImage { public final static String CONTENT_LENGTH = "Content-Length"; public final static String MD5_HEADER = "X-MD5-Digest"; @@ -103,7 +107,7 @@ class TransferFsImage { * @param storage the storage directory to transfer the image from * @param txid the transaction ID of the image to be uploaded */ - static void uploadImageFromStorage(String fsName, + public static void uploadImageFromStorage(String fsName, InetSocketAddress imageListenAddress, NNStorage storage, long txid) throws IOException { @@ -111,7 +115,20 @@ class TransferFsImage { txid, imageListenAddress, storage); // this doesn't directly upload an image, but rather asks the NN // to connect back to the 2NN to download the specified image. 
- TransferFsImage.getFileClient(fsName, fileid, null, null, false); + try { + TransferFsImage.getFileClient(fsName, fileid, null, null, false); + } catch (HttpGetFailedException e) { + if (e.getResponseCode() == HttpServletResponse.SC_CONFLICT) { + // this is OK - this means that a previous attempt to upload + // this checkpoint succeeded even though we thought it failed. + LOG.info("Image upload with txid " + txid + + " conflicted with a previous image upload to the " + + "same NameNode. Continuing...", e); + return; + } else { + throw e; + } + } LOG.info("Uploaded image with txid " + txid + " to namenode at " + fsName); } @@ -194,10 +211,11 @@ class TransferFsImage { HttpURLConnection connection = (HttpURLConnection) url.openConnection(); if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) { - throw new IOException( + throw new HttpGetFailedException( "Image transfer servlet at " + url + " failed with status code " + connection.getResponseCode() + - "\nResponse message:\n" + connection.getResponseMessage()); + "\nResponse message:\n" + connection.getResponseMessage(), + connection); } long advertisedSize; @@ -289,5 +307,19 @@ class TransferFsImage { String header = connection.getHeaderField(MD5_HEADER); return (header != null) ? new MD5Hash(header) : null; } + + public static class HttpGetFailedException extends IOException { + private static final long serialVersionUID = 1L; + private final int responseCode; + + HttpGetFailedException(String msg, HttpURLConnection connection) throws IOException { + super(msg); + this.responseCode = connection.getResponseCode(); + } + + public int getResponseCode() { + return responseCode; + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java index 9bded332d14..53e96a73a32 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java @@ -46,7 +46,6 @@ public class EditLogTailer { private final EditLogTailerThread tailerThread; private final FSNamesystem namesystem; - private final FSImage image; private final FSEditLog editLog; private volatile Throwable lastError = null; @@ -54,7 +53,6 @@ public class EditLogTailer { public EditLogTailer(FSNamesystem namesystem) { this.tailerThread = new EditLogTailerThread(); this.namesystem = namesystem; - this.image = namesystem.getFSImage(); this.editLog = namesystem.getEditLog(); } @@ -106,6 +104,8 @@ public class EditLogTailer { // deadlock. 
namesystem.writeLockInterruptibly(); try { + FSImage image = namesystem.getFSImage(); + long lastTxnId = image.getLastAppliedTxId(); if (LOG.isDebugEnabled()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAContext.java index dce1cfb34a8..6b070b25f54 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAContext.java @@ -3,6 +3,7 @@ package org.apache.hadoop.hdfs.server.namenode.ha; import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.ha.ServiceFailedException; import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory; import org.apache.hadoop.ipc.StandbyException; @@ -26,7 +27,10 @@ public interface HAContext { /** Start the services required in standby state */ public void startStandbyServices() throws IOException; - + + /** Prepare to exit the standby state */ + public void prepareToStopStandbyServices() throws ServiceFailedException; + /** Stop the services when exiting standby state */ public void stopStandbyServices() throws IOException; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAState.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAState.java index 2f0b6ff1a6d..20ea854b461 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAState.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAState.java @@ -54,6 +54,8 @@ abstract public class HAState { */ protected final void setStateInternal(final HAContext context, final HAState s) throws ServiceFailedException { + prepareToExitState(context); + s.prepareToEnterState(context); context.writeLock(); try { exitState(context); @@ -64,6 +66,18 @@ abstract public class HAState { } } + /** + * Method to be overridden by subclasses to prepare to enter a state. + * This method is called without the context being locked, + * and after {@link #prepareToExitState(HAContext)} has been called + * for the previous state, but before {@link #exitState(HAContext)} + * has been called for the previous state. + * @param context HA context + * @throws ServiceFailedException on precondition failure + */ + public void prepareToEnterState(final HAContext context) + throws ServiceFailedException {} + /** * Method to be overridden by subclasses to perform steps necessary for * entering a state. @@ -73,6 +87,22 @@ abstract public class HAState { public abstract void enterState(final HAContext context) throws ServiceFailedException; + /** + * Method to be overridden by subclasses to prepare to exit a state. + * This method is called without the context being locked. + * This is used by the standby state to cancel any checkpoints + * that are going on. It can also be used to check any preconditions + * for the state transition. + * + * This method should not make any destructive changes to the state + * (e.g. stopping threads) since {@link #prepareToEnterState(HAContext)} + * may subsequently cancel the state transition.
+ * @param context HA context + * @throws ServiceFailedException on precondition failure + */ + public void prepareToExitState(final HAContext context) + throws ServiceFailedException {} + /** * Method to be overridden by subclasses to perform steps necessary for * exiting a state. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java new file mode 100644 index 00000000000..ee7921db4f8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java @@ -0,0 +1,313 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.ha; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.security.PrivilegedAction; +import java.util.ArrayList; +import java.util.Collection; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.ServiceFailedException; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.server.namenode.CheckpointConf; +import org.apache.hadoop.hdfs.server.namenode.FSImage; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.namenode.SaveNamespaceCancelledException; +import org.apache.hadoop.hdfs.server.namenode.TransferFsImage; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; +import static org.apache.hadoop.hdfs.server.common.Util.now; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +/** + * Thread which runs inside the NN when it's in Standby state, + * periodically waking up to take a checkpoint of the namespace. + * When it takes a checkpoint, it saves it to its local + * storage and then uploads it to the remote NameNode. 
+ */ +@InterfaceAudience.Private +public class StandbyCheckpointer { + private static final Log LOG = LogFactory.getLog(StandbyCheckpointer.class); + private static final long PREVENT_AFTER_CANCEL_MS = 2*60*1000L; + private final CheckpointConf checkpointConf; + private final FSNamesystem namesystem; + private long lastCheckpointTime; + private final CheckpointerThread thread; + private String activeNNAddress; + private InetSocketAddress myNNAddress; + + // Keep track of how many checkpoints were canceled. + // This is for use in tests. + private static int canceledCount = 0; + + public StandbyCheckpointer(Configuration conf, FSNamesystem ns) { + this.namesystem = ns; + this.checkpointConf = new CheckpointConf(conf); + this.thread = new CheckpointerThread(); + + setNameNodeAddresses(conf); + } + + /** + * Determine the address of the NN we are checkpointing + * as well as our own HTTP address from the configuration. + */ + private void setNameNodeAddresses(Configuration conf) { + String nsId = DFSUtil.getNamenodeNameServiceId(conf); + Collection<String> nnIds = DFSUtil.getNameNodeIds(conf, nsId); + String myNNId = conf.get(DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY); + Preconditions.checkArgument(nnIds != null, + "Could not determine namenode ids in namespace '%s'", + nsId); + Preconditions.checkArgument(nnIds.size() == 2, + "Expected exactly 2 NameNodes in this namespace. Instead, got: '%s'", + Joiner.on("','").join(nnIds)); + Preconditions.checkState(myNNId != null && !myNNId.isEmpty(), + "Could not determine own NN ID"); + + ArrayList<String> nnSet = Lists.newArrayList(nnIds); + nnSet.remove(myNNId); + assert nnSet.size() == 1; + String activeNN = nnSet.get(0); + + // Look up the address of the active NN. + Configuration confForActive = new Configuration(conf); + NameNode.initializeGenericKeys(confForActive, nsId, activeNN); + activeNNAddress = DFSUtil.getInfoServer(null, confForActive, true); + + // Look up our own address. + String myAddrString = DFSUtil.getInfoServer(null, conf, true); + + // Sanity-check. + Preconditions.checkArgument(checkAddress(activeNNAddress), + "Bad address for active NN: %s", activeNNAddress); + Preconditions.checkArgument(checkAddress(myAddrString), + "Bad address for standby NN: %s", myAddrString); + + myNNAddress = NetUtils.createSocketAddr(myAddrString); + } + + /** + * Ensure that the given address is valid and has a port + * specified.
+ */ + private boolean checkAddress(String addrStr) { + InetSocketAddress addr = NetUtils.createSocketAddr(addrStr); + return addr.getPort() != 0; + } + + public void start() { + LOG.info("Starting standby checkpoint thread...\n" + + "Checkpointing active NN at " + activeNNAddress + "\n" + + "Serving checkpoints at " + myNNAddress); + thread.start(); + } + + public void stop() throws IOException { + thread.setShouldRun(false); + thread.interrupt(); + try { + thread.join(); + } catch (InterruptedException e) { + LOG.warn("Edit log tailer thread exited with an exception"); + throw new IOException(e); + } + } + + private void doCheckpoint() throws InterruptedException, IOException { + long txid; + + namesystem.writeLockInterruptibly(); + try { + assert namesystem.getEditLog().isOpenForRead() : + "Standby Checkpointer should only attempt a checkpoint when " + + "NN is in standby mode, but the edit logs are in an unexpected state"; + + FSImage img = namesystem.getFSImage(); + + long prevCheckpointTxId = img.getStorage().getMostRecentCheckpointTxId(); + long thisCheckpointTxId = img.getLastAppliedOrWrittenTxId(); + assert thisCheckpointTxId >= prevCheckpointTxId; + if (thisCheckpointTxId == prevCheckpointTxId) { + LOG.info("A checkpoint was triggered but the Standby Node has not " + + "received any transactions since the last checkpoint at txid " + + thisCheckpointTxId + ". Skipping..."); + return; + } + + img.saveNamespace(namesystem); + txid = img.getStorage().getMostRecentCheckpointTxId(); + assert txid == thisCheckpointTxId : "expected to save checkpoint at txid=" + + thisCheckpointTxId + " but instead saved at txid=" + txid; + } finally { + namesystem.writeUnlock(); + } + + // Upload the saved checkpoint back to the active + TransferFsImage.uploadImageFromStorage( + activeNNAddress, myNNAddress, + namesystem.getFSImage().getStorage(), txid); + } + + /** + * Cancel any checkpoint that's currently being made, + * and prevent any new checkpoints from starting for the next + * minute or so. + */ + public void cancelAndPreventCheckpoints() throws ServiceFailedException { + try { + thread.preventCheckpointsFor(PREVENT_AFTER_CANCEL_MS); + // TODO: there is a really narrow race here if we are just + // about to start a checkpoint - this won't cancel it! + namesystem.getFSImage().cancelSaveNamespace( + "About to exit standby state"); + } catch (InterruptedException e) { + throw new ServiceFailedException( + "Interrupted while trying to cancel checkpoint"); + } + } + + @VisibleForTesting + static int getCanceledCount() { + return canceledCount; + } + + private long countUncheckpointedTxns() { + FSImage img = namesystem.getFSImage(); + return img.getLastAppliedOrWrittenTxId() - + img.getStorage().getMostRecentCheckpointTxId(); + } + + private class CheckpointerThread extends Thread { + private volatile boolean shouldRun = true; + private volatile long preventCheckpointsUntil = 0; + + private CheckpointerThread() { + super("Standby State Checkpointer"); + } + + private void setShouldRun(boolean shouldRun) { + this.shouldRun = shouldRun; + } + + @Override + public void run() { + // We have to make sure we're logged in as far as JAAS + // is concerned, in order to use kerberized SSL properly. + // This code copied from SecondaryNameNode - TODO: refactor + // to a utility function. 
+ if (UserGroupInformation.isSecurityEnabled()) { + UserGroupInformation ugi = null; + try { + ugi = UserGroupInformation.getLoginUser(); + } catch (IOException e) { + LOG.error("Exception while getting login user", e); + Runtime.getRuntime().exit(-1); + } + ugi.doAs(new PrivilegedAction() { + @Override + public Object run() { + doWork(); + return null; + } + }); + } else { + doWork(); + } + } + + /** + * Prevent checkpoints from occurring for some time period + * in the future. This is used when preparing to enter active + * mode. We need to not only cancel any concurrent checkpoint, + * but also prevent any checkpoints from racing to start just + * after the cancel call. + * + * @param delayMs the number of MS for which checkpoints will be + * prevented + */ + private void preventCheckpointsFor(long delayMs) { + preventCheckpointsUntil = now() + delayMs; + } + + private void doWork() { + // Reset checkpoint time so that we don't always checkpoint + // on startup. + lastCheckpointTime = now(); + while (shouldRun) { + try { + Thread.sleep(1000 * checkpointConf.getCheckPeriod()); + } catch (InterruptedException ie) { + } + if (!shouldRun) { + break; + } + try { + // We may have lost our ticket since last checkpoint, log in again, just in case + if (UserGroupInformation.isSecurityEnabled()) { + UserGroupInformation.getCurrentUser().reloginFromKeytab(); + } + + long now = now(); + long uncheckpointed = countUncheckpointedTxns(); + long secsSinceLast = (now - lastCheckpointTime)/1000; + + boolean needCheckpoint = false; + if (uncheckpointed >= checkpointConf.getTxnCount()) { + LOG.info("Triggering checkpoint because there have been " + + uncheckpointed + " txns since the last checkpoint, which " + + "exceeds the configured threshold " + + checkpointConf.getTxnCount()); + needCheckpoint = true; + } else if (secsSinceLast >= checkpointConf.getPeriod()) { + LOG.info("Triggering checkpoint because it has been " + + secsSinceLast + " seconds since the last checkpoint, which " + + "exceeds the configured interval " + checkpointConf.getPeriod()); + needCheckpoint = true; + } + if (needCheckpoint && now < preventCheckpointsUntil) { + LOG.info("But skipping this checkpoint since we are about to failover!"); + canceledCount++; + } else if (needCheckpoint) { + doCheckpoint(); + lastCheckpointTime = now; + } + } catch (SaveNamespaceCancelledException ce) { + LOG.info("Checkpoint was cancelled: " + ce.getMessage()); + canceledCount++; + } catch (InterruptedException ie) { + // Probably requested shutdown. 
+ continue; + } catch (Throwable t) { + LOG.error("Exception in doCheckpoint", t); + } + } + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyState.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyState.java index ec0dcec9964..80f42e60fea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyState.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyState.java @@ -61,6 +61,11 @@ public class StandbyState extends HAState { } } + @Override + public void prepareToExitState(HAContext context) throws ServiceFailedException { + context.prepareToStopStandbyServices(); + } + @Override public void exitState(HAContext context) throws ServiceFailedException { try { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index 9b6328374a2..6b800a9637f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -538,6 +538,16 @@ public class MiniDFSCluster { conf.setClass(NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, StaticMapping.class, DNSToSwitchMapping.class); + // In an HA cluster, in order for the StandbyNode to perform checkpoints, + // it needs to know the HTTP port of the Active. So, if ephemeral ports + // are chosen, disable checkpoints for the test. + if (!nnTopology.allHttpPortsSpecified() && + nnTopology.isHA()) { + LOG.info("MiniDFSCluster disabling checkpointing in the Standby node " + + "since no HTTP ports have been specified."); + conf.setBoolean(DFS_HA_STANDBY_CHECKPOINTS_KEY, false); + } + federation = nnTopology.isFederated(); createNameNodesAndSetConf( nnTopology, manageNameDfsDirs, format, operation, clusterId, conf); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java index 407ec8f5d10..fc9bb64f9ed 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java @@ -106,6 +106,34 @@ public class MiniDFSNNTopology { public boolean isFederated() { return nameservices.size() > 1 || federation; } + + /** + * @return true if at least one of the nameservices + * in the topology has HA enabled. + */ + public boolean isHA() { + for (NSConf ns : nameservices) { + if (ns.getNNs().size() > 1) { + return true; + } + } + return false; + } + + /** + * @return true if all of the NNs in the cluster have their HTTP + * port specified to be non-ephemeral. 
+ */ + public boolean allHttpPortsSpecified() { + for (NSConf ns : nameservices) { + for (NNConf nn : ns.getNNs()) { + if (nn.getHttpPort() == 0) { + return false; + } + } + } + return true; + } public List<NSConf> getNameservices() { return nameservices; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java index 9e9af7af617..2e4e932b386 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java @@ -195,9 +195,10 @@ public abstract class FSImageTestUtil { * Create an aborted in-progress log in the given directory, containing * only a specified number of "mkdirs" operations. */ - public static void createAbortedLogWithMkdirs(File editsLogDir, int numDirs) - throws IOException { + public static void createAbortedLogWithMkdirs(File editsLogDir, int numDirs, + long firstTxId) throws IOException { FSEditLog editLog = FSImageTestUtil.createStandaloneEditLog(editsLogDir); + editLog.setNextTxId(firstTxId); editLog.openForWrite(); PermissionStatus perms = PermissionStatus.createImmutable("fakeuser", "fakegroup", @@ -399,10 +400,15 @@ public abstract class FSImageTestUtil { * Assert that the NameNode has checkpoints at the expected * transaction IDs. */ - static void assertNNHasCheckpoints(MiniDFSCluster cluster, + public static void assertNNHasCheckpoints(MiniDFSCluster cluster, List<Integer> txids) { + assertNNHasCheckpoints(cluster, 0, txids); + } + + public static void assertNNHasCheckpoints(MiniDFSCluster cluster, + int nnIdx, List<Integer> txids) { - for (File nameDir : getNameNodeCurrentDirs(cluster)) { + for (File nameDir : getNameNodeCurrentDirs(cluster, nnIdx)) { // Should have fsimage_N for the three checkpoints for (long checkpointTxId : txids) { File image = new File(nameDir, @@ -412,9 +418,9 @@ } } - static List<File> getNameNodeCurrentDirs(MiniDFSCluster cluster) { + public static List<File> getNameNodeCurrentDirs(MiniDFSCluster cluster, int nnIdx) { List<File> nameDirs = Lists.newArrayList(); - for (URI u : cluster.getNameDirs(0)) { + for (URI u : cluster.getNameDirs(nnIdx)) { nameDirs.add(new File(u.getPath(), "current")); } return nameDirs; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java index 551588425b4..8223e7c60c1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.test.GenericTestUtils; import org.mockito.Mockito; /** @@ -149,4 +150,10 @@ public class NameNodeAdapter { fsn.setFsLockForTests(spy); return spy; } + + public static FSImage spyOnFsImage(NameNode nn1) { + FSImage spy = Mockito.spy(nn1.getNamesystem().dir.fsImage); + nn1.getNamesystem().dir.fsImage = spy; + return spy; + } } diff --git
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java index f40a89e8491..54550164b88 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java @@ -1339,17 +1339,11 @@ public class TestCheckpoint extends TestCase { // Let the first one finish delayer.proceed(); - // Letting the first node continue should catch an exception + // Letting the first node continue, it should try to upload the + // same image, and gracefully ignore it, while logging an + // error message. checkpointThread.join(); - try { - checkpointThread.propagateExceptions(); - fail("Didn't throw!"); - } catch (Exception ioe) { - assertTrue("Unexpected exception: " + - StringUtils.stringifyException(ioe), - ioe.toString().contains("Another checkpointer already uploaded")); - LOG.info("Caught expected exception", ioe); - } + checkpointThread.propagateExceptions(); // primary should still consider fsimage_4 the latest assertEquals(4, storage.getMostRecentCheckpointTxId()); @@ -1791,7 +1785,7 @@ public class TestCheckpoint extends TestCase { private void assertParallelFilesInvariant(MiniDFSCluster cluster, ImmutableList<SecondaryNameNode> secondaries) throws Exception { List<File> allCurrentDirs = Lists.newArrayList(); - allCurrentDirs.addAll(getNameNodeCurrentDirs(cluster)); + allCurrentDirs.addAll(getNameNodeCurrentDirs(cluster, 0)); for (SecondaryNameNode snn : secondaries) { allCurrentDirs.addAll(getCheckpointCurrentDirs(snn)); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogsDuringFailover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogsDuringFailover.java index 952df211a74..a245301dd90 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogsDuringFailover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogsDuringFailover.java @@ -129,7 +129,7 @@ public class TestEditLogsDuringFailover { // Create a fake in-progress edit-log in the shared directory URI sharedUri = cluster.getSharedEditsDir(0, 1); File sharedDir = new File(sharedUri.getPath(), "current"); - FSImageTestUtil.createAbortedLogWithMkdirs(sharedDir, NUM_DIRS_IN_LOG); + FSImageTestUtil.createAbortedLogWithMkdirs(sharedDir, NUM_DIRS_IN_LOG, 1); assertEditFiles(Collections.singletonList(sharedUri), NNStorage.getInProgressEditsFileName(1)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java new file mode 100644 index 00000000000..905dd03c60d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java @@ -0,0 +1,240 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.ha; + +import static org.junit.Assert.*; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.MiniDFSNNTopology; +import org.apache.hadoop.hdfs.TestDFSClientFailover; +import org.apache.hadoop.hdfs.server.namenode.FSImage; +import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.hdfs.server.namenode.NNStorage; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; + + +public class TestStandbyCheckpoints { + private static final int NUM_DIRS_IN_LOG = 200000; + private MiniDFSCluster cluster; + private NameNode nn0, nn1; + private FileSystem fs; + + @Before + public void setupCluster() throws Exception { + Configuration conf = new Configuration(); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY, 1); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 5); + + MiniDFSNNTopology topology = new MiniDFSNNTopology() + .addNameservice(new MiniDFSNNTopology.NSConf(null) + .addNN(new MiniDFSNNTopology.NNConf("nn1").setHttpPort(10001)) + .addNN(new MiniDFSNNTopology.NNConf("nn2").setHttpPort(10002))); + + cluster = new MiniDFSCluster.Builder(conf) + .nnTopology(topology) + .numDataNodes(0) + .build(); + cluster.waitActive(); + + nn0 = cluster.getNameNode(0); + nn1 = cluster.getNameNode(1); + fs = TestDFSClientFailover.configureFailoverFs(cluster, conf); + + nn1.getNamesystem().getEditLogTailer().setSleepTime(250); + nn1.getNamesystem().getEditLogTailer().interrupt(); + + cluster.transitionToActive(0); + } + + @After + public void shutdownCluster() throws IOException { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testSBNCheckpoints() throws Exception { + doEdits(0, 10); + + TestEditLogTailer.waitForStandbyToCatchUp(nn0, nn1); + // Once the standby catches up, it should notice that it needs to + // do a checkpoint and save one to its local directories. + waitForCheckpoint(1, ImmutableList.of(0, 12)); + + // It should also upload it back to the active. + waitForCheckpoint(0, ImmutableList.of(0, 12)); + } + + /** + * Test for the case when both of the NNs in the cluster are + * in the standby state, and thus are both creating checkpoints + * and uploading them to each other. 
+ * In this circumstance, they should receive the error from the + * other node indicating that the other node already has a + * checkpoint for the given txid, but this should not cause + * an abort, etc. + */ + @Test + public void testBothNodesInStandbyState() throws Exception { + doEdits(0, 10); + + cluster.transitionToStandby(0); + + // Transitioning to standby closed the edit log on the active, + // so the standby will catch up. Then, both will be in standby mode + // with enough uncheckpointed txns to cause a checkpoint, and they + // will each try to take a checkpoint and upload to each other. + waitForCheckpoint(1, ImmutableList.of(0, 12)); + waitForCheckpoint(0, ImmutableList.of(0, 12)); + + assertEquals(12, nn0.getNamesystem().getFSImage().getStorage() + .getMostRecentCheckpointTxId()); + assertEquals(12, nn1.getNamesystem().getFSImage().getStorage() + .getMostRecentCheckpointTxId()); + + List<File> dirs = Lists.newArrayList(); + dirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 0)); + dirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 1)); + // TODO: this failed once because it caught a ckpt file -- maybe + // this is possible if one of the NNs is really fast and the other is slow? + // need to loop this to suss out the race. + FSImageTestUtil.assertParallelFilesAreIdentical(dirs, ImmutableSet.<String>of()); + } + + /** + * Test for the case when the SBN is configured to checkpoint based + * on a time period, but no transactions are happening on the + * active. Thus, it would want to save a second checkpoint at the + * same txid, which is a no-op. This test makes sure this doesn't + * cause any problem. + */ + @Test + public void testCheckpointWhenNoNewTransactionsHappened() + throws Exception { + // Checkpoint as fast as we can, in a tight loop. + cluster.getConfiguration(1).setInt( + DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, 0); + cluster.restartNameNode(1); + nn1 = cluster.getNameNode(1); + nn1.getNamesystem().getEditLogTailer().setSleepTime(250); + nn1.getNamesystem().getEditLogTailer().interrupt(); + + FSImage spyImage1 = NameNodeAdapter.spyOnFsImage(nn1); + + // We shouldn't save any checkpoints at txid=0 + Thread.sleep(1000); + Mockito.verify(spyImage1, Mockito.never()) + .saveNamespace((FSNamesystem) Mockito.anyObject()); + + // Roll the primary and wait for the standby to catch up + TestEditLogTailer.waitForStandbyToCatchUp(nn0, nn1); + Thread.sleep(2000); + + // We should make exactly one checkpoint at this new txid. + Mockito.verify(spyImage1, Mockito.times(1)) + .saveNamespace((FSNamesystem) Mockito.anyObject()); + } + + /** + * Test cancellation of ongoing checkpoints when failover happens + * mid-checkpoint. + */ + @Test + public void testCheckpointCancellation() throws Exception { + cluster.transitionToStandby(0); + + // Create an edit log in the shared edits dir with a lot + // of mkdirs operations. This is solely so that the image is + // large enough to take a non-trivial amount of time to load. + // (only ~15MB) + URI sharedUri = cluster.getSharedEditsDir(0, 1); + File sharedDir = new File(sharedUri.getPath(), "current"); + File tmpDir = new File(MiniDFSCluster.getBaseDirectory(), + "testCheckpointCancellation-tmp"); + FSImageTestUtil.createAbortedLogWithMkdirs(tmpDir, NUM_DIRS_IN_LOG, + 3); + String fname = NNStorage.getInProgressEditsFileName(3); + new File(tmpDir, fname).renameTo(new File(sharedDir, fname)); + + // Checkpoint as fast as we can, in a tight loop.
+ cluster.getConfiguration(1).setInt( + DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, 0); + cluster.restartNameNode(1); + nn1 = cluster.getNameNode(1); + nn1.getNamesystem().getEditLogTailer().setSleepTime(250); + nn1.getNamesystem().getEditLogTailer().interrupt(); + + cluster.transitionToActive(0); + + for (int i = 0; i < 10; i++) { + + doEdits(i*10, i*10 + 10); + cluster.transitionToStandby(0); + cluster.transitionToActive(1); + cluster.transitionToStandby(1); + cluster.transitionToActive(0); + } + + assertTrue(StandbyCheckpointer.getCanceledCount() > 0); + } + + + private void doEdits(int start, int stop) throws IOException { + for (int i = start; i < stop; i++) { + Path p = new Path("/test" + i); + fs.mkdirs(p); + } + } + + private void waitForCheckpoint(int nnIdx, List<Integer> txids) + throws InterruptedException { + long start = System.currentTimeMillis(); + while (true) { + try { + FSImageTestUtil.assertNNHasCheckpoints(cluster, nnIdx, txids); + return; + } catch (AssertionError err) { + if (System.currentTimeMillis() - start > 10000) { + throw err; + } else { + Thread.sleep(300); + } + } + } + } +}
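Usage note (not part of the patch itself): the new dfs.ha.standby.checkpoints key defaults to true, so an HA StandbyNode checkpoints automatically, and the existing dfs.namenode.checkpoint.period, dfs.namenode.checkpoint.txns and dfs.namenode.checkpoint.check.period keys control when it triggers, exactly as they do for the SecondaryNameNode, since both now read them through CheckpointConf. The sketch below is illustrative only; the class name is hypothetical and the values are example settings, and in a real deployment they would normally live in hdfs-site.xml rather than be set in code. It simply exercises the public CheckpointConf API introduced above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.namenode.CheckpointConf;

public class StandbyCheckpointConfExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // New key from this patch: enable or disable checkpointing on the standby
    // (it defaults to DFS_HA_STANDBY_CHECKPOINTS_DEFAULT, i.e. true).
    conf.setBoolean(DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_KEY, true);

    // Pre-existing checkpoint tuning keys, now shared by the standby:
    // checkpoint at least hourly, or after 100k uncheckpointed transactions,
    // polling the transaction count every 60 seconds.
    conf.setLong(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, 3600);
    conf.setLong(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 100000);
    conf.setLong(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY, 60);

    CheckpointConf cc = new CheckpointConf(conf);
    System.out.println("checkpoint period (secs): " + cc.getPeriod());
    System.out.println("txn count trigger:        " + cc.getTxnCount());
    // getCheckPeriod() is the poll interval, capped at the checkpoint period.
    System.out.println("poll interval (secs):     " + cc.getCheckPeriod());
  }
}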