HDFS-2731. Add command to bootstrap the Standby Node's name directories from the Active NameNode. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1299809 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2012-03-12 19:44:43 +00:00
parent 275323b42f
commit 7b9ebbb217
14 changed files with 685 additions and 62 deletions

View File

@ -130,6 +130,9 @@ Release 0.23.3 - UNRELEASED
HDFS-3056. Add a new interface RollingLogs for DataBlockScanner logging.
(szetszwo)
HDFS-2731. Add command to bootstrap the Standby Node's name directories
from the Active NameNode. (todd)
OPTIMIZATIONS
HDFS-2477. Optimize computing the diff between a block report and the
namenode state. (Tomasz Nykiel via hairong)

View File

@ -52,7 +52,8 @@ public final class HdfsServerConstants {
UPGRADE ("-upgrade"),
ROLLBACK("-rollback"),
FINALIZE("-finalize"),
IMPORT ("-importCheckpoint");
IMPORT ("-importCheckpoint"),
BOOTSTRAPSTANDBY("-bootstrapStandby");
private String name = null;

View File

@ -53,6 +53,7 @@ import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.util.MD5FileUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@ -97,7 +98,7 @@ public class FSImage implements Closeable {
* Collection imageDirs, Collection editsDirs)
* @throws IOException if default directories are invalid.
*/
protected FSImage(Configuration conf) throws IOException {
public FSImage(Configuration conf) throws IOException {
this(conf,
FSNamesystem.getNamespaceDirs(conf),
FSNamesystem.getNamespaceEditsDirs(conf));
@ -142,7 +143,9 @@ public class FSImage implements Closeable {
Preconditions.checkState(fileCount == 1,
"FSImage.format should be called with an uninitialized namesystem, has " +
fileCount + " files");
storage.format(clusterId);
NamespaceInfo ns = NNStorage.newNamespaceInfo();
ns.clusterID = clusterId;
storage.format(ns);
saveFSImageInAllDirs(fsn, 0);
}
@ -1040,7 +1043,7 @@ public class FSImage implements Closeable {
* renames the image from fsimage_N.ckpt to fsimage_N and also
* saves the related .md5 file into place.
*/
synchronized void saveDigestAndRenameCheckpointImage(
public synchronized void saveDigestAndRenameCheckpointImage(
long txid, MD5Hash digest) throws IOException {
renameCheckpoint(txid);
List<StorageDirectory> badSds = Lists.newArrayList();

View File

@ -712,29 +712,36 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
*/
public static List<URI> getNamespaceEditsDirs(Configuration conf)
throws IOException {
return getNamespaceEditsDirs(conf, true);
}
public static List<URI> getNamespaceEditsDirs(Configuration conf,
boolean includeShared)
throws IOException {
// Use a LinkedHashSet so that order is maintained while we de-dup
// the entries.
LinkedHashSet<URI> editsDirs = new LinkedHashSet<URI>();
List<URI> sharedDirs = getSharedEditsDirs(conf);
if (includeShared) {
List<URI> sharedDirs = getSharedEditsDirs(conf);
// Fail until multiple shared edits directories are supported (HDFS-2782)
if (sharedDirs.size() > 1) {
throw new IOException(
"Multiple shared edits directories are not yet supported");
}
// Fail until multiple shared edits directories are supported (HDFS-2782)
if (sharedDirs.size() > 1) {
throw new IOException(
"Multiple shared edits directories are not yet supported");
}
// First add the shared edits dirs. It's critical that the shared dirs
// are added first, since JournalSet syncs them in the order they are listed,
// and we need to make sure all edits are in place in the shared storage
// before they are replicated locally. See HDFS-2874.
for (URI dir : sharedDirs) {
if (!editsDirs.add(dir)) {
LOG.warn("Edits URI " + dir + " listed multiple times in " +
DFS_NAMENODE_SHARED_EDITS_DIR_KEY + ". Ignoring duplicates.");
// First add the shared edits dirs. It's critical that the shared dirs
// are added first, since JournalSet syncs them in the order they are listed,
// and we need to make sure all edits are in place in the shared storage
// before they are replicated locally. See HDFS-2874.
for (URI dir : sharedDirs) {
if (!editsDirs.add(dir)) {
LOG.warn("Edits URI " + dir + " listed multiple times in " +
DFS_NAMENODE_SHARED_EDITS_DIR_KEY + ". Ignoring duplicates.");
}
}
}
// Now add the non-shared dirs.
for (URI dir : getStorageDirs(conf, DFS_NAMENODE_EDITS_DIR_KEY)) {
if (!editsDirs.add(dir)) {

View File

@ -28,7 +28,6 @@ import java.net.URI;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@ -51,6 +50,7 @@ import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.UpgradeManager;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.namenode.JournalStream.JournalType;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.util.AtomicFileOutputStream;
import org.apache.hadoop.io.IOUtils;
@ -545,12 +545,31 @@ public class NNStorage extends Storage implements Closeable {
/**
* Format all available storage directories.
*/
public void format(String clusterId) throws IOException {
public void format(NamespaceInfo nsInfo) throws IOException {
Preconditions.checkArgument(nsInfo.getLayoutVersion() == 0 ||
nsInfo.getLayoutVersion() == HdfsConstants.LAYOUT_VERSION,
"Bad layout version: %s", nsInfo.getLayoutVersion());
this.setStorageInfo(nsInfo);
this.blockpoolID = nsInfo.getBlockPoolID();
for (Iterator<StorageDirectory> it =
dirIterator(); it.hasNext();) {
StorageDirectory sd = it.next();
format(sd);
}
}
public static NamespaceInfo newNamespaceInfo()
throws UnknownHostException {
return new NamespaceInfo(
newNamespaceID(),
newClusterID(),
newBlockPoolID(),
0L, 0);
}
public void format() throws IOException {
this.layoutVersion = HdfsConstants.LAYOUT_VERSION;
this.namespaceID = newNamespaceID();
this.clusterID = clusterId;
this.blockpoolID = newBlockPoolID();
this.cTime = 0L;
for (Iterator<StorageDirectory> it =
dirIterator(); it.hasNext();) {
StorageDirectory sd = it.next();
@ -570,7 +589,7 @@ public class NNStorage extends Storage implements Closeable {
*
* @return new namespaceID
*/
private int newNamespaceID() {
private static int newNamespaceID() {
int newID = 0;
while(newID == 0)
newID = DFSUtil.getRandom().nextInt(0x7FFFFFFF); // use 31 bits only
@ -991,7 +1010,7 @@ public class NNStorage extends Storage implements Closeable {
*
* @return new blockpoolID
*/
String newBlockPoolID() throws UnknownHostException{
static String newBlockPoolID() throws UnknownHostException{
String ip = "unknownIP";
try {
ip = DNS.getDefaultIP("default");

View File

@ -21,6 +21,7 @@ import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
@ -46,6 +47,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.namenode.ha.ActiveState;
import org.apache.hadoop.hdfs.server.namenode.ha.BootstrapStandby;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.server.namenode.ha.HAState;
import org.apache.hadoop.hdfs.server.namenode.ha.StandbyState;
@ -652,32 +654,13 @@ public class NameNode {
String nsId = DFSUtil.getNamenodeNameServiceId(conf);
String namenodeId = HAUtil.getNameNodeId(conf, nsId);
initializeGenericKeys(conf, nsId, namenodeId);
if (!conf.getBoolean(DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_KEY,
DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_DEFAULT)) {
throw new IOException("The option " + DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_KEY
+ " is set to false for this filesystem, so it "
+ "cannot be formatted. You will need to set "
+ DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_KEY +" parameter "
+ "to true in order to format this filesystem");
}
checkAllowFormat(conf);
Collection<URI> dirsToFormat = FSNamesystem.getNamespaceDirs(conf);
List<URI> editDirsToFormat =
FSNamesystem.getNamespaceEditsDirs(conf);
for(Iterator<URI> it = dirsToFormat.iterator(); it.hasNext();) {
File curDir = new File(it.next().getPath());
// Its alright for a dir not to exist, or to exist (properly accessible)
// and be completely empty.
if (!curDir.exists() ||
(curDir.isDirectory() && FileUtil.listFiles(curDir).length == 0))
continue;
if (isConfirmationNeeded) {
if (!confirmPrompt("Re-format filesystem in " + curDir + " ?")) {
System.err.println("Format aborted in "+ curDir);
return true;
}
}
if (!confirmFormat(dirsToFormat, isConfirmationNeeded, true)) {
return true; // aborted
}
// if clusterID is not provided - see if you can find the current one
@ -694,6 +677,58 @@ public class NameNode {
return false;
}
/**
* Check whether the given storage directories already exist.
* If running in interactive mode, will prompt the user for each
* directory to allow them to format anyway. Otherwise, returns
* false, unless 'force' is specified.
*
* @param dirsToFormat the dirs to check
* @param force format regardless of whether dirs exist
* @param interactive prompt the user when a dir exists
* @return true if formatting should proceed
* @throws IOException
*/
public static boolean confirmFormat(Collection<URI> dirsToFormat,
boolean force, boolean interactive)
throws IOException {
for(Iterator<URI> it = dirsToFormat.iterator(); it.hasNext();) {
File curDir = new File(it.next().getPath());
// Its alright for a dir not to exist, or to exist (properly accessible)
// and be completely empty.
if (!curDir.exists() ||
(curDir.isDirectory() && FileUtil.listFiles(curDir).length == 0))
continue;
if (force) { // Don't confirm, always format.
System.err.println(
"Storage directory exists in " + curDir + ". Formatting anyway.");
continue;
}
if (!interactive) { // Don't ask - always don't format
System.err.println(
"Running in non-interactive mode, and image appears to exist in " +
curDir + ". Not formatting.");
return false;
}
if (!confirmPrompt("Re-format filesystem in " + curDir + " ?")) {
System.err.println("Format aborted in " + curDir);
return false;
}
}
return true;
}
public static void checkAllowFormat(Configuration conf) throws IOException {
if (!conf.getBoolean(DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_KEY,
DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_DEFAULT)) {
throw new IOException("The option " + DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_KEY
+ " is set to false for this filesystem, so it "
+ "cannot be formatted. You will need to set "
+ DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_KEY +" parameter "
+ "to true in order to format this filesystem");
}
}
private static boolean finalize(Configuration conf,
boolean isConfirmationNeeded
) throws IOException {
@ -726,7 +761,8 @@ public class NameNode {
StartupOption.UPGRADE.getName() + "] | [" +
StartupOption.ROLLBACK.getName() + "] | [" +
StartupOption.FINALIZE.getName() + "] | [" +
StartupOption.IMPORT.getName() + "]");
StartupOption.IMPORT.getName() + "] | [" +
StartupOption.BOOTSTRAPSTANDBY.getName() + "]");
}
private static StartupOption parseArguments(String args[]) {
@ -764,8 +800,12 @@ public class NameNode {
startOpt = StartupOption.FINALIZE;
} else if (StartupOption.IMPORT.getName().equalsIgnoreCase(cmd)) {
startOpt = StartupOption.IMPORT;
} else
} else if (StartupOption.BOOTSTRAPSTANDBY.getName().equalsIgnoreCase(cmd)) {
startOpt = StartupOption.BOOTSTRAPSTANDBY;
return startOpt;
} else {
return null;
}
}
return startOpt;
}
@ -840,6 +880,11 @@ public class NameNode {
aborted = finalize(conf, true);
System.exit(aborted ? 1 : 0);
return null; // avoid javac warning
case BOOTSTRAPSTANDBY:
String toolArgs[] = Arrays.copyOfRange(argv, 1, argv.length);
int rc = BootstrapStandby.run(toolArgs, conf);
System.exit(rc);
return null; // avoid warning
case BACKUP:
case CHECKPOINT:
NamenodeRole role = startOpt.toNodeRole();

View File

@ -41,7 +41,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HealthCheckFailedException;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceProtocolService;
@ -114,7 +113,6 @@ import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.WritableRpcEngine;
import org.apache.hadoop.net.Node;

View File

@ -251,8 +251,7 @@ public class SecondaryNameNode implements Runnable {
new AccessControlList(conf.get(DFS_ADMIN, " ")));
if(UserGroupInformation.isSecurityEnabled()) {
System.setProperty("https.cipherSuites",
Krb5AndCertsSslSocketConnector.KRB5_CIPHER_SUITES.get(0));
SecurityUtil.initKrb5CipherSuites();
InetSocketAddress secInfoSocAddr =
NetUtils.createSocketAddr(infoBindAddress + ":"+ conf.getInt(
DFS_NAMENODE_SECONDARY_HTTPS_PORT_KEY,

View File

@ -52,7 +52,7 @@ public class TransferFsImage {
private static final Log LOG = LogFactory.getLog(TransferFsImage.class);
static MD5Hash downloadImageToStorage(
public static MD5Hash downloadImageToStorage(
String fsName, long imageTxId, NNStorage dstStorage, boolean needDigest)
throws IOException {
String fileid = GetImageServlet.getParamStringForImage(

View File

@ -0,0 +1,294 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.security.PrivilegedAction;
import java.util.Collection;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.NameNodeProxies.ProxyAndInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
/**
* Tool which allows the standby node's storage directories to be bootstrapped
* by copying the latest namespace snapshot from the active namenode. This is
* used when first configuring an HA cluster.
*/
@InterfaceAudience.Private
public class BootstrapStandby implements Tool, Configurable {
private static final Log LOG = LogFactory.getLog(BootstrapStandby.class);
private String nsId;
private String nnId;
private String otherNNId;
private String otherHttpAddr;
private InetSocketAddress otherIpcAddr;
private Collection<URI> dirsToFormat;
private List<URI> editUrisToFormat;
private List<URI> sharedEditsUris;
private Configuration conf;
private boolean force = false;
private boolean interactive = true;
public int run(String[] args) throws Exception {
SecurityUtil.initKrb5CipherSuites();
parseArgs(args);
parseConfAndFindOtherNN();
NameNode.checkAllowFormat(conf);
InetSocketAddress myAddr = NameNode.getAddress(conf);
SecurityUtil.login(conf, DFS_NAMENODE_KEYTAB_FILE_KEY,
DFS_NAMENODE_USER_NAME_KEY, myAddr.getHostName());
return SecurityUtil.doAsLoginUserOrFatal(new PrivilegedAction<Integer>() {
@Override
public Integer run() {
try {
return doRun();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});
}
private void parseArgs(String[] args) {
for (String arg : args) {
if ("-force".equals(arg)) {
force = true;
} else if ("-nonInteractive".equals(arg)) {
interactive = false;
} else {
printUsage();
throw new HadoopIllegalArgumentException(
"Illegal argument: " + arg);
}
}
}
private void printUsage() {
System.err.println("Usage: " + this.getClass().getSimpleName() +
"[-force] [-nonInteractive]");
}
private int doRun() throws IOException {
ProxyAndInfo<NamenodeProtocol> proxyAndInfo = NameNodeProxies.createNonHAProxy(getConf(),
otherIpcAddr, NamenodeProtocol.class,
UserGroupInformation.getLoginUser(), true);
NamenodeProtocol proxy = proxyAndInfo.getProxy();
NamespaceInfo nsInfo;
try {
nsInfo = proxy.versionRequest();
checkLayoutVersion(nsInfo);
} catch (IOException ioe) {
LOG.fatal("Unable to fetch namespace information from active NN at " +
otherIpcAddr + ": " + ioe.getMessage());
if (LOG.isDebugEnabled()) {
LOG.debug("Full exception trace", ioe);
}
return 1;
}
System.out.println(
"=====================================================\n" +
"About to bootstrap Standby ID " + nnId + " from:\n" +
" Nameservice ID: " + nsId + "\n" +
" Other Namenode ID: " + otherNNId + "\n" +
" Other NN's HTTP address: " + otherHttpAddr + "\n" +
" Other NN's IPC address: " + otherIpcAddr + "\n" +
" Namespace ID: " + nsInfo.getNamespaceID() + "\n" +
" Block pool ID: " + nsInfo.getBlockPoolID() + "\n" +
" Cluster ID: " + nsInfo.getClusterID() + "\n" +
" Layout version: " + nsInfo.getLayoutVersion() + "\n" +
"=====================================================");
// Check with the user before blowing away data.
if (!NameNode.confirmFormat(
Sets.union(Sets.newHashSet(dirsToFormat),
Sets.newHashSet(editUrisToFormat)),
force, interactive)) {
return 1;
}
// Force the active to roll its log
CheckpointSignature csig = proxy.rollEditLog();
long imageTxId = csig.getMostRecentCheckpointTxId();
long rollTxId = csig.getCurSegmentTxId();
// Format the storage (writes VERSION file)
NNStorage storage = new NNStorage(conf, dirsToFormat, editUrisToFormat);
storage.format(nsInfo);
// Load the newly formatted image, using all of the directories (including shared
// edits)
FSImage image = new FSImage(conf);
assert image.getEditLog().isOpenForRead() :
"Expected edit log to be open for read";
// Ensure that we have enough edits already in the shared directory to
// start up from the last checkpoint on the active.
if (!checkLogsAvailableForRead(image, imageTxId, rollTxId)) {
return 1;
}
image.getStorage().writeTransactionIdFileToStorage(rollTxId);
// Download that checkpoint into our storage directories.
MD5Hash hash = TransferFsImage.downloadImageToStorage(
otherHttpAddr.toString(), imageTxId,
storage, true);
image.saveDigestAndRenameCheckpointImage(imageTxId, hash);
return 0;
}
private boolean checkLogsAvailableForRead(FSImage image, long imageTxId,
long rollTxId) {
long firstTxIdInLogs = imageTxId + 1;
long lastTxIdInLogs = rollTxId - 1;
assert lastTxIdInLogs >= firstTxIdInLogs;
try {
Collection<EditLogInputStream> streams =
image.getEditLog().selectInputStreams(
firstTxIdInLogs, lastTxIdInLogs, false);
for (EditLogInputStream stream : streams) {
IOUtils.closeStream(stream);
}
return true;
} catch (IOException e) {
String msg = "Unable to read transaction ids " +
firstTxIdInLogs + "-" + lastTxIdInLogs +
" from the configured shared edits storage " +
Joiner.on(",").join(sharedEditsUris) + ". " +
"Please copy these logs into the shared edits storage " +
"or call saveNamespace on the active node.\n" +
"Error: " + e.getLocalizedMessage();
if (LOG.isDebugEnabled()) {
LOG.fatal(msg, e);
} else {
LOG.fatal(msg);
}
return false;
}
}
private void checkLayoutVersion(NamespaceInfo nsInfo) throws IOException {
if (nsInfo.getLayoutVersion() != HdfsConstants.LAYOUT_VERSION) {
throw new IOException("Layout version on remote node (" +
nsInfo.getLayoutVersion() + ") does not match " +
"this node's layout version (" + HdfsConstants.LAYOUT_VERSION + ")");
}
}
private void parseConfAndFindOtherNN() throws IOException {
Configuration conf = getConf();
nsId = DFSUtil.getNamenodeNameServiceId(conf);
if (!HAUtil.isHAEnabled(conf, nsId)) {
throw new HadoopIllegalArgumentException(
"HA is not enabled for this namenode.");
}
nnId = HAUtil.getNameNodeId(conf, nsId);
NameNode.initializeGenericKeys(conf, nsId, nnId);
if (!HAUtil.usesSharedEditsDir(conf)) {
throw new HadoopIllegalArgumentException(
"Shared edits storage is not enabled for this namenode.");
}
Configuration otherNode = HAUtil.getConfForOtherNode(conf);
otherNNId = HAUtil.getNameNodeId(otherNode, nsId);
otherIpcAddr = NameNode.getServiceAddress(otherNode, true);
Preconditions.checkArgument(otherIpcAddr.getPort() != 0 &&
!otherIpcAddr.getAddress().isAnyLocalAddress(),
"Could not determine valid IPC address for other NameNode (%s)" +
", got: %s", otherNNId, otherIpcAddr);
otherHttpAddr = DFSUtil.getInfoServer(null, otherNode, true);
otherHttpAddr = DFSUtil.substituteForWildcardAddress(otherHttpAddr,
otherIpcAddr.getHostName());
dirsToFormat = FSNamesystem.getNamespaceDirs(conf);
editUrisToFormat = FSNamesystem.getNamespaceEditsDirs(
conf, false);
sharedEditsUris = FSNamesystem.getSharedEditsDirs(conf);
}
@Override
public void setConf(Configuration conf) {
this.conf = conf;
}
@Override
public Configuration getConf() {
return conf;
}
public static int run(String[] argv, Configuration conf) throws IOException {
BootstrapStandby bs = new BootstrapStandby();
bs.setConf(conf);
try {
return ToolRunner.run(bs, argv);
} catch (Exception e) {
if (e instanceof IOException) {
throw (IOException)e;
} else {
throw new IOException(e);
}
}
}
}

View File

@ -245,8 +245,7 @@ public class DFSck extends Configured implements Tool {
private int doWork(final String[] args) throws IOException {
String proto = "http://";
if (UserGroupInformation.isSecurityEnabled()) {
System.setProperty("https.cipherSuites",
Krb5AndCertsSslSocketConnector.KRB5_CIPHER_SUITES.get(0));
SecurityUtil.initKrb5CipherSuites();
proto = "https://";
}
final StringBuilder url = new StringBuilder(proto);

View File

@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.log4j.Level;
@ -1002,7 +1003,7 @@ public class TestEditLog extends TestCase {
NNStorage storage = new NNStorage(new Configuration(),
Collections.<URI>emptyList(),
editUris);
storage.format("test-cluster-id");
storage.format(new NamespaceInfo());
FSEditLog editlog = new FSEditLog(storage);
// open the edit log and add two transactions
// logGenerationStamp is used, simply because it doesn't

View File

@ -0,0 +1,213 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import org.codehaus.jackson.sym.NameN;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import static org.junit.Assert.*;
public class TestBootstrapStandby {
private static final Log LOG = LogFactory.getLog(TestBootstrapStandby.class);
private MiniDFSCluster cluster;
private NameNode nn0;
@Before
public void setupCluster() throws IOException {
Configuration conf = new Configuration();
MiniDFSNNTopology topology = new MiniDFSNNTopology()
.addNameservice(new MiniDFSNNTopology.NSConf("ns1")
.addNN(new MiniDFSNNTopology.NNConf("nn1").setHttpPort(10001))
.addNN(new MiniDFSNNTopology.NNConf("nn2").setHttpPort(10002)));
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(topology)
.numDataNodes(0)
.build();
cluster.waitActive();
nn0 = cluster.getNameNode(0);
cluster.transitionToActive(0);
cluster.shutdownNameNode(1);
}
@After
public void shutdownCluster() throws IOException {
if (cluster != null) {
cluster.shutdown();
}
}
/**
* Test for the base success case. The primary NN
* hasn't made any checkpoints, and we copy the fsimage_0
* file over and start up.
*/
@Test
public void testSuccessfulBaseCase() throws Exception {
removeStandbyNameDirs();
try {
cluster.restartNameNode(1);
fail("Did not throw");
} catch (IOException ioe) {
GenericTestUtils.assertExceptionContains(
"Cannot start an HA namenode with name dirs that need recovery",
ioe);
}
int rc = BootstrapStandby.run(
new String[]{"-nonInteractive"},
cluster.getConfiguration(1));
assertEquals(0, rc);
// Should have copied over the namespace from the active
FSImageTestUtil.assertNNHasCheckpoints(cluster, 1,
ImmutableList.of(0));
assertNNFilesMatch();
// We should now be able to start the standby successfully.
cluster.restartNameNode(1);
}
/**
* Test for downloading a checkpoint made at a later checkpoint
* from the active.
*/
@Test
public void testDownloadingLaterCheckpoint() throws Exception {
// Roll edit logs a few times to inflate txid
nn0.getRpcServer().rollEditLog();
nn0.getRpcServer().rollEditLog();
// Make checkpoint
NameNodeAdapter.enterSafeMode(nn0, false);
NameNodeAdapter.saveNamespace(nn0);
NameNodeAdapter.leaveSafeMode(nn0, false);
long expectedCheckpointTxId = NameNodeAdapter.getNamesystem(nn0)
.getFSImage().getMostRecentCheckpointTxId();
assertEquals(6, expectedCheckpointTxId);
int rc = BootstrapStandby.run(
new String[]{"-force"},
cluster.getConfiguration(1));
assertEquals(0, rc);
// Should have copied over the namespace from the active
FSImageTestUtil.assertNNHasCheckpoints(cluster, 1,
ImmutableList.of((int)expectedCheckpointTxId));
assertNNFilesMatch();
// We should now be able to start the standby successfully.
cluster.restartNameNode(1);
}
/**
* Test for the case where the shared edits dir doesn't have
* all of the recent edit logs.
*/
@Test
public void testSharedEditsMissingLogs() throws Exception {
removeStandbyNameDirs();
CheckpointSignature sig = nn0.getRpcServer().rollEditLog();
assertEquals(3, sig.getCurSegmentTxId());
// Should have created edits_1-2 in shared edits dir
URI editsUri = cluster.getSharedEditsDir(0, 1);
File editsDir = new File(editsUri);
File editsSegment = new File(new File(editsDir, "current"),
NNStorage.getFinalizedEditsFileName(1, 2));
GenericTestUtils.assertExists(editsSegment);
// Delete the segment.
assertTrue(editsSegment.delete());
// Trying to bootstrap standby should now fail since the edit
// logs aren't available in the shared dir.
LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
LogFactory.getLog(BootstrapStandby.class));
try {
int rc = BootstrapStandby.run(
new String[]{"-force"},
cluster.getConfiguration(1));
assertEquals(1, rc);
} finally {
logs.stopCapturing();
}
GenericTestUtils.assertMatches(logs.getOutput(),
"FATAL.*Unable to read transaction ids 1-4 from the configured shared");
}
@Test
public void testStandbyDirsAlreadyExist() throws Exception {
// Should not pass since standby dirs exist, force not given
int rc = BootstrapStandby.run(
new String[]{"-nonInteractive"},
cluster.getConfiguration(1));
assertEquals(1, rc);
// Should pass with -force
rc = BootstrapStandby.run(
new String[]{"-force"},
cluster.getConfiguration(1));
assertEquals(0, rc);
}
private void assertNNFilesMatch() throws Exception {
List<File> curDirs = Lists.newArrayList();
curDirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 0));
curDirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 1));
FSImageTestUtil.assertParallelFilesAreIdentical(curDirs,
Collections.<String>emptySet());
}
private void removeStandbyNameDirs() {
for (URI u : cluster.getNameDirs(1)) {
assertTrue(u.getScheme().equals("file"));
File dir = new File(u.getPath());
LOG.info("Removing standby dir " + dir);
assertTrue(FileUtil.fullyDelete(dir));
}
}
}

View File

@ -19,15 +19,21 @@ package org.apache.hadoop.test;
import java.io.File;
import java.io.IOException;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.util.StringUtils;
import org.apache.log4j.Layout;
import org.apache.log4j.Logger;
import org.apache.log4j.WriterAppender;
import org.junit.Assert;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@ -100,6 +106,35 @@ public abstract class GenericTestUtils {
throw new TimeoutException("Timed out waiting for condition");
}
public static class LogCapturer {
private StringWriter sw = new StringWriter();
private WriterAppender appender;
private Logger logger;
public static LogCapturer captureLogs(Log l) {
Logger logger = ((Log4JLogger)l).getLogger();
LogCapturer c = new LogCapturer(logger);
return c;
}
private LogCapturer(Logger logger) {
this.logger = logger;
Layout layout = Logger.getRootLogger().getAppender("stdout").getLayout();
WriterAppender wa = new WriterAppender(layout, sw);
logger.addAppender(wa);
}
public String getOutput() {
return sw.toString();
}
public void stopCapturing() {
logger.removeAppender(appender);
}
}
/**
* Mockito answer helper that triggers one latch as soon as the
@ -245,4 +280,10 @@ public abstract class GenericTestUtils {
}
}
}
public static void assertMatches(String output, String pattern) {
Assert.assertTrue("Expected output to match /" + pattern + "/" +
" but got:\n" + output,
Pattern.compile(pattern).matcher(output).find());
}
}