HDFS-2731. Add command to bootstrap the Standby Node's name directories from the Active NameNode. Contributed by Todd Lipcon.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1299807 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3f46a62ff9
commit
1a75ec8288
|
@ -220,6 +220,9 @@ Release 0.23.3 - UNRELEASED
|
|||
HDFS-3056. Add a new interface RollingLogs for DataBlockScanner logging.
|
||||
(szetszwo)
|
||||
|
||||
HDFS-2731. Add command to bootstrap the Standby Node's name directories
|
||||
from the Active NameNode. (todd)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-3024. Improve performance of stringification in addStoredBlock (todd)
|
||||
|
|
|
@ -52,7 +52,8 @@ public final class HdfsServerConstants {
|
|||
UPGRADE ("-upgrade"),
|
||||
ROLLBACK("-rollback"),
|
||||
FINALIZE("-finalize"),
|
||||
IMPORT ("-importCheckpoint");
|
||||
IMPORT ("-importCheckpoint"),
|
||||
BOOTSTRAPSTANDBY("-bootstrapStandby");
|
||||
|
||||
private String name = null;
|
||||
|
||||
|
|
|
@ -53,6 +53,7 @@ import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
|
|||
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||
import org.apache.hadoop.hdfs.util.MD5FileUtils;
|
||||
import org.apache.hadoop.io.MD5Hash;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
|
@ -97,7 +98,7 @@ public class FSImage implements Closeable {
|
|||
* Collection imageDirs, Collection editsDirs)
|
||||
* @throws IOException if default directories are invalid.
|
||||
*/
|
||||
protected FSImage(Configuration conf) throws IOException {
|
||||
public FSImage(Configuration conf) throws IOException {
|
||||
this(conf,
|
||||
FSNamesystem.getNamespaceDirs(conf),
|
||||
FSNamesystem.getNamespaceEditsDirs(conf));
|
||||
|
@ -142,7 +143,9 @@ public class FSImage implements Closeable {
|
|||
Preconditions.checkState(fileCount == 1,
|
||||
"FSImage.format should be called with an uninitialized namesystem, has " +
|
||||
fileCount + " files");
|
||||
storage.format(clusterId);
|
||||
NamespaceInfo ns = NNStorage.newNamespaceInfo();
|
||||
ns.clusterID = clusterId;
|
||||
storage.format(ns);
|
||||
saveFSImageInAllDirs(fsn, 0);
|
||||
}
|
||||
|
||||
|
@ -1040,7 +1043,7 @@ public class FSImage implements Closeable {
|
|||
* renames the image from fsimage_N.ckpt to fsimage_N and also
|
||||
* saves the related .md5 file into place.
|
||||
*/
|
||||
synchronized void saveDigestAndRenameCheckpointImage(
|
||||
public synchronized void saveDigestAndRenameCheckpointImage(
|
||||
long txid, MD5Hash digest) throws IOException {
|
||||
renameCheckpoint(txid);
|
||||
List<StorageDirectory> badSds = Lists.newArrayList();
|
||||
|
|
|
@ -712,29 +712,36 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
*/
|
||||
public static List<URI> getNamespaceEditsDirs(Configuration conf)
|
||||
throws IOException {
|
||||
return getNamespaceEditsDirs(conf, true);
|
||||
}
|
||||
|
||||
public static List<URI> getNamespaceEditsDirs(Configuration conf,
|
||||
boolean includeShared)
|
||||
throws IOException {
|
||||
// Use a LinkedHashSet so that order is maintained while we de-dup
|
||||
// the entries.
|
||||
LinkedHashSet<URI> editsDirs = new LinkedHashSet<URI>();
|
||||
|
||||
List<URI> sharedDirs = getSharedEditsDirs(conf);
|
||||
|
||||
// Fail until multiple shared edits directories are supported (HDFS-2782)
|
||||
if (sharedDirs.size() > 1) {
|
||||
throw new IOException(
|
||||
"Multiple shared edits directories are not yet supported");
|
||||
}
|
||||
|
||||
// First add the shared edits dirs. It's critical that the shared dirs
|
||||
// are added first, since JournalSet syncs them in the order they are listed,
|
||||
// and we need to make sure all edits are in place in the shared storage
|
||||
// before they are replicated locally. See HDFS-2874.
|
||||
for (URI dir : sharedDirs) {
|
||||
if (!editsDirs.add(dir)) {
|
||||
LOG.warn("Edits URI " + dir + " listed multiple times in " +
|
||||
DFS_NAMENODE_SHARED_EDITS_DIR_KEY + ". Ignoring duplicates.");
|
||||
if (includeShared) {
|
||||
List<URI> sharedDirs = getSharedEditsDirs(conf);
|
||||
|
||||
// Fail until multiple shared edits directories are supported (HDFS-2782)
|
||||
if (sharedDirs.size() > 1) {
|
||||
throw new IOException(
|
||||
"Multiple shared edits directories are not yet supported");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// First add the shared edits dirs. It's critical that the shared dirs
|
||||
// are added first, since JournalSet syncs them in the order they are listed,
|
||||
// and we need to make sure all edits are in place in the shared storage
|
||||
// before they are replicated locally. See HDFS-2874.
|
||||
for (URI dir : sharedDirs) {
|
||||
if (!editsDirs.add(dir)) {
|
||||
LOG.warn("Edits URI " + dir + " listed multiple times in " +
|
||||
DFS_NAMENODE_SHARED_EDITS_DIR_KEY + ". Ignoring duplicates.");
|
||||
}
|
||||
}
|
||||
}
|
||||
// Now add the non-shared dirs.
|
||||
for (URI dir : getStorageDirs(conf, DFS_NAMENODE_EDITS_DIR_KEY)) {
|
||||
if (!editsDirs.add(dir)) {
|
||||
|
|
|
@ -28,7 +28,6 @@ import java.net.URI;
|
|||
import java.net.UnknownHostException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
@ -51,6 +50,7 @@ import org.apache.hadoop.hdfs.server.common.Storage;
|
|||
import org.apache.hadoop.hdfs.server.common.UpgradeManager;
|
||||
import org.apache.hadoop.hdfs.server.common.Util;
|
||||
import org.apache.hadoop.hdfs.server.namenode.JournalStream.JournalType;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||
import org.apache.hadoop.hdfs.util.AtomicFileOutputStream;
|
||||
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
|
@ -551,12 +551,31 @@ public class NNStorage extends Storage implements Closeable {
|
|||
/**
|
||||
* Format all available storage directories.
|
||||
*/
|
||||
public void format(String clusterId) throws IOException {
|
||||
public void format(NamespaceInfo nsInfo) throws IOException {
|
||||
Preconditions.checkArgument(nsInfo.getLayoutVersion() == 0 ||
|
||||
nsInfo.getLayoutVersion() == HdfsConstants.LAYOUT_VERSION,
|
||||
"Bad layout version: %s", nsInfo.getLayoutVersion());
|
||||
|
||||
this.setStorageInfo(nsInfo);
|
||||
this.blockpoolID = nsInfo.getBlockPoolID();
|
||||
for (Iterator<StorageDirectory> it =
|
||||
dirIterator(); it.hasNext();) {
|
||||
StorageDirectory sd = it.next();
|
||||
format(sd);
|
||||
}
|
||||
}
|
||||
|
||||
public static NamespaceInfo newNamespaceInfo()
|
||||
throws UnknownHostException {
|
||||
return new NamespaceInfo(
|
||||
newNamespaceID(),
|
||||
newClusterID(),
|
||||
newBlockPoolID(),
|
||||
0L, 0);
|
||||
}
|
||||
|
||||
public void format() throws IOException {
|
||||
this.layoutVersion = HdfsConstants.LAYOUT_VERSION;
|
||||
this.namespaceID = newNamespaceID();
|
||||
this.clusterID = clusterId;
|
||||
this.blockpoolID = newBlockPoolID();
|
||||
this.cTime = 0L;
|
||||
for (Iterator<StorageDirectory> it =
|
||||
dirIterator(); it.hasNext();) {
|
||||
StorageDirectory sd = it.next();
|
||||
|
@ -576,7 +595,7 @@ public class NNStorage extends Storage implements Closeable {
|
|||
*
|
||||
* @return new namespaceID
|
||||
*/
|
||||
private int newNamespaceID() {
|
||||
private static int newNamespaceID() {
|
||||
int newID = 0;
|
||||
while(newID == 0)
|
||||
newID = DFSUtil.getRandom().nextInt(0x7FFFFFFF); // use 31 bits only
|
||||
|
@ -997,7 +1016,7 @@ public class NNStorage extends Storage implements Closeable {
|
|||
*
|
||||
* @return new blockpoolID
|
||||
*/
|
||||
String newBlockPoolID() throws UnknownHostException{
|
||||
static String newBlockPoolID() throws UnknownHostException{
|
||||
String ip = "unknownIP";
|
||||
try {
|
||||
ip = DNS.getDefaultIP("default");
|
||||
|
|
|
@ -21,6 +21,7 @@ import java.io.File;
|
|||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
@ -46,6 +47,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
|||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
||||
import org.apache.hadoop.hdfs.server.namenode.ha.ActiveState;
|
||||
import org.apache.hadoop.hdfs.server.namenode.ha.BootstrapStandby;
|
||||
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
|
||||
import org.apache.hadoop.hdfs.server.namenode.ha.HAState;
|
||||
import org.apache.hadoop.hdfs.server.namenode.ha.StandbyState;
|
||||
|
@ -652,32 +654,13 @@ public class NameNode {
|
|||
String nsId = DFSUtil.getNamenodeNameServiceId(conf);
|
||||
String namenodeId = HAUtil.getNameNodeId(conf, nsId);
|
||||
initializeGenericKeys(conf, nsId, namenodeId);
|
||||
|
||||
if (!conf.getBoolean(DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_KEY,
|
||||
DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_DEFAULT)) {
|
||||
throw new IOException("The option " + DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_KEY
|
||||
+ " is set to false for this filesystem, so it "
|
||||
+ "cannot be formatted. You will need to set "
|
||||
+ DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_KEY +" parameter "
|
||||
+ "to true in order to format this filesystem");
|
||||
}
|
||||
checkAllowFormat(conf);
|
||||
|
||||
Collection<URI> dirsToFormat = FSNamesystem.getNamespaceDirs(conf);
|
||||
List<URI> editDirsToFormat =
|
||||
FSNamesystem.getNamespaceEditsDirs(conf);
|
||||
for(Iterator<URI> it = dirsToFormat.iterator(); it.hasNext();) {
|
||||
File curDir = new File(it.next().getPath());
|
||||
// Its alright for a dir not to exist, or to exist (properly accessible)
|
||||
// and be completely empty.
|
||||
if (!curDir.exists() ||
|
||||
(curDir.isDirectory() && FileUtil.listFiles(curDir).length == 0))
|
||||
continue;
|
||||
if (isConfirmationNeeded) {
|
||||
if (!confirmPrompt("Re-format filesystem in " + curDir + " ?")) {
|
||||
System.err.println("Format aborted in "+ curDir);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
// confirmFormat's second argument is 'force' (format without asking), which
// is the opposite of "confirmation needed": prompt only when the caller
// asked for confirmation, and format unconditionally otherwise.
if (!confirmFormat(dirsToFormat, !isConfirmationNeeded, true)) {
  return true; // aborted
}
|
||||
|
||||
// if clusterID is not provided - see if you can find the current one
|
||||
|
@ -694,6 +677,58 @@ public class NameNode {
|
|||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check whether the given storage directories already exist.
|
||||
* If running in interactive mode, will prompt the user for each
|
||||
* directory to allow them to format anyway. Otherwise, returns
|
||||
* false, unless 'force' is specified.
|
||||
*
|
||||
* @param dirsToFormat the dirs to check
|
||||
* @param force format regardless of whether dirs exist
|
||||
* @param interactive prompt the user when a dir exists
|
||||
* @return true if formatting should proceed
|
||||
* @throws IOException
|
||||
*/
|
||||
public static boolean confirmFormat(Collection<URI> dirsToFormat,
|
||||
boolean force, boolean interactive)
|
||||
throws IOException {
|
||||
for(Iterator<URI> it = dirsToFormat.iterator(); it.hasNext();) {
|
||||
File curDir = new File(it.next().getPath());
|
||||
// Its alright for a dir not to exist, or to exist (properly accessible)
|
||||
// and be completely empty.
|
||||
if (!curDir.exists() ||
|
||||
(curDir.isDirectory() && FileUtil.listFiles(curDir).length == 0))
|
||||
continue;
|
||||
if (force) { // Don't confirm, always format.
|
||||
System.err.println(
|
||||
"Storage directory exists in " + curDir + ". Formatting anyway.");
|
||||
continue;
|
||||
}
|
||||
if (!interactive) { // Don't ask - always don't format
|
||||
System.err.println(
|
||||
"Running in non-interactive mode, and image appears to exist in " +
|
||||
curDir + ". Not formatting.");
|
||||
return false;
|
||||
}
|
||||
if (!confirmPrompt("Re-format filesystem in " + curDir + " ?")) {
|
||||
System.err.println("Format aborted in " + curDir);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
public static void checkAllowFormat(Configuration conf) throws IOException {
|
||||
if (!conf.getBoolean(DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_KEY,
|
||||
DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_DEFAULT)) {
|
||||
throw new IOException("The option " + DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_KEY
|
||||
+ " is set to false for this filesystem, so it "
|
||||
+ "cannot be formatted. You will need to set "
|
||||
+ DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_KEY +" parameter "
|
||||
+ "to true in order to format this filesystem");
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean finalize(Configuration conf,
|
||||
boolean isConfirmationNeeded
|
||||
) throws IOException {
|
||||
|
@ -726,7 +761,8 @@ public class NameNode {
|
|||
StartupOption.UPGRADE.getName() + "] | [" +
|
||||
StartupOption.ROLLBACK.getName() + "] | [" +
|
||||
StartupOption.FINALIZE.getName() + "] | [" +
|
||||
StartupOption.IMPORT.getName() + "]");
|
||||
StartupOption.IMPORT.getName() + "] | [" +
|
||||
StartupOption.BOOTSTRAPSTANDBY.getName() + "]");
|
||||
}
|
||||
|
||||
private static StartupOption parseArguments(String args[]) {
|
||||
|
@ -764,8 +800,12 @@ public class NameNode {
|
|||
startOpt = StartupOption.FINALIZE;
|
||||
} else if (StartupOption.IMPORT.getName().equalsIgnoreCase(cmd)) {
|
||||
startOpt = StartupOption.IMPORT;
|
||||
} else
|
||||
} else if (StartupOption.BOOTSTRAPSTANDBY.getName().equalsIgnoreCase(cmd)) {
|
||||
startOpt = StartupOption.BOOTSTRAPSTANDBY;
|
||||
return startOpt;
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
return startOpt;
|
||||
}
|
||||
|
@ -840,6 +880,11 @@ public class NameNode {
|
|||
aborted = finalize(conf, true);
|
||||
System.exit(aborted ? 1 : 0);
|
||||
return null; // avoid javac warning
|
||||
case BOOTSTRAPSTANDBY:
|
||||
String toolArgs[] = Arrays.copyOfRange(argv, 1, argv.length);
|
||||
int rc = BootstrapStandby.run(toolArgs, conf);
|
||||
System.exit(rc);
|
||||
return null; // avoid warning
|
||||
case BACKUP:
|
||||
case CHECKPOINT:
|
||||
NamenodeRole role = startOpt.toNodeRole();
|
||||
|
|
|
@ -41,7 +41,6 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.fs.UnresolvedLinkException;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.fs.permission.PermissionStatus;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||
import org.apache.hadoop.ha.HealthCheckFailedException;
|
||||
import org.apache.hadoop.ha.ServiceFailedException;
|
||||
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceProtocolService;
|
||||
|
@ -113,7 +112,6 @@ import org.apache.hadoop.io.EnumSetWritable;
|
|||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.ipc.WritableRpcEngine;
|
||||
import org.apache.hadoop.net.Node;
|
||||
|
|
|
@ -251,8 +251,7 @@ public class SecondaryNameNode implements Runnable {
|
|||
new AccessControlList(conf.get(DFS_ADMIN, " ")));
|
||||
|
||||
if(UserGroupInformation.isSecurityEnabled()) {
|
||||
System.setProperty("https.cipherSuites",
|
||||
Krb5AndCertsSslSocketConnector.KRB5_CIPHER_SUITES.get(0));
|
||||
SecurityUtil.initKrb5CipherSuites();
|
||||
InetSocketAddress secInfoSocAddr =
|
||||
NetUtils.createSocketAddr(infoBindAddress + ":"+ conf.getInt(
|
||||
DFS_NAMENODE_SECONDARY_HTTPS_PORT_KEY,
|
||||
|
|
|
@ -52,7 +52,7 @@ public class TransferFsImage {
|
|||
|
||||
private static final Log LOG = LogFactory.getLog(TransferFsImage.class);
|
||||
|
||||
static MD5Hash downloadImageToStorage(
|
||||
public static MD5Hash downloadImageToStorage(
|
||||
String fsName, long imageTxId, NNStorage dstStorage, boolean needDigest)
|
||||
throws IOException {
|
||||
String fileid = GetImageServlet.getParamStringForImage(
|
||||
|
|
|
@ -0,0 +1,294 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode.ha;
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configurable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.HAUtil;
|
||||
import org.apache.hadoop.hdfs.NameNodeProxies;
|
||||
import org.apache.hadoop.hdfs.NameNodeProxies.ProxyAndInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
|
||||
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSImage;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.io.MD5Hash;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.util.Tool;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
/**
|
||||
* Tool which allows the standby node's storage directories to be bootstrapped
|
||||
* by copying the latest namespace snapshot from the active namenode. This is
|
||||
* used when first configuring an HA cluster.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class BootstrapStandby implements Tool, Configurable {
|
||||
private static final Log LOG = LogFactory.getLog(BootstrapStandby.class);
|
||||
private String nsId;
|
||||
private String nnId;
|
||||
private String otherNNId;
|
||||
|
||||
private String otherHttpAddr;
|
||||
private InetSocketAddress otherIpcAddr;
|
||||
private Collection<URI> dirsToFormat;
|
||||
private List<URI> editUrisToFormat;
|
||||
private List<URI> sharedEditsUris;
|
||||
private Configuration conf;
|
||||
|
||||
private boolean force = false;
|
||||
private boolean interactive = true;
|
||||
|
||||
|
||||
public int run(String[] args) throws Exception {
|
||||
SecurityUtil.initKrb5CipherSuites();
|
||||
parseArgs(args);
|
||||
parseConfAndFindOtherNN();
|
||||
NameNode.checkAllowFormat(conf);
|
||||
|
||||
InetSocketAddress myAddr = NameNode.getAddress(conf);
|
||||
SecurityUtil.login(conf, DFS_NAMENODE_KEYTAB_FILE_KEY,
|
||||
DFS_NAMENODE_USER_NAME_KEY, myAddr.getHostName());
|
||||
|
||||
return SecurityUtil.doAsLoginUserOrFatal(new PrivilegedAction<Integer>() {
|
||||
@Override
|
||||
public Integer run() {
|
||||
try {
|
||||
return doRun();
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void parseArgs(String[] args) {
|
||||
for (String arg : args) {
|
||||
if ("-force".equals(arg)) {
|
||||
force = true;
|
||||
} else if ("-nonInteractive".equals(arg)) {
|
||||
interactive = false;
|
||||
} else {
|
||||
printUsage();
|
||||
throw new HadoopIllegalArgumentException(
|
||||
"Illegal argument: " + arg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void printUsage() {
|
||||
System.err.println("Usage: " + this.getClass().getSimpleName() +
|
||||
"[-force] [-nonInteractive]");
|
||||
}
|
||||
|
||||
private int doRun() throws IOException {
|
||||
ProxyAndInfo<NamenodeProtocol> proxyAndInfo = NameNodeProxies.createNonHAProxy(getConf(),
|
||||
otherIpcAddr, NamenodeProtocol.class,
|
||||
UserGroupInformation.getLoginUser(), true);
|
||||
NamenodeProtocol proxy = proxyAndInfo.getProxy();
|
||||
NamespaceInfo nsInfo;
|
||||
try {
|
||||
nsInfo = proxy.versionRequest();
|
||||
checkLayoutVersion(nsInfo);
|
||||
} catch (IOException ioe) {
|
||||
LOG.fatal("Unable to fetch namespace information from active NN at " +
|
||||
otherIpcAddr + ": " + ioe.getMessage());
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Full exception trace", ioe);
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
System.out.println(
|
||||
"=====================================================\n" +
|
||||
"About to bootstrap Standby ID " + nnId + " from:\n" +
|
||||
" Nameservice ID: " + nsId + "\n" +
|
||||
" Other Namenode ID: " + otherNNId + "\n" +
|
||||
" Other NN's HTTP address: " + otherHttpAddr + "\n" +
|
||||
" Other NN's IPC address: " + otherIpcAddr + "\n" +
|
||||
" Namespace ID: " + nsInfo.getNamespaceID() + "\n" +
|
||||
" Block pool ID: " + nsInfo.getBlockPoolID() + "\n" +
|
||||
" Cluster ID: " + nsInfo.getClusterID() + "\n" +
|
||||
" Layout version: " + nsInfo.getLayoutVersion() + "\n" +
|
||||
"=====================================================");
|
||||
|
||||
// Check with the user before blowing away data.
|
||||
if (!NameNode.confirmFormat(
|
||||
Sets.union(Sets.newHashSet(dirsToFormat),
|
||||
Sets.newHashSet(editUrisToFormat)),
|
||||
force, interactive)) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
// Force the active to roll its log
|
||||
CheckpointSignature csig = proxy.rollEditLog();
|
||||
long imageTxId = csig.getMostRecentCheckpointTxId();
|
||||
long rollTxId = csig.getCurSegmentTxId();
|
||||
|
||||
|
||||
// Format the storage (writes VERSION file)
|
||||
NNStorage storage = new NNStorage(conf, dirsToFormat, editUrisToFormat);
|
||||
storage.format(nsInfo);
|
||||
|
||||
// Load the newly formatted image, using all of the directories (including shared
|
||||
// edits)
|
||||
FSImage image = new FSImage(conf);
|
||||
assert image.getEditLog().isOpenForRead() :
|
||||
"Expected edit log to be open for read";
|
||||
|
||||
// Ensure that we have enough edits already in the shared directory to
|
||||
// start up from the last checkpoint on the active.
|
||||
if (!checkLogsAvailableForRead(image, imageTxId, rollTxId)) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
image.getStorage().writeTransactionIdFileToStorage(rollTxId);
|
||||
|
||||
// Download that checkpoint into our storage directories.
|
||||
MD5Hash hash = TransferFsImage.downloadImageToStorage(
|
||||
otherHttpAddr.toString(), imageTxId,
|
||||
storage, true);
|
||||
image.saveDigestAndRenameCheckpointImage(imageTxId, hash);
|
||||
return 0;
|
||||
}
|
||||
|
||||
private boolean checkLogsAvailableForRead(FSImage image, long imageTxId,
|
||||
long rollTxId) {
|
||||
|
||||
long firstTxIdInLogs = imageTxId + 1;
|
||||
long lastTxIdInLogs = rollTxId - 1;
|
||||
assert lastTxIdInLogs >= firstTxIdInLogs;
|
||||
|
||||
try {
|
||||
Collection<EditLogInputStream> streams =
|
||||
image.getEditLog().selectInputStreams(
|
||||
firstTxIdInLogs, lastTxIdInLogs, false);
|
||||
for (EditLogInputStream stream : streams) {
|
||||
IOUtils.closeStream(stream);
|
||||
}
|
||||
return true;
|
||||
} catch (IOException e) {
|
||||
String msg = "Unable to read transaction ids " +
|
||||
firstTxIdInLogs + "-" + lastTxIdInLogs +
|
||||
" from the configured shared edits storage " +
|
||||
Joiner.on(",").join(sharedEditsUris) + ". " +
|
||||
"Please copy these logs into the shared edits storage " +
|
||||
"or call saveNamespace on the active node.\n" +
|
||||
"Error: " + e.getLocalizedMessage();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.fatal(msg, e);
|
||||
} else {
|
||||
LOG.fatal(msg);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private void checkLayoutVersion(NamespaceInfo nsInfo) throws IOException {
|
||||
if (nsInfo.getLayoutVersion() != HdfsConstants.LAYOUT_VERSION) {
|
||||
throw new IOException("Layout version on remote node (" +
|
||||
nsInfo.getLayoutVersion() + ") does not match " +
|
||||
"this node's layout version (" + HdfsConstants.LAYOUT_VERSION + ")");
|
||||
}
|
||||
}
|
||||
|
||||
private void parseConfAndFindOtherNN() throws IOException {
|
||||
Configuration conf = getConf();
|
||||
nsId = DFSUtil.getNamenodeNameServiceId(conf);
|
||||
|
||||
if (!HAUtil.isHAEnabled(conf, nsId)) {
|
||||
throw new HadoopIllegalArgumentException(
|
||||
"HA is not enabled for this namenode.");
|
||||
}
|
||||
nnId = HAUtil.getNameNodeId(conf, nsId);
|
||||
NameNode.initializeGenericKeys(conf, nsId, nnId);
|
||||
|
||||
if (!HAUtil.usesSharedEditsDir(conf)) {
|
||||
throw new HadoopIllegalArgumentException(
|
||||
"Shared edits storage is not enabled for this namenode.");
|
||||
}
|
||||
|
||||
Configuration otherNode = HAUtil.getConfForOtherNode(conf);
|
||||
otherNNId = HAUtil.getNameNodeId(otherNode, nsId);
|
||||
otherIpcAddr = NameNode.getServiceAddress(otherNode, true);
|
||||
Preconditions.checkArgument(otherIpcAddr.getPort() != 0 &&
|
||||
!otherIpcAddr.getAddress().isAnyLocalAddress(),
|
||||
"Could not determine valid IPC address for other NameNode (%s)" +
|
||||
", got: %s", otherNNId, otherIpcAddr);
|
||||
|
||||
otherHttpAddr = DFSUtil.getInfoServer(null, otherNode, true);
|
||||
otherHttpAddr = DFSUtil.substituteForWildcardAddress(otherHttpAddr,
|
||||
otherIpcAddr.getHostName());
|
||||
|
||||
|
||||
dirsToFormat = FSNamesystem.getNamespaceDirs(conf);
|
||||
editUrisToFormat = FSNamesystem.getNamespaceEditsDirs(
|
||||
conf, false);
|
||||
sharedEditsUris = FSNamesystem.getSharedEditsDirs(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setConf(Configuration conf) {
|
||||
this.conf = conf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Configuration getConf() {
|
||||
return conf;
|
||||
}
|
||||
|
||||
public static int run(String[] argv, Configuration conf) throws IOException {
|
||||
BootstrapStandby bs = new BootstrapStandby();
|
||||
bs.setConf(conf);
|
||||
try {
|
||||
return ToolRunner.run(bs, argv);
|
||||
} catch (Exception e) {
|
||||
if (e instanceof IOException) {
|
||||
throw (IOException)e;
|
||||
} else {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -245,8 +245,7 @@ public class DFSck extends Configured implements Tool {
|
|||
private int doWork(final String[] args) throws IOException {
|
||||
String proto = "http://";
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
System.setProperty("https.cipherSuites",
|
||||
Krb5AndCertsSslSocketConnector.KRB5_CIPHER_SUITES.get(0));
|
||||
SecurityUtil.initKrb5CipherSuites();
|
||||
proto = "https://";
|
||||
}
|
||||
final StringBuilder url = new StringBuilder(proto);
|
||||
|
|
|
@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
|||
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
|
||||
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.log4j.Level;
|
||||
|
@ -1001,7 +1002,7 @@ public class TestEditLog extends TestCase {
|
|||
NNStorage storage = new NNStorage(new Configuration(),
|
||||
Collections.<URI>emptyList(),
|
||||
editUris);
|
||||
storage.format("test-cluster-id");
|
||||
storage.format(new NamespaceInfo());
|
||||
FSEditLog editlog = new FSEditLog(storage);
|
||||
// open the edit log and add two transactions
|
||||
// logGenerationStamp is used, simply because it doesn't
|
||||
|
|
|
@ -0,0 +1,213 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode.ha;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
|
||||
import org.codehaus.jackson.sym.NameN;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
public class TestBootstrapStandby {
|
||||
private static final Log LOG = LogFactory.getLog(TestBootstrapStandby.class);
|
||||
|
||||
private MiniDFSCluster cluster;
|
||||
private NameNode nn0;
|
||||
|
||||
@Before
|
||||
public void setupCluster() throws IOException {
|
||||
Configuration conf = new Configuration();
|
||||
|
||||
MiniDFSNNTopology topology = new MiniDFSNNTopology()
|
||||
.addNameservice(new MiniDFSNNTopology.NSConf("ns1")
|
||||
.addNN(new MiniDFSNNTopology.NNConf("nn1").setHttpPort(10001))
|
||||
.addNN(new MiniDFSNNTopology.NNConf("nn2").setHttpPort(10002)));
|
||||
|
||||
cluster = new MiniDFSCluster.Builder(conf)
|
||||
.nnTopology(topology)
|
||||
.numDataNodes(0)
|
||||
.build();
|
||||
cluster.waitActive();
|
||||
|
||||
nn0 = cluster.getNameNode(0);
|
||||
cluster.transitionToActive(0);
|
||||
cluster.shutdownNameNode(1);
|
||||
}
|
||||
|
||||
@After
|
||||
public void shutdownCluster() throws IOException {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test for the base success case. The primary NN
|
||||
* hasn't made any checkpoints, and we copy the fsimage_0
|
||||
* file over and start up.
|
||||
*/
|
||||
@Test
|
||||
public void testSuccessfulBaseCase() throws Exception {
|
||||
removeStandbyNameDirs();
|
||||
|
||||
try {
|
||||
cluster.restartNameNode(1);
|
||||
fail("Did not throw");
|
||||
} catch (IOException ioe) {
|
||||
GenericTestUtils.assertExceptionContains(
|
||||
"Cannot start an HA namenode with name dirs that need recovery",
|
||||
ioe);
|
||||
}
|
||||
|
||||
int rc = BootstrapStandby.run(
|
||||
new String[]{"-nonInteractive"},
|
||||
cluster.getConfiguration(1));
|
||||
assertEquals(0, rc);
|
||||
|
||||
// Should have copied over the namespace from the active
|
||||
FSImageTestUtil.assertNNHasCheckpoints(cluster, 1,
|
||||
ImmutableList.of(0));
|
||||
assertNNFilesMatch();
|
||||
|
||||
// We should now be able to start the standby successfully.
|
||||
cluster.restartNameNode(1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test for downloading a checkpoint made at a later checkpoint
|
||||
* from the active.
|
||||
*/
|
||||
@Test
|
||||
public void testDownloadingLaterCheckpoint() throws Exception {
|
||||
// Roll edit logs a few times to inflate txid
|
||||
nn0.getRpcServer().rollEditLog();
|
||||
nn0.getRpcServer().rollEditLog();
|
||||
// Make checkpoint
|
||||
NameNodeAdapter.enterSafeMode(nn0, false);
|
||||
NameNodeAdapter.saveNamespace(nn0);
|
||||
NameNodeAdapter.leaveSafeMode(nn0, false);
|
||||
long expectedCheckpointTxId = NameNodeAdapter.getNamesystem(nn0)
|
||||
.getFSImage().getMostRecentCheckpointTxId();
|
||||
assertEquals(6, expectedCheckpointTxId);
|
||||
|
||||
int rc = BootstrapStandby.run(
|
||||
new String[]{"-force"},
|
||||
cluster.getConfiguration(1));
|
||||
assertEquals(0, rc);
|
||||
|
||||
// Should have copied over the namespace from the active
|
||||
FSImageTestUtil.assertNNHasCheckpoints(cluster, 1,
|
||||
ImmutableList.of((int)expectedCheckpointTxId));
|
||||
assertNNFilesMatch();
|
||||
|
||||
// We should now be able to start the standby successfully.
|
||||
cluster.restartNameNode(1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test for the case where the shared edits dir doesn't have
|
||||
* all of the recent edit logs.
|
||||
*/
|
||||
@Test
|
||||
public void testSharedEditsMissingLogs() throws Exception {
|
||||
removeStandbyNameDirs();
|
||||
|
||||
CheckpointSignature sig = nn0.getRpcServer().rollEditLog();
|
||||
assertEquals(3, sig.getCurSegmentTxId());
|
||||
|
||||
// Should have created edits_1-2 in shared edits dir
|
||||
URI editsUri = cluster.getSharedEditsDir(0, 1);
|
||||
File editsDir = new File(editsUri);
|
||||
File editsSegment = new File(new File(editsDir, "current"),
|
||||
NNStorage.getFinalizedEditsFileName(1, 2));
|
||||
GenericTestUtils.assertExists(editsSegment);
|
||||
|
||||
// Delete the segment.
|
||||
assertTrue(editsSegment.delete());
|
||||
|
||||
// Trying to bootstrap standby should now fail since the edit
|
||||
// logs aren't available in the shared dir.
|
||||
LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
|
||||
LogFactory.getLog(BootstrapStandby.class));
|
||||
try {
|
||||
int rc = BootstrapStandby.run(
|
||||
new String[]{"-force"},
|
||||
cluster.getConfiguration(1));
|
||||
assertEquals(1, rc);
|
||||
} finally {
|
||||
logs.stopCapturing();
|
||||
}
|
||||
GenericTestUtils.assertMatches(logs.getOutput(),
|
||||
"FATAL.*Unable to read transaction ids 1-4 from the configured shared");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStandbyDirsAlreadyExist() throws Exception {
|
||||
// Should not pass since standby dirs exist, force not given
|
||||
int rc = BootstrapStandby.run(
|
||||
new String[]{"-nonInteractive"},
|
||||
cluster.getConfiguration(1));
|
||||
assertEquals(1, rc);
|
||||
|
||||
// Should pass with -force
|
||||
rc = BootstrapStandby.run(
|
||||
new String[]{"-force"},
|
||||
cluster.getConfiguration(1));
|
||||
assertEquals(0, rc);
|
||||
}
|
||||
|
||||
private void assertNNFilesMatch() throws Exception {
|
||||
List<File> curDirs = Lists.newArrayList();
|
||||
curDirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 0));
|
||||
curDirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 1));
|
||||
FSImageTestUtil.assertParallelFilesAreIdentical(curDirs,
|
||||
Collections.<String>emptySet());
|
||||
}
|
||||
|
||||
private void removeStandbyNameDirs() {
|
||||
for (URI u : cluster.getNameDirs(1)) {
|
||||
assertTrue(u.getScheme().equals("file"));
|
||||
File dir = new File(u.getPath());
|
||||
LOG.info("Removing standby dir " + dir);
|
||||
assertTrue(FileUtil.fullyDelete(dir));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -19,15 +19,21 @@ package org.apache.hadoop.test;
|
|||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.StringWriter;
|
||||
import java.util.Arrays;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.impl.Log4JLogger;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.log4j.Layout;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.log4j.WriterAppender;
|
||||
import org.junit.Assert;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
@ -100,6 +106,35 @@ public abstract class GenericTestUtils {
|
|||
throw new TimeoutException("Timed out waiting for condition");
|
||||
}
|
||||
|
||||
public static class LogCapturer {
|
||||
private StringWriter sw = new StringWriter();
|
||||
private WriterAppender appender;
|
||||
private Logger logger;
|
||||
|
||||
public static LogCapturer captureLogs(Log l) {
|
||||
Logger logger = ((Log4JLogger)l).getLogger();
|
||||
LogCapturer c = new LogCapturer(logger);
|
||||
return c;
|
||||
}
|
||||
|
||||
|
||||
private LogCapturer(Logger logger) {
|
||||
this.logger = logger;
|
||||
Layout layout = Logger.getRootLogger().getAppender("stdout").getLayout();
|
||||
WriterAppender wa = new WriterAppender(layout, sw);
|
||||
logger.addAppender(wa);
|
||||
}
|
||||
|
||||
public String getOutput() {
|
||||
return sw.toString();
|
||||
}
|
||||
|
||||
public void stopCapturing() {
|
||||
logger.removeAppender(appender);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Mockito answer helper that triggers one latch as soon as the
|
||||
|
@ -245,4 +280,10 @@ public abstract class GenericTestUtils {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static void assertMatches(String output, String pattern) {
|
||||
Assert.assertTrue("Expected output to match /" + pattern + "/" +
|
||||
" but got:\n" + output,
|
||||
Pattern.compile(pattern).matcher(output).find());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue