HDFS-8127. NameNode Failover during HA upgrade can cause DataNode to finalize upgrade. Contributed by Jing Zhao.
(cherry picked from commitfddd55279d
) (cherry picked from commit38b031d6ba
) Conflicts: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNUpgradeUtil.java hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java (cherry picked from commit b4e227e65d3a81150f46aac5e987d759defc1452)
This commit is contained in:
parent
dcc0d4658c
commit
b7e7896410
|
@ -120,6 +120,9 @@ Release 2.6.1 - UNRELEASED
|
|||
HDFS-8072. Reserved RBW space is not released if client terminates while
|
||||
writing block. (Arpit Agarwal)
|
||||
|
||||
HDFS-8127. NameNode Failover during HA upgrade can cause DataNode to
|
||||
finalize upgrade. (jing9)
|
||||
|
||||
Release 2.6.0 - 2014-11-18
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -22,6 +22,7 @@ import java.io.IOException;
|
|||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.EndCheckpointRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.EndCheckpointResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.ErrorReportRequestProto;
|
||||
|
@ -36,6 +37,8 @@ import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecen
|
|||
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsUpgradeFinalizedRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsUpgradeFinalizedResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RegisterRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RegisterResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RollEditLogRequestProto;
|
||||
|
@ -222,4 +225,18 @@ public class NamenodeProtocolServerSideTranslatorPB implements
|
|||
return VersionResponseProto.newBuilder()
|
||||
.setInfo(PBHelper.convert(info)).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public IsUpgradeFinalizedResponseProto isUpgradeFinalized(
|
||||
RpcController controller, IsUpgradeFinalizedRequestProto request)
|
||||
throws ServiceException {
|
||||
boolean isUpgradeFinalized;
|
||||
try {
|
||||
isUpgradeFinalized = impl.isUpgradeFinalized();
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
return IsUpgradeFinalizedResponseProto.newBuilder()
|
||||
.setIsUpgradeFinalized(isUpgradeFinalized).build();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
|||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeCommandProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.EndCheckpointRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.ErrorReportRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlockKeysRequestProto;
|
||||
|
@ -34,6 +35,8 @@ import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksReq
|
|||
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsUpgradeFinalizedRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsUpgradeFinalizedResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RegisterRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RollEditLogRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.StartCheckpointRequestProto;
|
||||
|
@ -232,4 +235,17 @@ public class NamenodeProtocolTranslatorPB implements NamenodeProtocol,
|
|||
RPC.RpcKind.RPC_PROTOCOL_BUFFER,
|
||||
RPC.getProtocolVersion(NamenodeProtocolPB.class), methodName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isUpgradeFinalized() throws IOException {
|
||||
IsUpgradeFinalizedRequestProto req = IsUpgradeFinalizedRequestProto
|
||||
.newBuilder().build();
|
||||
try {
|
||||
IsUpgradeFinalizedResponseProto response = rpcProxy.isUpgradeFinalized(
|
||||
NULL_CONTROLLER, req);
|
||||
return response.getIsUpgradeFinalized();
|
||||
} catch (ServiceException e) {
|
||||
throw ProtobufHelper.getRemoteException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -199,7 +199,7 @@ public class FSImage implements Closeable {
|
|||
// check whether all is consistent before transitioning.
|
||||
Map<StorageDirectory, StorageState> dataDirStates =
|
||||
new HashMap<StorageDirectory, StorageState>();
|
||||
boolean isFormatted = recoverStorageDirs(startOpt, dataDirStates);
|
||||
boolean isFormatted = recoverStorageDirs(startOpt, storage, dataDirStates);
|
||||
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Data dir states:\n " +
|
||||
|
@ -288,8 +288,9 @@ public class FSImage implements Closeable {
|
|||
* @param dataDirStates output of storage directory states
|
||||
* @return true if there is at least one valid formatted storage directory
|
||||
*/
|
||||
private boolean recoverStorageDirs(StartupOption startOpt,
|
||||
Map<StorageDirectory, StorageState> dataDirStates) throws IOException {
|
||||
public static boolean recoverStorageDirs(StartupOption startOpt,
|
||||
NNStorage storage, Map<StorageDirectory, StorageState> dataDirStates)
|
||||
throws IOException {
|
||||
boolean isFormatted = false;
|
||||
// This loop needs to be over all storage dirs, even shared dirs, to make
|
||||
// sure that we properly examine their state, but we make sure we don't
|
||||
|
@ -339,7 +340,7 @@ public class FSImage implements Closeable {
|
|||
}
|
||||
|
||||
/** Check if upgrade is in progress. */
|
||||
void checkUpgrade(FSNamesystem target) throws IOException {
|
||||
public static void checkUpgrade(NNStorage storage) throws IOException {
|
||||
// Upgrade or rolling upgrade is allowed only if there are
|
||||
// no previous fs states in any of the directories
|
||||
for (Iterator<StorageDirectory> it = storage.dirIterator(false); it.hasNext();) {
|
||||
|
@ -351,6 +352,10 @@ public class FSImage implements Closeable {
|
|||
}
|
||||
}
|
||||
|
||||
void checkUpgrade() throws IOException {
|
||||
checkUpgrade(storage);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if there is rollback fsimage (for rolling upgrade) in NameNode
|
||||
* directory.
|
||||
|
@ -368,7 +373,7 @@ public class FSImage implements Closeable {
|
|||
}
|
||||
|
||||
void doUpgrade(FSNamesystem target) throws IOException {
|
||||
checkUpgrade(target);
|
||||
checkUpgrade();
|
||||
|
||||
// load the latest image
|
||||
this.loadFSImage(target, StartupOption.UPGRADE, null);
|
||||
|
|
|
@ -8365,7 +8365,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
void startRollingUpgradeInternal(long startTime)
|
||||
throws IOException {
|
||||
checkRollingUpgrade("start rolling upgrade");
|
||||
getFSImage().checkUpgrade(this);
|
||||
getFSImage().checkUpgrade();
|
||||
setRollingUpgradeInfo(false, startTime);
|
||||
}
|
||||
|
||||
|
@ -8382,7 +8382,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
+ "in order to create namespace image.");
|
||||
}
|
||||
checkRollingUpgrade("start rolling upgrade");
|
||||
getFSImage().checkUpgrade(this);
|
||||
getFSImage().checkUpgrade();
|
||||
// in non-HA setup, we do an extra checkpoint to generate a rollback image
|
||||
getFSImage().saveNamespace(this, NameNodeFile.IMAGE_ROLLBACK, null);
|
||||
LOG.info("Successfully saved namespace for preparing rolling upgrade.");
|
||||
|
|
|
@ -975,7 +975,7 @@ public class NNStorage extends Storage implements Closeable,
|
|||
}
|
||||
|
||||
/** Validate and set block pool ID */
|
||||
void setBlockPoolID(String bpid) {
|
||||
public void setBlockPoolID(String bpid) {
|
||||
blockpoolID = bpid;
|
||||
}
|
||||
|
||||
|
|
|
@ -31,7 +31,7 @@ import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
|||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
abstract class NNUpgradeUtil {
|
||||
public abstract class NNUpgradeUtil {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(NNUpgradeUtil.class);
|
||||
|
||||
|
@ -110,25 +110,12 @@ abstract class NNUpgradeUtil {
|
|||
static void doPreUpgrade(Configuration conf, StorageDirectory sd)
|
||||
throws IOException {
|
||||
LOG.info("Starting upgrade of storage directory " + sd.getRoot());
|
||||
File curDir = sd.getCurrentDir();
|
||||
File prevDir = sd.getPreviousDir();
|
||||
final File tmpDir = sd.getPreviousTmp();
|
||||
|
||||
Preconditions.checkState(curDir.exists(),
|
||||
"Current directory must exist for preupgrade.");
|
||||
Preconditions.checkState(!prevDir.exists(),
|
||||
"Previous directory must not exist for preupgrade.");
|
||||
Preconditions.checkState(!tmpDir.exists(),
|
||||
"Previous.tmp directory must not exist for preupgrade."
|
||||
+ "Consider restarting for recovery.");
|
||||
|
||||
// rename current to tmp
|
||||
NNStorage.rename(curDir, tmpDir);
|
||||
|
||||
if (!curDir.mkdir()) {
|
||||
throw new IOException("Cannot create directory " + curDir);
|
||||
}
|
||||
renameCurToTmp(sd);
|
||||
|
||||
final File curDir = sd.getCurrentDir();
|
||||
final File tmpDir = sd.getPreviousTmp();
|
||||
String[] fileNameList = tmpDir.list(new FilenameFilter() {
|
||||
@Override
|
||||
public boolean accept(File dir, String name) {
|
||||
|
@ -162,6 +149,31 @@ abstract class NNUpgradeUtil {
|
|||
in.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Rename the existing current dir to previous.tmp, and create a new empty
|
||||
* current dir.
|
||||
*/
|
||||
public static void renameCurToTmp(StorageDirectory sd) throws IOException {
|
||||
File curDir = sd.getCurrentDir();
|
||||
File prevDir = sd.getPreviousDir();
|
||||
final File tmpDir = sd.getPreviousTmp();
|
||||
|
||||
Preconditions.checkState(curDir.exists(),
|
||||
"Current directory must exist for preupgrade.");
|
||||
Preconditions.checkState(!prevDir.exists(),
|
||||
"Previous directory must not exist for preupgrade.");
|
||||
Preconditions.checkState(!tmpDir.exists(),
|
||||
"Previous.tmp directory must not exist for preupgrade."
|
||||
+ "Consider restarting for recovery.");
|
||||
|
||||
// rename current to tmp
|
||||
NNStorage.rename(curDir, tmpDir);
|
||||
|
||||
if (!curDir.mkdir()) {
|
||||
throw new IOException("Cannot create directory " + curDir);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Perform the upgrade of the storage dir to the given storage info. The new
|
||||
|
@ -172,14 +184,14 @@ abstract class NNUpgradeUtil {
|
|||
* @param storage info about the new upgraded versions.
|
||||
* @throws IOException in the event of error
|
||||
*/
|
||||
static void doUpgrade(StorageDirectory sd, Storage storage) throws
|
||||
IOException {
|
||||
public static void doUpgrade(StorageDirectory sd, Storage storage)
|
||||
throws IOException {
|
||||
LOG.info("Performing upgrade of storage directory " + sd.getRoot());
|
||||
try {
|
||||
// Write the version file, since saveFsImage only makes the
|
||||
// fsimage_<txid>, and the directory is otherwise empty.
|
||||
storage.writeProperties(sd);
|
||||
|
||||
|
||||
File prevDir = sd.getPreviousDir();
|
||||
File tmpDir = sd.getPreviousTmp();
|
||||
Preconditions.checkState(!prevDir.exists(),
|
||||
|
|
|
@ -991,6 +991,13 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
namesystem.checkSuperuserPrivilege();
|
||||
return namesystem.getEditLog().getEditLogManifest(sinceTxId);
|
||||
}
|
||||
|
||||
@Override // NamenodeProtocol
|
||||
public boolean isUpgradeFinalized() throws IOException {
|
||||
checkNNStartup();
|
||||
namesystem.checkSuperuserPrivilege();
|
||||
return namesystem.isUpgradeFinalized();
|
||||
}
|
||||
|
||||
@Override // ClientProtocol
|
||||
public void finalizeUpgrade() throws IOException {
|
||||
|
|
|
@ -26,7 +26,10 @@ import java.net.URI;
|
|||
import java.net.URL;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -38,13 +41,17 @@ import org.apache.hadoop.hdfs.DFSUtil;
|
|||
import org.apache.hadoop.hdfs.HAUtil;
|
||||
import org.apache.hadoop.hdfs.NameNodeProxies;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
||||
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
|
||||
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
||||
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
||||
import org.apache.hadoop.hdfs.server.common.Storage.StorageState;
|
||||
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSImage;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NNUpgradeUtil;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
|
||||
|
@ -142,11 +149,12 @@ public class BootstrapStandby implements Tool, Configurable {
|
|||
}
|
||||
|
||||
private int doRun() throws IOException {
|
||||
|
||||
NamenodeProtocol proxy = createNNProtocolProxy();
|
||||
NamespaceInfo nsInfo;
|
||||
boolean isUpgradeFinalized;
|
||||
try {
|
||||
nsInfo = proxy.versionRequest();
|
||||
isUpgradeFinalized = proxy.isUpgradeFinalized();
|
||||
} catch (IOException ioe) {
|
||||
LOG.fatal("Unable to fetch namespace information from active NN at " +
|
||||
otherIpcAddr + ": " + ioe.getMessage());
|
||||
|
@ -163,7 +171,6 @@ public class BootstrapStandby implements Tool, Configurable {
|
|||
return ERR_CODE_INVALID_VERSION;
|
||||
}
|
||||
|
||||
|
||||
System.out.println(
|
||||
"=====================================================\n" +
|
||||
"About to bootstrap Standby ID " + nnId + " from:\n" +
|
||||
|
@ -175,35 +182,133 @@ public class BootstrapStandby implements Tool, Configurable {
|
|||
" Block pool ID: " + nsInfo.getBlockPoolID() + "\n" +
|
||||
" Cluster ID: " + nsInfo.getClusterID() + "\n" +
|
||||
" Layout version: " + nsInfo.getLayoutVersion() + "\n" +
|
||||
" isUpgradeFinalized: " + isUpgradeFinalized + "\n" +
|
||||
"=====================================================");
|
||||
|
||||
long imageTxId = proxy.getMostRecentCheckpointTxId();
|
||||
long curTxId = proxy.getTransactionID();
|
||||
|
||||
NNStorage storage = new NNStorage(conf, dirsToFormat, editUrisToFormat);
|
||||
|
||||
// Check with the user before blowing away data.
|
||||
if (!Storage.confirmFormat(storage.dirIterable(null),
|
||||
force, interactive)) {
|
||||
storage.close();
|
||||
|
||||
if (!isUpgradeFinalized) {
|
||||
// the remote NameNode is in upgrade state, this NameNode should also
|
||||
// create the previous directory. First prepare the upgrade and rename
|
||||
// the current dir to previous.tmp.
|
||||
LOG.info("The active NameNode is in Upgrade. " +
|
||||
"Prepare the upgrade for the standby NameNode as well.");
|
||||
if (!doPreUpgrade(storage, nsInfo)) {
|
||||
return ERR_CODE_ALREADY_FORMATTED;
|
||||
}
|
||||
} else if (!format(storage, nsInfo)) { // prompt the user to format storage
|
||||
return ERR_CODE_ALREADY_FORMATTED;
|
||||
}
|
||||
|
||||
// Format the storage (writes VERSION file)
|
||||
storage.format(nsInfo);
|
||||
|
||||
// Load the newly formatted image, using all of the directories (including shared
|
||||
// edits)
|
||||
// download the fsimage from active namenode
|
||||
int download = downloadImage(storage, proxy);
|
||||
if (download != 0) {
|
||||
return download;
|
||||
}
|
||||
|
||||
// finish the upgrade: rename previous.tmp to previous
|
||||
if (!isUpgradeFinalized) {
|
||||
doUpgrade(storage);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Iterate over all the storage directories, checking if it should be
|
||||
* formatted. Format the storage if necessary and allowed by the user.
|
||||
* @return True if formatting is processed
|
||||
*/
|
||||
private boolean format(NNStorage storage, NamespaceInfo nsInfo)
|
||||
throws IOException {
|
||||
// Check with the user before blowing away data.
|
||||
if (!Storage.confirmFormat(storage.dirIterable(null), force, interactive)) {
|
||||
storage.close();
|
||||
return false;
|
||||
} else {
|
||||
// Format the storage (writes VERSION file)
|
||||
storage.format(nsInfo);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This is called when using bootstrapStandby for HA upgrade. The SBN should
|
||||
* also create previous directory so that later when it starts, it understands
|
||||
* that the cluster is in the upgrade state. This function renames the old
|
||||
* current directory to previous.tmp.
|
||||
*/
|
||||
private boolean doPreUpgrade(NNStorage storage, NamespaceInfo nsInfo)
|
||||
throws IOException {
|
||||
boolean isFormatted = false;
|
||||
Map<StorageDirectory, StorageState> dataDirStates =
|
||||
new HashMap<StorageDirectory, StorageState>();
|
||||
try {
|
||||
isFormatted = FSImage.recoverStorageDirs(StartupOption.UPGRADE, storage,
|
||||
dataDirStates);
|
||||
if (dataDirStates.values().contains(StorageState.NOT_FORMATTED)) {
|
||||
// recoverStorageDirs returns true if there is a formatted directory
|
||||
isFormatted = false;
|
||||
System.err.println("The original storage directory is not formatted.");
|
||||
}
|
||||
} catch (InconsistentFSStateException e) {
|
||||
// if the storage is in a bad state,
|
||||
LOG.warn("The storage directory is in an inconsistent state", e);
|
||||
} finally {
|
||||
storage.unlockAll();
|
||||
}
|
||||
|
||||
// if there is InconsistentFSStateException or the storage is not formatted,
|
||||
// format the storage. Although this format is done through the new
|
||||
// software, since in HA setup the SBN is rolled back through
|
||||
// "-bootstrapStandby", we should still be fine.
|
||||
if (!isFormatted && !format(storage, nsInfo)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// make sure there is no previous directory
|
||||
FSImage.checkUpgrade(storage);
|
||||
// Do preUpgrade for each directory
|
||||
for (Iterator<StorageDirectory> it = storage.dirIterator(false);
|
||||
it.hasNext();) {
|
||||
StorageDirectory sd = it.next();
|
||||
try {
|
||||
NNUpgradeUtil.renameCurToTmp(sd);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Failed to move aside pre-upgrade storage " +
|
||||
"in image directory " + sd.getRoot(), e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
storage.setStorageInfo(nsInfo);
|
||||
storage.setBlockPoolID(nsInfo.getBlockPoolID());
|
||||
return true;
|
||||
}
|
||||
|
||||
private void doUpgrade(NNStorage storage) throws IOException {
|
||||
for (Iterator<StorageDirectory> it = storage.dirIterator(false);
|
||||
it.hasNext();) {
|
||||
StorageDirectory sd = it.next();
|
||||
NNUpgradeUtil.doUpgrade(sd, storage);
|
||||
}
|
||||
}
|
||||
|
||||
private int downloadImage(NNStorage storage, NamenodeProtocol proxy)
|
||||
throws IOException {
|
||||
// Load the newly formatted image, using all of the directories
|
||||
// (including shared edits)
|
||||
final long imageTxId = proxy.getMostRecentCheckpointTxId();
|
||||
final long curTxId = proxy.getTransactionID();
|
||||
FSImage image = new FSImage(conf);
|
||||
try {
|
||||
image.getStorage().setStorageInfo(storage);
|
||||
image.initEditLog(StartupOption.REGULAR);
|
||||
assert image.getEditLog().isOpenForRead() :
|
||||
"Expected edit log to be open for read";
|
||||
"Expected edit log to be open for read";
|
||||
|
||||
// Ensure that we have enough edits already in the shared directory to
|
||||
// start up from the last checkpoint on the active.
|
||||
if (!skipSharedEditsCheck && !checkLogsAvailableForRead(image, imageTxId, curTxId)) {
|
||||
if (!skipSharedEditsCheck &&
|
||||
!checkLogsAvailableForRead(image, imageTxId, curTxId)) {
|
||||
return ERR_CODE_LOGS_UNAVAILABLE;
|
||||
}
|
||||
|
||||
|
@ -211,7 +316,7 @@ public class BootstrapStandby implements Tool, Configurable {
|
|||
|
||||
// Download that checkpoint into our storage directories.
|
||||
MD5Hash hash = TransferFsImage.downloadImageToStorage(
|
||||
otherHttpAddr, imageTxId, storage, true);
|
||||
otherHttpAddr, imageTxId, storage, true);
|
||||
image.saveDigestAndRenameCheckpointImage(NameNodeFile.IMAGE, imageTxId,
|
||||
hash);
|
||||
} catch (IOException ioe) {
|
||||
|
|
|
@ -186,5 +186,12 @@ public interface NamenodeProtocol {
|
|||
@Idempotent
|
||||
public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* @return Whether the NameNode is in upgrade state (false) or not (true)
|
||||
*/
|
||||
@Idempotent
|
||||
public boolean isUpgradeFinalized() throws IOException;
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -181,6 +181,16 @@ message GetEditLogManifestResponseProto {
|
|||
required RemoteEditLogManifestProto manifest = 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* void request
|
||||
*/
|
||||
message IsUpgradeFinalizedRequestProto {
|
||||
}
|
||||
|
||||
message IsUpgradeFinalizedResponseProto {
|
||||
required bool isUpgradeFinalized = 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Protocol used by the sub-ordinate namenode to send requests
|
||||
* the active/primary namenode.
|
||||
|
@ -250,4 +260,10 @@ service NamenodeProtocolService {
|
|||
*/
|
||||
rpc getEditLogManifest(GetEditLogManifestRequestProto)
|
||||
returns(GetEditLogManifestResponseProto);
|
||||
|
||||
/**
|
||||
* Return whether the NameNode is in upgrade state (false) or not (true)
|
||||
*/
|
||||
rpc isUpgradeFinalized(IsUpgradeFinalizedRequestProto)
|
||||
returns (IsUpgradeFinalizedResponseProto);
|
||||
}
|
||||
|
|
|
@ -18,7 +18,9 @@
|
|||
package org.apache.hadoop.hdfs.server.namenode.ha;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -28,18 +30,26 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
|
|||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
|
||||
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSImage;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import org.mockito.internal.util.reflection.Whitebox;
|
||||
|
||||
/**
|
||||
* Test BootstrapStandby when QJM is used for shared edits.
|
||||
*/
|
||||
public class TestBootstrapStandbyWithQJM {
|
||||
private MiniQJMHACluster miniQjmHaCluster;
|
||||
public class TestBootstrapStandbyWithQJM {
|
||||
enum UpgradeState {
|
||||
NORMAL,
|
||||
RECOVER,
|
||||
FORMAT
|
||||
}
|
||||
|
||||
private MiniDFSCluster cluster;
|
||||
private MiniJournalCluster jCluster;
|
||||
|
||||
|
@ -52,7 +62,7 @@ public class TestBootstrapStandbyWithQJM {
|
|||
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
|
||||
0);
|
||||
|
||||
miniQjmHaCluster = new MiniQJMHACluster.Builder(conf).build();
|
||||
MiniQJMHACluster miniQjmHaCluster = new MiniQJMHACluster.Builder(conf).build();
|
||||
cluster = miniQjmHaCluster.getDfsCluster();
|
||||
jCluster = miniQjmHaCluster.getJournalCluster();
|
||||
|
||||
|
@ -112,4 +122,77 @@ public class TestBootstrapStandbyWithQJM {
|
|||
ImmutableList.of(0));
|
||||
FSImageTestUtil.assertNNFilesMatch(cluster);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test the bootstrapstandby while the other namenode is in upgrade state.
|
||||
* Make sure a previous directory can be created.
|
||||
*/
|
||||
@Test
|
||||
public void testUpgrade() throws Exception {
|
||||
testUpgrade(UpgradeState.NORMAL);
|
||||
}
|
||||
|
||||
/**
|
||||
* Similar with testUpgrade, but rename nn1's current directory to
|
||||
* previous.tmp before bootstrapStandby, and make sure the nn1 is recovered
|
||||
* first then converted into upgrade state.
|
||||
*/
|
||||
@Test
|
||||
public void testUpgradeWithRecover() throws Exception {
|
||||
testUpgrade(UpgradeState.RECOVER);
|
||||
}
|
||||
|
||||
/**
|
||||
* Similar with testUpgrade, but rename nn1's current directory to a random
|
||||
* name so that it's not formatted. Make sure the nn1 is formatted and then
|
||||
* converted into upgrade state.
|
||||
*/
|
||||
@Test
|
||||
public void testUpgradeWithFormat() throws Exception {
|
||||
testUpgrade(UpgradeState.FORMAT);
|
||||
}
|
||||
|
||||
private void testUpgrade(UpgradeState state) throws Exception {
|
||||
cluster.transitionToActive(0);
|
||||
final Configuration confNN1 = cluster.getConfiguration(1);
|
||||
|
||||
final File current = cluster.getNameNode(1).getFSImage().getStorage()
|
||||
.getStorageDir(0).getCurrentDir();
|
||||
final File tmp = cluster.getNameNode(1).getFSImage().getStorage()
|
||||
.getStorageDir(0).getPreviousTmp();
|
||||
// shut down nn1
|
||||
cluster.shutdownNameNode(1);
|
||||
|
||||
// make NN0 in upgrade state
|
||||
FSImage fsImage0 = cluster.getNameNode(0).getNamesystem().getFSImage();
|
||||
Whitebox.setInternalState(fsImage0, "isUpgradeFinalized", false);
|
||||
|
||||
switch (state) {
|
||||
case RECOVER:
|
||||
// rename the current directory to previous.tmp in nn1
|
||||
NNStorage.rename(current, tmp);
|
||||
break;
|
||||
case FORMAT:
|
||||
// rename the current directory to a random name so it's not formatted
|
||||
final File wrongPath = new File(current.getParentFile(), "wrong");
|
||||
NNStorage.rename(current, wrongPath);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
int rc = BootstrapStandby.run(new String[] { "-force" }, confNN1);
|
||||
assertEquals(0, rc);
|
||||
|
||||
// Should have copied over the namespace from the standby
|
||||
FSImageTestUtil.assertNNHasCheckpoints(cluster, 1,
|
||||
ImmutableList.of(0));
|
||||
FSImageTestUtil.assertNNFilesMatch(cluster);
|
||||
|
||||
// make sure the NN1 is in upgrade state, i.e., the previous directory has
|
||||
// been successfully created
|
||||
cluster.restartNameNode(1);
|
||||
assertFalse(cluster.getNameNode(1).getNamesystem().isUpgradeFinalized());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -42,7 +42,6 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
|||
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
|
||||
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster.Builder;
|
||||
import org.apache.hadoop.hdfs.qjournal.server.Journal;
|
||||
import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
||||
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
|
@ -594,7 +593,7 @@ public class TestDFSUpgradeWithHA {
|
|||
cluster.restartNameNode(1);
|
||||
|
||||
checkNnPreviousDirExistence(cluster, 0, true);
|
||||
checkNnPreviousDirExistence(cluster, 1, false);
|
||||
checkNnPreviousDirExistence(cluster, 1, true);
|
||||
checkPreviousDirExistence(sharedDir, true);
|
||||
assertCTimesEqual(cluster);
|
||||
|
||||
|
@ -671,7 +670,7 @@ public class TestDFSUpgradeWithHA {
|
|||
cluster.restartNameNode(1);
|
||||
|
||||
checkNnPreviousDirExistence(cluster, 0, true);
|
||||
checkNnPreviousDirExistence(cluster, 1, false);
|
||||
checkNnPreviousDirExistence(cluster, 1, true);
|
||||
checkJnPreviousDirExistence(qjCluster, true);
|
||||
assertCTimesEqual(cluster);
|
||||
|
||||
|
|
Loading…
Reference in New Issue