HDFS-2291. Allow the StandbyNode to make checkpoints in an HA setup. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1227411 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon 2012-01-05 00:22:54 +00:00
parent d004ddee76
commit 5b8dcb20a2
25 changed files with 864 additions and 97 deletions

View File

@ -83,3 +83,5 @@ HDFS-2692. Fix bugs related to failover from/into safe mode. (todd)
HDFS-2716. Configuration needs to allow different dfs.http.addresses for each HA NN (todd)
HDFS-2720. Fix MiniDFSCluster HA support to work properly on Windows. (Uma Maheswara Rao G via todd)
HDFS-2291. Allow the StandbyNode to make checkpoints in an HA setup. (todd)

View File

@ -331,4 +331,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
// HA related configuration
public static final String DFS_HA_NAMENODES_KEY = "dfs.ha.namenodes";
public static final String DFS_HA_NAMENODE_ID_KEY = "dfs.ha.namenode.id";
public static final String DFS_HA_STANDBY_CHECKPOINTS_KEY = "dfs.ha.standby.checkpoints";
public static final boolean DFS_HA_STANDBY_CHECKPOINTS_DEFAULT = true;
}
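Note: the new key defaults to true, so standby checkpointing is enabled in HA deployments unless it is explicitly switched off. As a minimal sketch (not part of this patch), disabling it programmatically looks like the following; MiniDFSCluster does exactly this later in the patch when the test topology uses ephemeral HTTP ports.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

// Equivalent to setting dfs.ha.standby.checkpoints=false in hdfs-site.xml:
// the standby NN will still tail edits, but will never save or upload checkpoints.
Configuration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_KEY, false);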

View File

@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import com.google.common.collect.ImmutableList;
@InterfaceAudience.Private
public class CheckpointConf {
private static final Log LOG = LogFactory.getLog(CheckpointConf.class);
/** How often to checkpoint regardless of number of txns */
private final long checkpointPeriod; // in seconds
/** How often to poll the NN to check checkpointTxnCount */
private final long checkpointCheckPeriod; // in seconds
/** checkpoint once every this many transactions, regardless of time */
private final long checkpointTxnCount;
public CheckpointConf(Configuration conf) {
checkpointCheckPeriod = conf.getLong(
DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY,
DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_DEFAULT);
checkpointPeriod = conf.getLong(DFS_NAMENODE_CHECKPOINT_PERIOD_KEY,
DFS_NAMENODE_CHECKPOINT_PERIOD_DEFAULT);
checkpointTxnCount = conf.getLong(DFS_NAMENODE_CHECKPOINT_TXNS_KEY,
DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT);
warnForDeprecatedConfigs(conf);
}
private static void warnForDeprecatedConfigs(Configuration conf) {
for (String key : ImmutableList.of(
"fs.checkpoint.size",
"dfs.namenode.checkpoint.size")) {
if (conf.get(key) != null) {
LOG.warn("Configuration key " + key + " is deprecated! Ignoring..." +
" Instead please specify a value for " +
DFS_NAMENODE_CHECKPOINT_TXNS_KEY);
}
}
}
public long getPeriod() {
return checkpointPeriod;
}
public long getCheckPeriod() {
return Math.min(checkpointCheckPeriod, checkpointPeriod);
}
public long getTxnCount() {
return checkpointTxnCount;
}
}
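CheckpointConf centralizes the three scheduling knobs (checkpoint period, poll period, and transaction-count trigger) that the SecondaryNameNode and Checkpointer previously read individually; later hunks in this commit switch both of them, along with the new StandbyCheckpointer, over to it. A rough usage sketch, assuming an already-populated Configuration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.namenode.CheckpointConf;

Configuration conf = new HdfsConfiguration();
CheckpointConf checkpointConf = new CheckpointConf(conf);

// A checkpoint is due when either trigger fires: enough uncheckpointed
// transactions, or enough seconds elapsed since the last checkpoint.
long txnThreshold = checkpointConf.getTxnCount();
long periodSecs = checkpointConf.getPeriod();
// How often to poll for the transaction count; never longer than the period itself.
long pollSecs = checkpointConf.getCheckPeriod();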

View File

@ -29,7 +29,6 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
@ -58,17 +57,16 @@ class Checkpointer extends Daemon {
private BackupNode backupNode;
volatile boolean shouldRun;
private long checkpointPeriod; // in seconds
// Transactions count to trigger the checkpoint
private long checkpointTxnCount;
private String infoBindAddress;
private CheckpointConf checkpointConf;
private BackupImage getFSImage() {
return (BackupImage)backupNode.getFSImage();
}
private NamenodeProtocol getNamenode(){
private NamenodeProtocol getRemoteNamenodeProxy(){
return backupNode.namenode;
}
@ -89,26 +87,24 @@ class Checkpointer extends Daemon {
/**
* Initialize checkpoint.
*/
@SuppressWarnings("deprecation")
private void initialize(Configuration conf) throws IOException {
// Create connection to the namenode.
shouldRun = true;
// Initialize other scheduling parameters from the configuration
checkpointPeriod = conf.getLong(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY,
DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_DEFAULT);
checkpointTxnCount = conf.getLong(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY,
DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT);
SecondaryNameNode.warnForDeprecatedConfigs(conf);
checkpointConf = new CheckpointConf(conf);
// Pull out exact http address for posting url to avoid ip aliasing issues
String fullInfoAddr = conf.get(DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY,
DFS_NAMENODE_BACKUP_HTTP_ADDRESS_DEFAULT);
infoBindAddress = fullInfoAddr.substring(0, fullInfoAddr.indexOf(":"));
LOG.info("Checkpoint Period : " + checkpointPeriod + " secs " +
"(" + checkpointPeriod/60 + " min)");
LOG.info("Transactions count is : " + checkpointTxnCount + ", to trigger checkpoint");
LOG.info("Checkpoint Period : " +
checkpointConf.getPeriod() + " secs " +
"(" + checkpointConf.getPeriod()/60 + " min)");
LOG.info("Transactions count is : " +
checkpointConf.getTxnCount() +
", to trigger checkpoint");
}
/**
@ -125,8 +121,8 @@ class Checkpointer extends Daemon {
public void run() {
// Check the size of the edit log once every 5 minutes.
long periodMSec = 5 * 60; // 5 minutes
if(checkpointPeriod < periodMSec) {
periodMSec = checkpointPeriod;
if(checkpointConf.getPeriod() < periodMSec) {
periodMSec = checkpointConf.getPeriod();
}
periodMSec *= 1000;
@ -142,7 +138,7 @@ class Checkpointer extends Daemon {
shouldCheckpoint = true;
} else {
long txns = countUncheckpointedTxns();
if(txns >= checkpointTxnCount)
if(txns >= checkpointConf.getTxnCount())
shouldCheckpoint = true;
}
if(shouldCheckpoint) {
@ -165,7 +161,7 @@ class Checkpointer extends Daemon {
}
private long countUncheckpointedTxns() throws IOException {
long curTxId = getNamenode().getTransactionID();
long curTxId = getRemoteNamenodeProxy().getTransactionID();
long uncheckpointedTxns = curTxId -
getFSImage().getStorage().getMostRecentCheckpointTxId();
assert uncheckpointedTxns >= 0;
@ -183,7 +179,7 @@ class Checkpointer extends Daemon {
bnImage.freezeNamespaceAtNextRoll();
NamenodeCommand cmd =
getNamenode().startCheckpoint(backupNode.getRegistration());
getRemoteNamenodeProxy().startCheckpoint(backupNode.getRegistration());
CheckpointCommand cpCmd = null;
switch(cmd.getAction()) {
case NamenodeProtocol.ACT_SHUTDOWN:
@ -207,7 +203,7 @@ class Checkpointer extends Daemon {
long lastApplied = bnImage.getLastAppliedTxId();
LOG.debug("Doing checkpoint. Last applied: " + lastApplied);
RemoteEditLogManifest manifest =
getNamenode().getEditLogManifest(bnImage.getLastAppliedTxId() + 1);
getRemoteNamenodeProxy().getEditLogManifest(bnImage.getLastAppliedTxId() + 1);
if (!manifest.getLogs().isEmpty()) {
RemoteEditLog firstRemoteLog = manifest.getLogs().get(0);
@ -260,7 +256,7 @@ class Checkpointer extends Daemon {
bnStorage, txid);
}
getNamenode().endCheckpoint(backupNode.getRegistration(), sig);
getRemoteNamenodeProxy().endCheckpoint(backupNode.getRegistration(), sig);
if (backupNode.getRole() == NamenodeRole.BACKUP) {
bnImage.convergeJournalSpool();

View File

@ -285,7 +285,7 @@ public class FSEditLog {
/**
* @return true if the log is open in read mode.
*/
synchronized boolean isOpenForRead() {
public synchronized boolean isOpenForRead() {
return state == State.OPEN_FOR_READING;
}

View File

@ -808,7 +808,7 @@ public class FSImage implements Closeable {
* Save the contents of the FS image to a new image file in each of the
* current storage directories.
*/
synchronized void saveNamespace(FSNamesystem source) throws IOException {
public synchronized void saveNamespace(FSNamesystem source) throws IOException {
assert editLog != null : "editLog must be initialized";
storage.attemptRestoreRemovedStorage();
@ -817,7 +817,7 @@ public class FSImage implements Closeable {
if (editLogWasOpen) {
editLog.endCurrentLogSegment(true);
}
long imageTxId = editLog.getLastWrittenTxId();
long imageTxId = getLastAppliedOrWrittenTxId();
try {
saveFSImageInAllDirs(source, imageTxId);
storage.writeAll();
@ -834,7 +834,7 @@ public class FSImage implements Closeable {
}
void cancelSaveNamespace(String reason)
public void cancelSaveNamespace(String reason)
throws InterruptedException {
SaveNamespaceContext ctx = curSaveNamespaceContext;
if (ctx != null) {

View File

@ -47,6 +47,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DAT
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_UPGRADE_PERMISSION_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_UPGRADE_PERMISSION_KEY;
@ -112,6 +114,7 @@ import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
@ -159,6 +162,7 @@ import org.apache.hadoop.hdfs.server.namenode.ha.ActiveState;
import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.server.namenode.ha.HAState;
import org.apache.hadoop.hdfs.server.namenode.ha.StandbyCheckpointer;
import org.apache.hadoop.hdfs.server.namenode.ha.StandbyState;
import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
@ -261,6 +265,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
private UserGroupInformation fsOwner;
private String supergroup;
private PermissionStatus defaultPermission;
private boolean standbyShouldCheckpoint;
// Scan interval is not configurable.
private static final long DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL =
@ -321,12 +326,19 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
*/
private EditLogTailer editLogTailer = null;
/**
* Used when this NN is in standby state to perform checkpoints.
*/
private StandbyCheckpointer standbyCheckpointer;
/**
* Reference to the NN's HAContext object. This is only set once
* {@link #startCommonServices(Configuration, HAContext)} is called.
*/
private HAContext haContext;
private final Configuration conf;
PendingDataNodeMessages getPendingDataNodeMessages() {
return pendingDatanodeMessages;
}
@ -381,6 +393,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
* @throws IOException on bad configuration
*/
FSNamesystem(Configuration conf, FSImage fsImage) throws IOException {
this.conf = conf;
try {
initialize(conf, fsImage);
} catch(IOException e) {
@ -568,11 +581,30 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
editLogTailer = new EditLogTailer(this);
editLogTailer.start();
if (standbyShouldCheckpoint) {
standbyCheckpointer = new StandbyCheckpointer(conf, this);
standbyCheckpointer.start();
}
}
/**
* Called while the NN is in Standby state, but just about to be
* asked to enter Active state. This cancels any checkpoints
* currently being taken.
*/
void prepareToStopStandbyServices() throws ServiceFailedException {
if (standbyCheckpointer != null) {
standbyCheckpointer.cancelAndPreventCheckpoints();
}
}
/** Stop services required in standby state */
void stopStandbyServices() throws IOException {
LOG.info("Stopping services started for standby state");
if (standbyCheckpointer != null) {
standbyCheckpointer.stop();
}
if (editLogTailer != null) {
editLogTailer.stop();
}
@ -728,6 +760,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
DFS_SUPPORT_APPEND_DEFAULT);
this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
this.standbyShouldCheckpoint = conf.getBoolean(
DFS_HA_STANDBY_CHECKPOINTS_KEY,
DFS_HA_STANDBY_CHECKPOINTS_DEFAULT);
}
/**

View File

@ -124,16 +124,18 @@ public class GetImageServlet extends HttpServlet {
final long txid = parsedParams.getTxId();
if (! currentlyDownloadingCheckpoints.add(txid)) {
throw new IOException(
response.sendError(HttpServletResponse.SC_CONFLICT,
"Another checkpointer is already in the process of uploading a" +
" checkpoint made at transaction ID " + txid);
return null;
}
try {
if (nnImage.getStorage().findImageFile(txid) != null) {
throw new IOException(
response.sendError(HttpServletResponse.SC_CONFLICT,
"Another checkpointer already uploaded an checkpoint " +
"for txid " + txid);
return null;
}
// issue a HTTP get request to download the new fsimage

View File

@ -463,7 +463,7 @@ public class NNStorage extends Storage implements Closeable {
/**
* Return the transaction ID of the last checkpoint.
*/
long getMostRecentCheckpointTxId() {
public long getMostRecentCheckpointTxId() {
return mostRecentCheckpointTxId;
}

View File

@ -543,6 +543,7 @@ public class NameNode {
} else {
state = STANDBY_STATE;;
}
state.prepareToEnterState(haContext);
state.enterState(haContext);
} catch (IOException e) {
this.stop();
@ -965,6 +966,11 @@ public class NameNode {
namesystem.startStandbyServices();
}
@Override
public void prepareToStopStandbyServices() throws ServiceFailedException {
namesystem.prepareToStopStandbyServices();
}
@Override
public void stopStandbyServices() throws IOException {
// TODO(HA): Are we guaranteed to be the only active here?
@ -992,6 +998,7 @@ public class NameNode {
public boolean allowStaleReads() {
return allowStaleStandbyReads;
}
}
public boolean isStandbyState() {

View File

@ -18,8 +18,10 @@
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;;
class SaveNamespaceCancelledException extends IOException {
@InterfaceAudience.Private
public class SaveNamespaceCancelledException extends IOException {
private static final long serialVersionUID = 1L;
SaveNamespaceCancelledException(String cancelReason) {

View File

@ -45,6 +45,7 @@ import org.apache.hadoop.fs.FileSystem;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.DFSUtil.ErrorSimulator;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
@ -118,15 +119,7 @@ public class SecondaryNameNode implements Runnable {
private Collection<URI> checkpointDirs;
private Collection<URI> checkpointEditsDirs;
/** How often to checkpoint regardless of number of txns */
private long checkpointPeriod; // in seconds
/** How often to poll the NN to check checkpointTxnCount */
private long checkpointCheckPeriod; // in seconds
/** checkpoint once every this many transactions, regardless of time */
private long checkpointTxnCount;
private CheckpointConf checkpointConf;
private FSNamesystem namesystem;
@ -136,9 +129,9 @@ public class SecondaryNameNode implements Runnable {
+ "\nName Node Address : " + nameNodeAddr
+ "\nStart Time : " + new Date(starttime)
+ "\nLast Checkpoint Time : " + (lastCheckpointTime == 0? "--": new Date(lastCheckpointTime))
+ "\nCheckpoint Period : " + checkpointPeriod + " seconds"
+ "\nCheckpoint Size : " + StringUtils.byteDesc(checkpointTxnCount)
+ " (= " + checkpointTxnCount + " bytes)"
+ "\nCheckpoint Period : " + checkpointConf.getPeriod() + " seconds"
+ "\nCheckpoint Size : " + StringUtils.byteDesc(checkpointConf.getTxnCount())
+ " (= " + checkpointConf.getTxnCount() + " bytes)"
+ "\nCheckpoint Dirs : " + checkpointDirs
+ "\nCheckpoint Edits Dirs: " + checkpointEditsDirs;
}
@ -243,15 +236,7 @@ public class SecondaryNameNode implements Runnable {
namesystem = new FSNamesystem(conf, checkpointImage);
// Initialize other scheduling parameters from the configuration
checkpointCheckPeriod = conf.getLong(
DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY,
DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_DEFAULT);
checkpointPeriod = conf.getLong(DFS_NAMENODE_CHECKPOINT_PERIOD_KEY,
DFS_NAMENODE_CHECKPOINT_PERIOD_DEFAULT);
checkpointTxnCount = conf.getLong(DFS_NAMENODE_CHECKPOINT_TXNS_KEY,
DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT);
warnForDeprecatedConfigs(conf);
checkpointConf = new CheckpointConf(conf);
// initialize the webserver for uploading files.
// Kerberized SSL servers must be run from the host principal...
@ -307,21 +292,9 @@ public class SecondaryNameNode implements Runnable {
conf.set(DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY, infoBindAddress + ":" +infoPort);
LOG.info("Secondary Web-server up at: " + infoBindAddress + ":" +infoPort);
LOG.info("Secondary image servlet up at: " + infoBindAddress + ":" + imagePort);
LOG.info("Checkpoint Period :" + checkpointPeriod + " secs " +
"(" + checkpointPeriod/60 + " min)");
LOG.info("Log Size Trigger :" + checkpointTxnCount + " txns");
}
static void warnForDeprecatedConfigs(Configuration conf) {
for (String key : ImmutableList.of(
"fs.checkpoint.size",
"dfs.namenode.checkpoint.size")) {
if (conf.get(key) != null) {
LOG.warn("Configuration key " + key + " is deprecated! Ignoring..." +
" Instead please specify a value for " +
DFS_NAMENODE_CHECKPOINT_TXNS_KEY);
}
}
LOG.info("Checkpoint Period :" + checkpointConf.getPeriod() + " secs " +
"(" + checkpointConf.getPeriod()/60 + " min)");
LOG.info("Log Size Trigger :" + checkpointConf.getTxnCount() + " txns");
}
/**
@ -372,7 +345,7 @@ public class SecondaryNameNode implements Runnable {
// Poll the Namenode (once every checkpointCheckPeriod seconds) to find the
// number of transactions in the edit log that haven't yet been checkpointed.
//
long period = Math.min(checkpointCheckPeriod, checkpointPeriod);
long period = checkpointConf.getCheckPeriod();
while (shouldRun) {
try {
@ -391,7 +364,7 @@ public class SecondaryNameNode implements Runnable {
long now = System.currentTimeMillis();
if (shouldCheckpointBasedOnCount() ||
now >= lastCheckpointTime + 1000 * checkpointPeriod) {
now >= lastCheckpointTime + 1000 * checkpointConf.getPeriod()) {
doCheckpoint();
lastCheckpointTime = now;
}
@ -585,13 +558,13 @@ public class SecondaryNameNode implements Runnable {
switch (opts.getCommand()) {
case CHECKPOINT:
long count = countUncheckpointedTxns();
if (count > checkpointTxnCount ||
if (count > checkpointConf.getTxnCount() ||
opts.shouldForceCheckpoint()) {
doCheckpoint();
} else {
System.err.println("EditLog size " + count + " transactions is " +
"smaller than configured checkpoint " +
"interval " + checkpointTxnCount + " transactions.");
"interval " + checkpointConf.getTxnCount() + " transactions.");
System.err.println("Skipping checkpoint.");
}
break;
@ -637,7 +610,7 @@ public class SecondaryNameNode implements Runnable {
}
boolean shouldCheckpointBasedOnCount() throws IOException {
return countUncheckpointedTxns() >= checkpointTxnCount;
return countUncheckpointedTxns() >= checkpointConf.getTxnCount();
}
/**

View File

@ -24,8 +24,11 @@ import java.security.MessageDigest;
import java.util.List;
import java.lang.Math;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
@ -41,7 +44,8 @@ import com.google.common.collect.Lists;
/**
* This class provides fetching a specified file from the NameNode.
*/
class TransferFsImage {
@InterfaceAudience.Private
public class TransferFsImage {
public final static String CONTENT_LENGTH = "Content-Length";
public final static String MD5_HEADER = "X-MD5-Digest";
@ -103,7 +107,7 @@ class TransferFsImage {
* @param storage the storage directory to transfer the image from
* @param txid the transaction ID of the image to be uploaded
*/
static void uploadImageFromStorage(String fsName,
public static void uploadImageFromStorage(String fsName,
InetSocketAddress imageListenAddress,
NNStorage storage, long txid) throws IOException {
@ -111,7 +115,20 @@ class TransferFsImage {
txid, imageListenAddress, storage);
// this doesn't directly upload an image, but rather asks the NN
// to connect back to the 2NN to download the specified image.
try {
TransferFsImage.getFileClient(fsName, fileid, null, null, false);
} catch (HttpGetFailedException e) {
if (e.getResponseCode() == HttpServletResponse.SC_CONFLICT) {
// this is OK - this means that a previous attempt to upload
// this checkpoint succeeded even though we thought it failed.
LOG.info("Image upload with txid " + txid +
" conflicted with a previous image upload to the " +
"same NameNode. Continuing...", e);
return;
} else {
throw e;
}
}
LOG.info("Uploaded image with txid " + txid + " to namenode at " +
fsName);
}
@ -194,10 +211,11 @@ class TransferFsImage {
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
throw new IOException(
throw new HttpGetFailedException(
"Image transfer servlet at " + url +
" failed with status code " + connection.getResponseCode() +
"\nResponse message:\n" + connection.getResponseMessage());
"\nResponse message:\n" + connection.getResponseMessage(),
connection);
}
long advertisedSize;
@ -290,4 +308,18 @@ class TransferFsImage {
return (header != null) ? new MD5Hash(header) : null;
}
public static class HttpGetFailedException extends IOException {
private static final long serialVersionUID = 1L;
private final int responseCode;
HttpGetFailedException(String msg, HttpURLConnection connection) throws IOException {
super(msg);
this.responseCode = connection.getResponseCode();
}
public int getResponseCode() {
return responseCode;
}
}
}

View File

@ -46,7 +46,6 @@ public class EditLogTailer {
private final EditLogTailerThread tailerThread;
private final FSNamesystem namesystem;
private final FSImage image;
private final FSEditLog editLog;
private volatile Throwable lastError = null;
@ -54,7 +53,6 @@ public class EditLogTailer {
public EditLogTailer(FSNamesystem namesystem) {
this.tailerThread = new EditLogTailerThread();
this.namesystem = namesystem;
this.image = namesystem.getFSImage();
this.editLog = namesystem.getEditLog();
}
@ -106,6 +104,8 @@ public class EditLogTailer {
// deadlock.
namesystem.writeLockInterruptibly();
try {
FSImage image = namesystem.getFSImage();
long lastTxnId = image.getLastAppliedTxId();
if (LOG.isDebugEnabled()) {

View File

@ -3,6 +3,7 @@ package org.apache.hadoop.hdfs.server.namenode.ha;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.ipc.StandbyException;
@ -27,6 +28,9 @@ public interface HAContext {
/** Start the services required in standby state */
public void startStandbyServices() throws IOException;
/** Prepare to exit the standby state */
public void prepareToStopStandbyServices() throws ServiceFailedException;
/** Stop the services when exiting standby state */
public void stopStandbyServices() throws IOException;

View File

@ -54,6 +54,8 @@ abstract public class HAState {
*/
protected final void setStateInternal(final HAContext context, final HAState s)
throws ServiceFailedException {
prepareToExitState(context);
s.prepareToEnterState(context);
context.writeLock();
try {
exitState(context);
@ -64,6 +66,18 @@ abstract public class HAState {
}
}
/**
* Method to be overridden by subclasses to prepare to enter a state.
* This method is called <em>without</em> the context being locked,
* and after {@link #prepareToExitState(HAContext)} has been called
* for the previous state, but before {@link #exitState(HAContext)}
* has been called for the previous state.
* @param context HA context
* @throws ServiceFailedException on precondition failure
*/
public void prepareToEnterState(final HAContext context)
throws ServiceFailedException {}
/**
* Method to be overridden by subclasses to perform steps necessary for
* entering a state.
@ -73,6 +87,22 @@ abstract public class HAState {
public abstract void enterState(final HAContext context)
throws ServiceFailedException;
/**
* Method to be overridden by subclasses to prepare to exit a state.
* This method is called <em>without</em> the context being locked.
* This is used by the standby state to cancel any checkpoints
* that are going on. It can also be used to check any preconditions
* for the state transition.
*
* This method should not make any destructive changes to the state
* (e.g. stopping threads) since {@link #prepareToEnterState(HAContext)}
* may subsequently cancel the state transition.
* @param context HA context
* @throws ServiceFailedException on precondition failure
*/
public void prepareToExitState(final HAContext context)
throws ServiceFailedException {}
/**
* Method to be overridden by subclasses to perform steps necessary for
* exiting a state.

View File

@ -0,0 +1,313 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.server.namenode.CheckpointConf;
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.SaveNamespaceCancelledException;
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import static org.apache.hadoop.hdfs.server.common.Util.now;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
/**
* Thread which runs inside the NN when it's in Standby state,
* periodically waking up to take a checkpoint of the namespace.
* When it takes a checkpoint, it saves it to its local
* storage and then uploads it to the remote NameNode.
*/
@InterfaceAudience.Private
public class StandbyCheckpointer {
private static final Log LOG = LogFactory.getLog(StandbyCheckpointer.class);
private static final long PREVENT_AFTER_CANCEL_MS = 2*60*1000L;
private final CheckpointConf checkpointConf;
private final FSNamesystem namesystem;
private long lastCheckpointTime;
private final CheckpointerThread thread;
private String activeNNAddress;
private InetSocketAddress myNNAddress;
// Keep track of how many checkpoints were canceled.
// This is for use in tests.
private static int canceledCount = 0;
public StandbyCheckpointer(Configuration conf, FSNamesystem ns) {
this.namesystem = ns;
this.checkpointConf = new CheckpointConf(conf);
this.thread = new CheckpointerThread();
setNameNodeAddresses(conf);
}
/**
* Determine the address of the NN we are checkpointing
* as well as our own HTTP address from the configuration.
*/
private void setNameNodeAddresses(Configuration conf) {
String nsId = DFSUtil.getNamenodeNameServiceId(conf);
Collection<String> nnIds = DFSUtil.getNameNodeIds(conf, nsId);
String myNNId = conf.get(DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY);
Preconditions.checkArgument(nnIds != null,
"Could not determine namenode ids in namespace '%s'",
nsId);
Preconditions.checkArgument(nnIds.size() == 2,
"Expected exactly 2 NameNodes in this namespace. Instead, got: '%s'",
Joiner.on("','").join(nnIds));
Preconditions.checkState(myNNId != null && !myNNId.isEmpty(),
"Could not determine own NN ID");
ArrayList<String> nnSet = Lists.newArrayList(nnIds);
nnSet.remove(myNNId);
assert nnSet.size() == 1;
String activeNN = nnSet.get(0);
// Look up the address of the active NN.
Configuration confForActive = new Configuration(conf);
NameNode.initializeGenericKeys(confForActive, nsId, activeNN);
activeNNAddress = DFSUtil.getInfoServer(null, confForActive, true);
// Look up our own address.
String myAddrString = DFSUtil.getInfoServer(null, conf, true);
// Sanity-check.
Preconditions.checkArgument(checkAddress(activeNNAddress),
"Bad address for active NN: %s", activeNNAddress);
Preconditions.checkArgument(checkAddress(myAddrString),
"Bad address for standby NN: %s", myAddrString);
myNNAddress = NetUtils.createSocketAddr(myAddrString);
}
/**
* Ensure that the given address is valid and has a port
* specified.
*/
private boolean checkAddress(String addrStr) {
InetSocketAddress addr = NetUtils.createSocketAddr(addrStr);
return addr.getPort() != 0;
}
public void start() {
LOG.info("Starting standby checkpoint thread...\n" +
"Checkpointing active NN at " + activeNNAddress + "\n" +
"Serving checkpoints at " + myNNAddress);
thread.start();
}
public void stop() throws IOException {
thread.setShouldRun(false);
thread.interrupt();
try {
thread.join();
} catch (InterruptedException e) {
LOG.warn("Edit log tailer thread exited with an exception");
throw new IOException(e);
}
}
private void doCheckpoint() throws InterruptedException, IOException {
long txid;
namesystem.writeLockInterruptibly();
try {
assert namesystem.getEditLog().isOpenForRead() :
"Standby Checkpointer should only attempt a checkpoint when " +
"NN is in standby mode, but the edit logs are in an unexpected state";
FSImage img = namesystem.getFSImage();
long prevCheckpointTxId = img.getStorage().getMostRecentCheckpointTxId();
long thisCheckpointTxId = img.getLastAppliedOrWrittenTxId();
assert thisCheckpointTxId >= prevCheckpointTxId;
if (thisCheckpointTxId == prevCheckpointTxId) {
LOG.info("A checkpoint was triggered but the Standby Node has not " +
"received any transactions since the last checkpoint at txid " +
thisCheckpointTxId + ". Skipping...");
return;
}
img.saveNamespace(namesystem);
txid = img.getStorage().getMostRecentCheckpointTxId();
assert txid == thisCheckpointTxId : "expected to save checkpoint at txid=" +
thisCheckpointTxId + " but instead saved at txid=" + txid;
} finally {
namesystem.writeUnlock();
}
// Upload the saved checkpoint back to the active
TransferFsImage.uploadImageFromStorage(
activeNNAddress, myNNAddress,
namesystem.getFSImage().getStorage(), txid);
}
/**
* Cancel any checkpoint that's currently being made,
* and prevent any new checkpoints from starting for the next
* couple of minutes (PREVENT_AFTER_CANCEL_MS).
*/
public void cancelAndPreventCheckpoints() throws ServiceFailedException {
try {
thread.preventCheckpointsFor(PREVENT_AFTER_CANCEL_MS);
// TODO: there is a really narrow race here if we are just
// about to start a checkpoint - this won't cancel it!
namesystem.getFSImage().cancelSaveNamespace(
"About to exit standby state");
} catch (InterruptedException e) {
throw new ServiceFailedException(
"Interrupted while trying to cancel checkpoint");
}
}
@VisibleForTesting
static int getCanceledCount() {
return canceledCount;
}
private long countUncheckpointedTxns() {
FSImage img = namesystem.getFSImage();
return img.getLastAppliedOrWrittenTxId() -
img.getStorage().getMostRecentCheckpointTxId();
}
private class CheckpointerThread extends Thread {
private volatile boolean shouldRun = true;
private volatile long preventCheckpointsUntil = 0;
private CheckpointerThread() {
super("Standby State Checkpointer");
}
private void setShouldRun(boolean shouldRun) {
this.shouldRun = shouldRun;
}
@Override
public void run() {
// We have to make sure we're logged in as far as JAAS
// is concerned, in order to use kerberized SSL properly.
// This code copied from SecondaryNameNode - TODO: refactor
// to a utility function.
if (UserGroupInformation.isSecurityEnabled()) {
UserGroupInformation ugi = null;
try {
ugi = UserGroupInformation.getLoginUser();
} catch (IOException e) {
LOG.error("Exception while getting login user", e);
Runtime.getRuntime().exit(-1);
}
ugi.doAs(new PrivilegedAction<Object>() {
@Override
public Object run() {
doWork();
return null;
}
});
} else {
doWork();
}
}
/**
* Prevent checkpoints from occurring for some time period
* in the future. This is used when preparing to enter active
* mode. We need to not only cancel any concurrent checkpoint,
* but also prevent any checkpoints from racing to start just
* after the cancel call.
*
* @param delayMs the number of milliseconds for which checkpoints will be
* prevented
*/
private void preventCheckpointsFor(long delayMs) {
preventCheckpointsUntil = now() + delayMs;
}
private void doWork() {
// Reset checkpoint time so that we don't always checkpoint
// on startup.
lastCheckpointTime = now();
while (shouldRun) {
try {
Thread.sleep(1000 * checkpointConf.getCheckPeriod());
} catch (InterruptedException ie) {
}
if (!shouldRun) {
break;
}
try {
// We may have lost our ticket since last checkpoint, log in again, just in case
if (UserGroupInformation.isSecurityEnabled()) {
UserGroupInformation.getCurrentUser().reloginFromKeytab();
}
long now = now();
long uncheckpointed = countUncheckpointedTxns();
long secsSinceLast = (now - lastCheckpointTime)/1000;
boolean needCheckpoint = false;
if (uncheckpointed >= checkpointConf.getTxnCount()) {
LOG.info("Triggering checkpoint because there have been " +
uncheckpointed + " txns since the last checkpoint, which " +
"exceeds the configured threshold " +
checkpointConf.getTxnCount());
needCheckpoint = true;
} else if (secsSinceLast >= checkpointConf.getPeriod()) {
LOG.info("Triggering checkpoint because it has been " +
secsSinceLast + " seconds since the last checkpoint, which " +
"exceeds the configured interval " + checkpointConf.getPeriod());
needCheckpoint = true;
}
if (needCheckpoint && now < preventCheckpointsUntil) {
LOG.info("But skipping this checkpoint since we are about to failover!");
canceledCount++;
} else if (needCheckpoint) {
doCheckpoint();
lastCheckpointTime = now;
}
} catch (SaveNamespaceCancelledException ce) {
LOG.info("Checkpoint was cancelled: " + ce.getMessage());
canceledCount++;
} catch (InterruptedException ie) {
// Probably requested shutdown.
continue;
} catch (Throwable t) {
LOG.error("Exception in doCheckpoint", t);
}
}
}
}
}

View File

@ -61,6 +61,11 @@ public class StandbyState extends HAState {
}
}
@Override
public void prepareToExitState(HAContext context) throws ServiceFailedException {
context.prepareToStopStandbyServices();
}
@Override
public void exitState(HAContext context) throws ServiceFailedException {
try {

View File

@ -538,6 +538,16 @@ public class MiniDFSCluster {
conf.setClass(NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
StaticMapping.class, DNSToSwitchMapping.class);
// In an HA cluster, in order for the StandbyNode to perform checkpoints,
// it needs to know the HTTP port of the Active. So, if ephemeral ports
// are chosen, disable checkpoints for the test.
if (!nnTopology.allHttpPortsSpecified() &&
nnTopology.isHA()) {
LOG.info("MiniDFSCluster disabling checkpointing in the Standby node " +
"since no HTTP ports have been specified.");
conf.setBoolean(DFS_HA_STANDBY_CHECKPOINTS_KEY, false);
}
federation = nnTopology.isFederated();
createNameNodesAndSetConf(
nnTopology, manageNameDfsDirs, format, operation, clusterId, conf);

View File

@ -107,6 +107,34 @@ public class MiniDFSNNTopology {
return nameservices.size() > 1 || federation;
}
/**
* @return true if at least one of the nameservices
* in the topology has HA enabled.
*/
public boolean isHA() {
for (NSConf ns : nameservices) {
if (ns.getNNs().size() > 1) {
return true;
}
}
return false;
}
/**
* @return true if all of the NNs in the cluster have their HTTP
* port specified to be non-ephemeral.
*/
public boolean allHttpPortsSpecified() {
for (NSConf ns : nameservices) {
for (NNConf nn : ns.getNNs()) {
if (nn.getHttpPort() == 0) {
return false;
}
}
}
return true;
}
public List<NSConf> getNameservices() {
return nameservices;
}
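These two helpers let MiniDFSCluster (in the hunk above) decide whether standby checkpointing can stay enabled for a test topology. A small sketch of a topology that satisfies both checks (two NameNodes in one nameservice, each with a fixed, non-ephemeral HTTP port), mirroring the setup used in TestStandbyCheckpoints below:

import org.apache.hadoop.hdfs.MiniDFSNNTopology;

MiniDFSNNTopology topology = new MiniDFSNNTopology()
  .addNameservice(new MiniDFSNNTopology.NSConf(null)
    .addNN(new MiniDFSNNTopology.NNConf("nn1").setHttpPort(10001))
    .addNN(new MiniDFSNNTopology.NNConf("nn2").setHttpPort(10002)));

assert topology.isHA();                  // more than one NN in a nameservice
assert topology.allHttpPortsSpecified(); // no HTTP port left at 0 (ephemeral)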

View File

@ -195,9 +195,10 @@ public abstract class FSImageTestUtil {
* Create an aborted in-progress log in the given directory, containing
* only a specified number of "mkdirs" operations.
*/
public static void createAbortedLogWithMkdirs(File editsLogDir, int numDirs)
throws IOException {
public static void createAbortedLogWithMkdirs(File editsLogDir, int numDirs,
long firstTxId) throws IOException {
FSEditLog editLog = FSImageTestUtil.createStandaloneEditLog(editsLogDir);
editLog.setNextTxId(firstTxId);
editLog.openForWrite();
PermissionStatus perms = PermissionStatus.createImmutable("fakeuser", "fakegroup",
@ -399,10 +400,15 @@ public abstract class FSImageTestUtil {
* Assert that the NameNode has checkpoints at the expected
* transaction IDs.
*/
static void assertNNHasCheckpoints(MiniDFSCluster cluster,
public static void assertNNHasCheckpoints(MiniDFSCluster cluster,
List<Integer> txids) {
assertNNHasCheckpoints(cluster, 0, txids);
}
for (File nameDir : getNameNodeCurrentDirs(cluster)) {
public static void assertNNHasCheckpoints(MiniDFSCluster cluster,
int nnIdx, List<Integer> txids) {
for (File nameDir : getNameNodeCurrentDirs(cluster, nnIdx)) {
// Should have fsimage_N for the three checkpoints
for (long checkpointTxId : txids) {
File image = new File(nameDir,
@ -412,9 +418,9 @@ public abstract class FSImageTestUtil {
}
}
static List<File> getNameNodeCurrentDirs(MiniDFSCluster cluster) {
public static List<File> getNameNodeCurrentDirs(MiniDFSCluster cluster, int nnIdx) {
List<File> nameDirs = Lists.newArrayList();
for (URI u : cluster.getNameDirs(0)) {
for (URI u : cluster.getNameDirs(nnIdx)) {
nameDirs.add(new File(u.getPath(), "current"));
}
return nameDirs;

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.test.GenericTestUtils;
import org.mockito.Mockito;
/**
@ -149,4 +150,10 @@ public class NameNodeAdapter {
fsn.setFsLockForTests(spy);
return spy;
}
public static FSImage spyOnFsImage(NameNode nn1) {
FSImage spy = Mockito.spy(nn1.getNamesystem().dir.fsImage);
nn1.getNamesystem().dir.fsImage = spy;
return spy;
}
}

View File

@ -1339,17 +1339,11 @@ public class TestCheckpoint extends TestCase {
// Let the first one finish
delayer.proceed();
// Letting the first node continue should catch an exception
// Letting the first node continue, it should try to upload the
// same image, and gracefully ignore it, while logging an
// error message.
checkpointThread.join();
try {
checkpointThread.propagateExceptions();
fail("Didn't throw!");
} catch (Exception ioe) {
assertTrue("Unexpected exception: " +
StringUtils.stringifyException(ioe),
ioe.toString().contains("Another checkpointer already uploaded"));
LOG.info("Caught expected exception", ioe);
}
// primary should still consider fsimage_4 the latest
assertEquals(4, storage.getMostRecentCheckpointTxId());
@ -1791,7 +1785,7 @@ public class TestCheckpoint extends TestCase {
private void assertParallelFilesInvariant(MiniDFSCluster cluster,
ImmutableList<SecondaryNameNode> secondaries) throws Exception {
List<File> allCurrentDirs = Lists.newArrayList();
allCurrentDirs.addAll(getNameNodeCurrentDirs(cluster));
allCurrentDirs.addAll(getNameNodeCurrentDirs(cluster, 0));
for (SecondaryNameNode snn : secondaries) {
allCurrentDirs.addAll(getCheckpointCurrentDirs(snn));
}

View File

@ -129,7 +129,7 @@ public class TestEditLogsDuringFailover {
// Create a fake in-progress edit-log in the shared directory
URI sharedUri = cluster.getSharedEditsDir(0, 1);
File sharedDir = new File(sharedUri.getPath(), "current");
FSImageTestUtil.createAbortedLogWithMkdirs(sharedDir, NUM_DIRS_IN_LOG);
FSImageTestUtil.createAbortedLogWithMkdirs(sharedDir, NUM_DIRS_IN_LOG, 1);
assertEditFiles(Collections.singletonList(sharedUri),
NNStorage.getInProgressEditsFileName(1));

View File

@ -0,0 +1,240 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import static org.junit.Assert.*;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.TestDFSClientFailover;
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
public class TestStandbyCheckpoints {
private static final int NUM_DIRS_IN_LOG = 200000;
private MiniDFSCluster cluster;
private NameNode nn0, nn1;
private FileSystem fs;
@Before
public void setupCluster() throws Exception {
Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 5);
MiniDFSNNTopology topology = new MiniDFSNNTopology()
.addNameservice(new MiniDFSNNTopology.NSConf(null)
.addNN(new MiniDFSNNTopology.NNConf("nn1").setHttpPort(10001))
.addNN(new MiniDFSNNTopology.NNConf("nn2").setHttpPort(10002)));
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(topology)
.numDataNodes(0)
.build();
cluster.waitActive();
nn0 = cluster.getNameNode(0);
nn1 = cluster.getNameNode(1);
fs = TestDFSClientFailover.configureFailoverFs(cluster, conf);
nn1.getNamesystem().getEditLogTailer().setSleepTime(250);
nn1.getNamesystem().getEditLogTailer().interrupt();
cluster.transitionToActive(0);
}
@After
public void shutdownCluster() throws IOException {
if (cluster != null) {
cluster.shutdown();
}
}
@Test
public void testSBNCheckpoints() throws Exception {
doEdits(0, 10);
TestEditLogTailer.waitForStandbyToCatchUp(nn0, nn1);
// Once the standby catches up, it should notice that it needs to
// do a checkpoint and save one to its local directories.
waitForCheckpoint(1, ImmutableList.of(0, 12));
// It should also upload it back to the active.
waitForCheckpoint(0, ImmutableList.of(0, 12));
}
/**
* Test for the case when both of the NNs in the cluster are
* in the standby state, and thus are both creating checkpoints
* and uploading them to each other.
* In this circumstance, they should receive the error from the
* other node indicating that the other node already has a
* checkpoint for the given txid, but this should not cause
* an abort, etc.
*/
@Test
public void testBothNodesInStandbyState() throws Exception {
doEdits(0, 10);
cluster.transitionToStandby(0);
// Transitioning to standby closed the edit log on the active,
// so the standby will catch up. Then, both will be in standby mode
// with enough uncheckpointed txns to cause a checkpoint, and they
// will each try to take a checkpoint and upload to each other.
waitForCheckpoint(1, ImmutableList.of(0, 12));
waitForCheckpoint(0, ImmutableList.of(0, 12));
assertEquals(12, nn0.getNamesystem().getFSImage().getStorage()
.getMostRecentCheckpointTxId());
assertEquals(12, nn1.getNamesystem().getFSImage().getStorage()
.getMostRecentCheckpointTxId());
List<File> dirs = Lists.newArrayList();
dirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 0));
dirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 1));
// TODO: this failed once because it caught a ckpt file -- maybe
// this is possible if one of the NNs is really fast and the other is slow?
// need to loop this to suss out the race.
FSImageTestUtil.assertParallelFilesAreIdentical(dirs, ImmutableSet.<String>of());
}
/**
* Test for the case when the SBN is configured to checkpoint based
* on a time period, but no transactions are happening on the
* active. Thus, it would want to save a second checkpoint at the
* same txid, which is a no-op. This test makes sure this doesn't
* cause any problem.
*/
@Test
public void testCheckpointWhenNoNewTransactionsHappened()
throws Exception {
// Checkpoint as fast as we can, in a tight loop.
cluster.getConfiguration(1).setInt(
DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, 0);
cluster.restartNameNode(1);
nn1 = cluster.getNameNode(1);
nn1.getNamesystem().getEditLogTailer().setSleepTime(250);
nn1.getNamesystem().getEditLogTailer().interrupt();
FSImage spyImage1 = NameNodeAdapter.spyOnFsImage(nn1);
// We shouldn't save any checkpoints at txid=0
Thread.sleep(1000);
Mockito.verify(spyImage1, Mockito.never())
.saveNamespace((FSNamesystem) Mockito.anyObject());
// Roll the primary and wait for the standby to catch up
TestEditLogTailer.waitForStandbyToCatchUp(nn0, nn1);
Thread.sleep(2000);
// We should make exactly one checkpoint at this new txid.
Mockito.verify(spyImage1, Mockito.times(1))
.saveNamespace((FSNamesystem) Mockito.anyObject());
}
/**
* Test cancellation of ongoing checkpoints when failover happens
* mid-checkpoint.
*/
@Test
public void testCheckpointCancellation() throws Exception {
cluster.transitionToStandby(0);
// Create an edit log in the shared edits dir with a lot
// of mkdirs operations. This is solely so that the image is
// large enough to take a non-trivial amount of time to load.
// (only ~15MB)
URI sharedUri = cluster.getSharedEditsDir(0, 1);
File sharedDir = new File(sharedUri.getPath(), "current");
File tmpDir = new File(MiniDFSCluster.getBaseDirectory(),
"testCheckpointCancellation-tmp");
FSImageTestUtil.createAbortedLogWithMkdirs(tmpDir, NUM_DIRS_IN_LOG,
3);
String fname = NNStorage.getInProgressEditsFileName(3);
new File(tmpDir, fname).renameTo(new File(sharedDir, fname));
// Checkpoint as fast as we can, in a tight loop.
cluster.getConfiguration(1).setInt(
DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, 0);
cluster.restartNameNode(1);
nn1 = cluster.getNameNode(1);
nn1.getNamesystem().getEditLogTailer().setSleepTime(250);
nn1.getNamesystem().getEditLogTailer().interrupt();
cluster.transitionToActive(0);
for (int i = 0; i < 10; i++) {
doEdits(i*10, i*10 + 10);
cluster.transitionToStandby(0);
cluster.transitionToActive(1);
cluster.transitionToStandby(1);
cluster.transitionToActive(0);
}
assertTrue(StandbyCheckpointer.getCanceledCount() > 0);
}
private void doEdits(int start, int stop) throws IOException {
for (int i = start; i < stop; i++) {
Path p = new Path("/test" + i);
fs.mkdirs(p);
}
}
private void waitForCheckpoint(int nnIdx, List<Integer> txids)
throws InterruptedException {
long start = System.currentTimeMillis();
while (true) {
try {
FSImageTestUtil.assertNNHasCheckpoints(cluster, nnIdx, txids);
return;
} catch (AssertionError err) {
if (System.currentTimeMillis() - start > 10000) {
throw err;
} else {
Thread.sleep(300);
}
}
}
}
}