Merged revision(s) 1604692 from hadoop/common/trunk:

HDFS-6507. Improve DFSAdmin to support HA cluster better. (Contributd by Zesheng Wu)
........

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1604695 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinayakumar B 2014-06-23 05:19:28 +00:00
parent dbfde21372
commit 4e3551eea6
6 changed files with 539 additions and 75 deletions

View File

@ -207,6 +207,9 @@ Release 2.5.0 - UNRELEASED
HDFS-4667. Capture renamed files/directories in snapshot diff report. (jing9 HDFS-4667. Capture renamed files/directories in snapshot diff report. (jing9
and Binglin Chang via jing9) and Binglin Chang via jing9)
HDFS-6507. Improve DFSAdmin to support HA cluster better.
(Zesheng Wu via vinayakumarb)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-6214. Webhdfs has poor throughput for files >2GB (daryn) HDFS-6214. Webhdfs has poor throughput for files >2GB (daryn)

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.NameNodeProxies; import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.NameNodeProxies.ProxyAndInfo;
import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
@ -351,18 +352,42 @@ public class HAUtil {
*/ */
public static List<ClientProtocol> getProxiesForAllNameNodesInNameservice( public static List<ClientProtocol> getProxiesForAllNameNodesInNameservice(
Configuration conf, String nsId) throws IOException { Configuration conf, String nsId) throws IOException {
List<ProxyAndInfo<ClientProtocol>> proxies =
getProxiesForAllNameNodesInNameservice(conf, nsId, ClientProtocol.class);
List<ClientProtocol> namenodes = new ArrayList<ClientProtocol>(
proxies.size());
for (ProxyAndInfo<ClientProtocol> proxy : proxies) {
namenodes.add(proxy.getProxy());
}
return namenodes;
}
/**
* Get an RPC proxy for each NN in an HA nameservice. Used when a given RPC
* call should be made on every NN in an HA nameservice, not just the active.
*
* @param conf configuration
* @param nsId the nameservice to get all of the proxies for.
* @param xface the protocol class.
* @return a list of RPC proxies for each NN in the nameservice.
* @throws IOException in the event of error.
*/
public static <T> List<ProxyAndInfo<T>> getProxiesForAllNameNodesInNameservice(
Configuration conf, String nsId, Class<T> xface) throws IOException {
Map<String, InetSocketAddress> nnAddresses = Map<String, InetSocketAddress> nnAddresses =
DFSUtil.getRpcAddressesForNameserviceId(conf, nsId, null); DFSUtil.getRpcAddressesForNameserviceId(conf, nsId, null);
List<ClientProtocol> namenodes = new ArrayList<ClientProtocol>(); List<ProxyAndInfo<T>> proxies = new ArrayList<ProxyAndInfo<T>>(
nnAddresses.size());
for (InetSocketAddress nnAddress : nnAddresses.values()) { for (InetSocketAddress nnAddress : nnAddresses.values()) {
NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = null; NameNodeProxies.ProxyAndInfo<T> proxyInfo = null;
proxyInfo = NameNodeProxies.createNonHAProxy(conf, proxyInfo = NameNodeProxies.createNonHAProxy(conf,
nnAddress, ClientProtocol.class, nnAddress, xface,
UserGroupInformation.getCurrentUser(), false); UserGroupInformation.getCurrentUser(), false);
namenodes.add(proxyInfo.getProxy()); proxies.add(proxyInfo);
} }
return namenodes; return proxies;
} }
/** /**

View File

@ -106,10 +106,13 @@ public class NameNodeProxies {
public static class ProxyAndInfo<PROXYTYPE> { public static class ProxyAndInfo<PROXYTYPE> {
private final PROXYTYPE proxy; private final PROXYTYPE proxy;
private final Text dtService; private final Text dtService;
private final InetSocketAddress address;
public ProxyAndInfo(PROXYTYPE proxy, Text dtService) { public ProxyAndInfo(PROXYTYPE proxy, Text dtService,
InetSocketAddress address) {
this.proxy = proxy; this.proxy = proxy;
this.dtService = dtService; this.dtService = dtService;
this.address = address;
} }
public PROXYTYPE getProxy() { public PROXYTYPE getProxy() {
@ -119,6 +122,10 @@ public class NameNodeProxies {
public Text getDelegationTokenService() { public Text getDelegationTokenService() {
return dtService; return dtService;
} }
public InetSocketAddress getAddress() {
return address;
}
} }
/** /**
@ -161,7 +168,8 @@ public class NameNodeProxies {
dtService = SecurityUtil.buildTokenService( dtService = SecurityUtil.buildTokenService(
NameNode.getAddress(nameNodeUri)); NameNode.getAddress(nameNodeUri));
} }
return new ProxyAndInfo<T>(proxy, dtService); return new ProxyAndInfo<T>(proxy, dtService,
NameNode.getAddress(nameNodeUri));
} }
} }
@ -221,7 +229,8 @@ public class NameNodeProxies {
dtService = SecurityUtil.buildTokenService( dtService = SecurityUtil.buildTokenService(
NameNode.getAddress(nameNodeUri)); NameNode.getAddress(nameNodeUri));
} }
return new ProxyAndInfo<T>(proxy, dtService); return new ProxyAndInfo<T>(proxy, dtService,
NameNode.getAddress(nameNodeUri));
} else { } else {
LOG.warn("Currently creating proxy using " + LOG.warn("Currently creating proxy using " +
"LossyRetryInvocationHandler requires NN HA setup"); "LossyRetryInvocationHandler requires NN HA setup");
@ -274,7 +283,7 @@ public class NameNodeProxies {
throw new IllegalStateException(message); throw new IllegalStateException(message);
} }
return new ProxyAndInfo<T>(proxy, dtService); return new ProxyAndInfo<T>(proxy, dtService, nnAddr);
} }
private static JournalProtocol createNNProxyWithJournalProtocol( private static JournalProtocol createNNProxyWithJournalProtocol(

View File

@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.NameNodeProxies; import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.NameNodeProxies.ProxyAndInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@ -498,25 +499,60 @@ public class DFSAdmin extends FsShell {
printUsage("-safemode"); printUsage("-safemode");
return; return;
} }
DistributedFileSystem dfs = getDFS();
boolean inSafeMode = dfs.setSafeMode(action);
// DistributedFileSystem dfs = getDFS();
// If we are waiting for safemode to exit, then poll and Configuration dfsConf = dfs.getConf();
// sleep till we are out of safemode. URI dfsUri = dfs.getUri();
// boolean isHaEnabled = HAUtil.isLogicalUri(dfsConf, dfsUri);
if (waitExitSafe) {
while (inSafeMode) { if (isHaEnabled) {
try { String nsId = dfsUri.getHost();
Thread.sleep(5000); List<ProxyAndInfo<ClientProtocol>> proxies =
} catch (java.lang.InterruptedException e) { HAUtil.getProxiesForAllNameNodesInNameservice(
throw new IOException("Wait Interrupted"); dfsConf, nsId, ClientProtocol.class);
for (ProxyAndInfo<ClientProtocol> proxy : proxies) {
ClientProtocol haNn = proxy.getProxy();
boolean inSafeMode = haNn.setSafeMode(action, false);
if (waitExitSafe) {
inSafeMode = waitExitSafeMode(haNn, inSafeMode);
} }
inSafeMode = dfs.setSafeMode(SafeModeAction.SAFEMODE_GET); System.out.println("Safe mode is " + (inSafeMode ? "ON" : "OFF")
+ " in " + proxy.getAddress());
} }
} else {
boolean inSafeMode = dfs.setSafeMode(action);
if (waitExitSafe) {
inSafeMode = waitExitSafeMode(dfs, inSafeMode);
}
System.out.println("Safe mode is " + (inSafeMode ? "ON" : "OFF"));
} }
System.out.println("Safe mode is " + (inSafeMode ? "ON" : "OFF")); }
private boolean waitExitSafeMode(DistributedFileSystem dfs, boolean inSafeMode)
throws IOException {
while (inSafeMode) {
try {
Thread.sleep(5000);
} catch (java.lang.InterruptedException e) {
throw new IOException("Wait Interrupted");
}
inSafeMode = dfs.setSafeMode(SafeModeAction.SAFEMODE_GET, false);
}
return inSafeMode;
}
private boolean waitExitSafeMode(ClientProtocol nn, boolean inSafeMode)
throws IOException {
while (inSafeMode) {
try {
Thread.sleep(5000);
} catch (java.lang.InterruptedException e) {
throw new IOException("Wait Interrupted");
}
inSafeMode = nn.setSafeMode(SafeModeAction.SAFEMODE_GET, false);
}
return inSafeMode;
} }
/** /**
@ -561,7 +597,24 @@ public class DFSAdmin extends FsShell {
int exitCode = -1; int exitCode = -1;
DistributedFileSystem dfs = getDFS(); DistributedFileSystem dfs = getDFS();
dfs.saveNamespace(); Configuration dfsConf = dfs.getConf();
URI dfsUri = dfs.getUri();
boolean isHaEnabled = HAUtil.isLogicalUri(dfsConf, dfsUri);
if (isHaEnabled) {
String nsId = dfsUri.getHost();
List<ProxyAndInfo<ClientProtocol>> proxies =
HAUtil.getProxiesForAllNameNodesInNameservice(dfsConf,
nsId, ClientProtocol.class);
for (ProxyAndInfo<ClientProtocol> proxy : proxies) {
proxy.getProxy().saveNamespace();
System.out.println("Save namespace successful for " +
proxy.getAddress());
}
} else {
dfs.saveNamespace();
System.out.println("Save namespace successful");
}
exitCode = 0; exitCode = 0;
return exitCode; return exitCode;
@ -583,15 +636,30 @@ public class DFSAdmin extends FsShell {
*/ */
public int restoreFailedStorage(String arg) throws IOException { public int restoreFailedStorage(String arg) throws IOException {
int exitCode = -1; int exitCode = -1;
if(!arg.equals("check") && !arg.equals("true") && !arg.equals("false")) { if(!arg.equals("check") && !arg.equals("true") && !arg.equals("false")) {
System.err.println("restoreFailedStorage valid args are true|false|check"); System.err.println("restoreFailedStorage valid args are true|false|check");
return exitCode; return exitCode;
} }
DistributedFileSystem dfs = getDFS(); DistributedFileSystem dfs = getDFS();
Boolean res = dfs.restoreFailedStorage(arg); Configuration dfsConf = dfs.getConf();
System.out.println("restoreFailedStorage is set to " + res); URI dfsUri = dfs.getUri();
boolean isHaEnabled = HAUtil.isLogicalUri(dfsConf, dfsUri);
if (isHaEnabled) {
String nsId = dfsUri.getHost();
List<ProxyAndInfo<ClientProtocol>> proxies =
HAUtil.getProxiesForAllNameNodesInNameservice(dfsConf,
nsId, ClientProtocol.class);
for (ProxyAndInfo<ClientProtocol> proxy : proxies) {
Boolean res = proxy.getProxy().restoreFailedStorage(arg);
System.out.println("restoreFailedStorage is set to " + res + " for "
+ proxy.getAddress());
}
} else {
Boolean res = dfs.restoreFailedStorage(arg);
System.out.println("restoreFailedStorage is set to " + res);
}
exitCode = 0; exitCode = 0;
return exitCode; return exitCode;
@ -607,7 +675,24 @@ public class DFSAdmin extends FsShell {
int exitCode = -1; int exitCode = -1;
DistributedFileSystem dfs = getDFS(); DistributedFileSystem dfs = getDFS();
dfs.refreshNodes(); Configuration dfsConf = dfs.getConf();
URI dfsUri = dfs.getUri();
boolean isHaEnabled = HAUtil.isLogicalUri(dfsConf, dfsUri);
if (isHaEnabled) {
String nsId = dfsUri.getHost();
List<ProxyAndInfo<ClientProtocol>> proxies =
HAUtil.getProxiesForAllNameNodesInNameservice(dfsConf,
nsId, ClientProtocol.class);
for (ProxyAndInfo<ClientProtocol> proxy: proxies) {
proxy.getProxy().refreshNodes();
System.out.println("Refresh nodes successful for " +
proxy.getAddress());
}
} else {
dfs.refreshNodes();
System.out.println("Refresh nodes successful");
}
exitCode = 0; exitCode = 0;
return exitCode; return exitCode;
@ -641,7 +726,24 @@ public class DFSAdmin extends FsShell {
} }
DistributedFileSystem dfs = (DistributedFileSystem) fs; DistributedFileSystem dfs = (DistributedFileSystem) fs;
dfs.setBalancerBandwidth(bandwidth); Configuration dfsConf = dfs.getConf();
URI dfsUri = dfs.getUri();
boolean isHaEnabled = HAUtil.isLogicalUri(dfsConf, dfsUri);
if (isHaEnabled) {
String nsId = dfsUri.getHost();
List<ProxyAndInfo<ClientProtocol>> proxies =
HAUtil.getProxiesForAllNameNodesInNameservice(dfsConf,
nsId, ClientProtocol.class);
for (ProxyAndInfo<ClientProtocol> proxy : proxies) {
proxy.getProxy().setBalancerBandwidth(bandwidth);
System.out.println("Balancer bandwidth is set to " + bandwidth +
" for " + proxy.getAddress());
}
} else {
dfs.setBalancerBandwidth(bandwidth);
System.out.println("Balancer bandwidth is set to " + bandwidth);
}
exitCode = 0; exitCode = 0;
return exitCode; return exitCode;
@ -937,11 +1039,18 @@ public class DFSAdmin extends FsShell {
if (!HAUtil.isAtLeastOneActive(namenodes)) { if (!HAUtil.isAtLeastOneActive(namenodes)) {
throw new IOException("Cannot finalize with no NameNode active"); throw new IOException("Cannot finalize with no NameNode active");
} }
for (ClientProtocol haNn : namenodes) {
haNn.finalizeUpgrade(); List<ProxyAndInfo<ClientProtocol>> proxies =
HAUtil.getProxiesForAllNameNodesInNameservice(dfsConf,
nsId, ClientProtocol.class);
for (ProxyAndInfo<ClientProtocol> proxy : proxies) {
proxy.getProxy().finalizeUpgrade();
System.out.println("Finalize upgrade successful for " +
proxy.getAddress());
} }
} else { } else {
dfs.finalizeUpgrade(); dfs.finalizeUpgrade();
System.out.println("Finalize upgrade successful");
} }
return 0; return 0;
@ -958,9 +1067,25 @@ public class DFSAdmin extends FsShell {
public int metaSave(String[] argv, int idx) throws IOException { public int metaSave(String[] argv, int idx) throws IOException {
String pathname = argv[idx]; String pathname = argv[idx];
DistributedFileSystem dfs = getDFS(); DistributedFileSystem dfs = getDFS();
dfs.metaSave(pathname); Configuration dfsConf = dfs.getConf();
System.out.println("Created metasave file " + pathname + " in the log " + URI dfsUri = dfs.getUri();
"directory of namenode " + dfs.getUri()); boolean isHaEnabled = HAUtil.isLogicalUri(dfsConf, dfsUri);
if (isHaEnabled) {
String nsId = dfsUri.getHost();
List<ProxyAndInfo<ClientProtocol>> proxies =
HAUtil.getProxiesForAllNameNodesInNameservice(dfsConf,
nsId, ClientProtocol.class);
for (ProxyAndInfo<ClientProtocol> proxy : proxies) {
proxy.getProxy().metaSave(pathname);
System.out.println("Created metasave file " + pathname + " in the log "
+ "directory of namenode " + proxy.getAddress());
}
} else {
dfs.metaSave(pathname);
System.out.println("Created metasave file " + pathname + " in the log " +
"directory of namenode " + dfs.getUri());
}
return 0; return 0;
} }
@ -1029,13 +1154,30 @@ public class DFSAdmin extends FsShell {
conf.set(CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY, conf.set(CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY,
conf.get(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, "")); conf.get(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, ""));
// Create the client DistributedFileSystem dfs = getDFS();
RefreshAuthorizationPolicyProtocol refreshProtocol = URI dfsUri = dfs.getUri();
NameNodeProxies.createProxy(conf, FileSystem.getDefaultUri(conf), boolean isHaEnabled = HAUtil.isLogicalUri(conf, dfsUri);
RefreshAuthorizationPolicyProtocol.class).getProxy();
// Refresh the authorization policy in-effect if (isHaEnabled) {
refreshProtocol.refreshServiceAcl(); // Run refreshServiceAcl for all NNs if HA is enabled
String nsId = dfsUri.getHost();
List<ProxyAndInfo<RefreshAuthorizationPolicyProtocol>> proxies =
HAUtil.getProxiesForAllNameNodesInNameservice(conf, nsId,
RefreshAuthorizationPolicyProtocol.class);
for (ProxyAndInfo<RefreshAuthorizationPolicyProtocol> proxy : proxies) {
proxy.getProxy().refreshServiceAcl();
System.out.println("Refresh service acl successful for "
+ proxy.getAddress());
}
} else {
// Create the client
RefreshAuthorizationPolicyProtocol refreshProtocol =
NameNodeProxies.createProxy(conf, FileSystem.getDefaultUri(conf),
RefreshAuthorizationPolicyProtocol.class).getProxy();
// Refresh the authorization policy in-effect
refreshProtocol.refreshServiceAcl();
System.out.println("Refresh service acl successful");
}
return 0; return 0;
} }
@ -1055,13 +1197,31 @@ public class DFSAdmin extends FsShell {
conf.set(CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY, conf.set(CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY,
conf.get(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, "")); conf.get(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, ""));
// Create the client DistributedFileSystem dfs = getDFS();
RefreshUserMappingsProtocol refreshProtocol = URI dfsUri = dfs.getUri();
NameNodeProxies.createProxy(conf, FileSystem.getDefaultUri(conf), boolean isHaEnabled = HAUtil.isLogicalUri(conf, dfsUri);
RefreshUserMappingsProtocol.class).getProxy();
// Refresh the user-to-groups mappings if (isHaEnabled) {
refreshProtocol.refreshUserToGroupsMappings(); // Run refreshUserToGroupsMapings for all NNs if HA is enabled
String nsId = dfsUri.getHost();
List<ProxyAndInfo<RefreshUserMappingsProtocol>> proxies =
HAUtil.getProxiesForAllNameNodesInNameservice(conf, nsId,
RefreshUserMappingsProtocol.class);
for (ProxyAndInfo<RefreshUserMappingsProtocol> proxy : proxies) {
proxy.getProxy().refreshUserToGroupsMappings();
System.out.println("Refresh user to groups mapping successful for "
+ proxy.getAddress());
}
} else {
// Create the client
RefreshUserMappingsProtocol refreshProtocol =
NameNodeProxies.createProxy(conf, FileSystem.getDefaultUri(conf),
RefreshUserMappingsProtocol.class).getProxy();
// Refresh the user-to-groups mappings
refreshProtocol.refreshUserToGroupsMappings();
System.out.println("Refresh user to groups mapping successful");
}
return 0; return 0;
} }
@ -1082,13 +1242,31 @@ public class DFSAdmin extends FsShell {
conf.set(CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY, conf.set(CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY,
conf.get(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, "")); conf.get(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, ""));
// Create the client DistributedFileSystem dfs = getDFS();
RefreshUserMappingsProtocol refreshProtocol = URI dfsUri = dfs.getUri();
NameNodeProxies.createProxy(conf, FileSystem.getDefaultUri(conf), boolean isHaEnabled = HAUtil.isLogicalUri(conf, dfsUri);
RefreshUserMappingsProtocol.class).getProxy();
// Refresh the user-to-groups mappings if (isHaEnabled) {
refreshProtocol.refreshSuperUserGroupsConfiguration(); // Run refreshSuperUserGroupsConfiguration for all NNs if HA is enabled
String nsId = dfsUri.getHost();
List<ProxyAndInfo<RefreshUserMappingsProtocol>> proxies =
HAUtil.getProxiesForAllNameNodesInNameservice(conf, nsId,
RefreshUserMappingsProtocol.class);
for (ProxyAndInfo<RefreshUserMappingsProtocol> proxy : proxies) {
proxy.getProxy().refreshSuperUserGroupsConfiguration();
System.out.println("Refresh super user groups configuration " +
"successful for " + proxy.getAddress());
}
} else {
// Create the client
RefreshUserMappingsProtocol refreshProtocol =
NameNodeProxies.createProxy(conf, FileSystem.getDefaultUri(conf),
RefreshUserMappingsProtocol.class).getProxy();
// Refresh the user-to-groups mappings
refreshProtocol.refreshSuperUserGroupsConfiguration();
System.out.println("Refresh super user groups configuration successful");
}
return 0; return 0;
} }
@ -1103,13 +1281,31 @@ public class DFSAdmin extends FsShell {
conf.set(CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY, conf.set(CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY,
conf.get(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, "")); conf.get(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, ""));
// Create the client DistributedFileSystem dfs = getDFS();
RefreshCallQueueProtocol refreshProtocol = URI dfsUri = dfs.getUri();
NameNodeProxies.createProxy(conf, FileSystem.getDefaultUri(conf), boolean isHaEnabled = HAUtil.isLogicalUri(conf, dfsUri);
RefreshCallQueueProtocol.class).getProxy();
// Refresh the call queue if (isHaEnabled) {
refreshProtocol.refreshCallQueue(); // Run refreshCallQueue for all NNs if HA is enabled
String nsId = dfsUri.getHost();
List<ProxyAndInfo<RefreshCallQueueProtocol>> proxies =
HAUtil.getProxiesForAllNameNodesInNameservice(conf, nsId,
RefreshCallQueueProtocol.class);
for (ProxyAndInfo<RefreshCallQueueProtocol> proxy : proxies) {
proxy.getProxy().refreshCallQueue();
System.out.println("Refresh call queue successful for "
+ proxy.getAddress());
}
} else {
// Create the client
RefreshCallQueueProtocol refreshProtocol =
NameNodeProxies.createProxy(conf, FileSystem.getDefaultUri(conf),
RefreshCallQueueProtocol.class).getProxy();
// Refresh the call queue
refreshProtocol.refreshCallQueue();
System.out.println("Refresh call queue successful");
}
return 0; return 0;
} }

View File

@ -0,0 +1,231 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.tools;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import com.google.common.base.Charsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.junit.After;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class TestDFSAdminWithHA {
private final ByteArrayOutputStream out = new ByteArrayOutputStream();
private final ByteArrayOutputStream err = new ByteArrayOutputStream();
private MiniQJMHACluster cluster;
private Configuration conf;
private DFSAdmin admin;
private PrintStream originOut;
private PrintStream originErr;
private static final String NSID = "ns1";
private void assertOutputMatches(String string) {
String errOutput = new String(out.toByteArray(), Charsets.UTF_8);
String output = new String(out.toByteArray(), Charsets.UTF_8);
if (!errOutput.matches(string) && !output.matches(string)) {
fail("Expected output to match '" + string +
"' but err_output was:\n" + errOutput +
"\n and output was: \n" + output);
}
out.reset();
err.reset();
}
private void setHAConf(Configuration conf, String nn1Addr, String nn2Addr) {
conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
"hdfs://" + NSID);
conf.set(DFSConfigKeys.DFS_NAMESERVICES, NSID);
conf.set(DFSConfigKeys.DFS_NAMESERVICE_ID, NSID);
conf.set(DFSUtil.addKeySuffixes(
DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX, NSID), "nn1,nn2");
conf.set(DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY, "nn1");
conf.set(DFSUtil.addKeySuffixes(
DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, NSID, "nn1"), nn1Addr);
conf.set(DFSUtil.addKeySuffixes(
DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, NSID, "nn2"), nn2Addr);
}
private void setUpHaCluster(boolean security) throws Exception {
conf = new Configuration();
conf.setBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION,
security);
cluster = new MiniQJMHACluster.Builder(conf).build();
setHAConf(conf, cluster.getDfsCluster().getNameNode(0).getHostAndPort(),
cluster.getDfsCluster().getNameNode(1).getHostAndPort());
cluster.getDfsCluster().getNameNode(0).getHostAndPort();
admin = new DFSAdmin();
admin.setConf(conf);
assertTrue(HAUtil.isHAEnabled(conf, "ns1"));
originOut = System.out;
originErr = System.err;
System.setOut(new PrintStream(out));
System.setErr(new PrintStream(err));
}
@After
public void tearDown() throws Exception {
System.out.flush();
System.err.flush();
System.setOut(originOut);
System.setErr(originErr);
}
@Test(timeout = 30000)
public void testSetSafeMode() throws Exception {
setUpHaCluster(false);
// Enter safemode
int exitCode = admin.run(new String[] {"-safemode", "enter"});
assertEquals(err.toString().trim(), 0, exitCode);
String message = "Safe mode is ON in.*";
assertOutputMatches(message + "\n" + message + "\n");
// Get safemode
exitCode = admin.run(new String[] {"-safemode", "get"});
assertEquals(err.toString().trim(), 0, exitCode);
message = "Safe mode is ON in.*";
assertOutputMatches(message + "\n" + message + "\n");
// Leave safemode
exitCode = admin.run(new String[] {"-safemode", "leave"});
assertEquals(err.toString().trim(), 0, exitCode);
message = "Safe mode is OFF in.*";
assertOutputMatches(message + "\n" + message + "\n");
// Get safemode
exitCode = admin.run(new String[] {"-safemode", "get"});
assertEquals(err.toString().trim(), 0, exitCode);
message = "Safe mode is OFF in.*";
assertOutputMatches(message + "\n" + message + "\n");
}
@Test (timeout = 30000)
public void testSaveNamespace() throws Exception {
setUpHaCluster(false);
// Safe mode should be turned ON in order to create namespace image.
int exitCode = admin.run(new String[] {"-safemode", "enter"});
assertEquals(err.toString().trim(), 0, exitCode);
String message = "Safe mode is ON in.*";
assertOutputMatches(message + "\n" + message + "\n");
exitCode = admin.run(new String[] {"-saveNamespace"});
assertEquals(err.toString().trim(), 0, exitCode);
message = "Save namespace successful for.*";
assertOutputMatches(message + "\n" + message + "\n");
}
@Test (timeout = 30000)
public void testRestoreFailedStorage() throws Exception {
setUpHaCluster(false);
int exitCode = admin.run(new String[] {"-restoreFailedStorage", "check"});
assertEquals(err.toString().trim(), 0, exitCode);
String message = "restoreFailedStorage is set to false for.*";
// Default is false
assertOutputMatches(message + "\n" + message + "\n");
exitCode = admin.run(new String[] {"-restoreFailedStorage", "true"});
assertEquals(err.toString().trim(), 0, exitCode);
message = "restoreFailedStorage is set to true for.*";
assertOutputMatches(message + "\n" + message + "\n");
exitCode = admin.run(new String[] {"-restoreFailedStorage", "false"});
assertEquals(err.toString().trim(), 0, exitCode);
message = "restoreFailedStorage is set to false for.*";
assertOutputMatches(message + "\n" + message + "\n");
}
@Test (timeout = 30000)
public void testRefreshNodes() throws Exception {
setUpHaCluster(false);
int exitCode = admin.run(new String[] {"-refreshNodes"});
assertEquals(err.toString().trim(), 0, exitCode);
String message = "Refresh nodes successful for.*";
assertOutputMatches(message + "\n" + message + "\n");
}
@Test (timeout = 30000)
public void testSetBalancerBandwidth() throws Exception {
setUpHaCluster(false);
int exitCode = admin.run(new String[] {"-setBalancerBandwidth", "10"});
assertEquals(err.toString().trim(), 0, exitCode);
String message = "Balancer bandwidth is set to 10 for.*";
assertOutputMatches(message + "\n" + message + "\n");
}
@Test (timeout = 30000)
public void testMetaSave() throws Exception {
setUpHaCluster(false);
int exitCode = admin.run(new String[] {"-metasave", "dfs.meta"});
assertEquals(err.toString().trim(), 0, exitCode);
String message = "Created metasave file dfs.meta in the log directory"
+ " of namenode.*";
assertOutputMatches(message + "\n" + message + "\n");
}
@Test (timeout = 30000)
public void testRefreshServiceAcl() throws Exception {
setUpHaCluster(true);
int exitCode = admin.run(new String[] {"-refreshServiceAcl"});
assertEquals(err.toString().trim(), 0, exitCode);
String message = "Refresh service acl successful for.*";
assertOutputMatches(message + "\n" + message + "\n");
}
@Test (timeout = 30000)
public void testRefreshUserToGroupsMappings() throws Exception {
setUpHaCluster(false);
int exitCode = admin.run(new String[] {"-refreshUserToGroupsMappings"});
assertEquals(err.toString().trim(), 0, exitCode);
String message = "Refresh user to groups mapping successful for.*";
assertOutputMatches(message + "\n" + message + "\n");
}
@Test (timeout = 30000)
public void testRefreshSuperUserGroupsConfiguration() throws Exception {
setUpHaCluster(false);
int exitCode = admin.run(
new String[] {"-refreshSuperUserGroupsConfiguration"});
assertEquals(err.toString().trim(), 0, exitCode);
String message = "Refresh super user groups configuration successful for.*";
assertOutputMatches(message + "\n" + message + "\n");
}
@Test (timeout = 30000)
public void testRefreshCallQueue() throws Exception {
setUpHaCluster(false);
int exitCode = admin.run(new String[] {"-refreshCallQueue"});
assertEquals(err.toString().trim(), 0, exitCode);
String message = "Refresh call queue successful for.*";
assertOutputMatches(message + "\n" + message + "\n");
}
}

View File

@ -15714,8 +15714,8 @@
</cleanup-commands> </cleanup-commands>
<comparators> <comparators>
<comparator> <comparator>
<type>ExactComparator</type> <type>RegexpComparator</type>
<expected-output></expected-output> <expected-output>Refresh service acl successful(\n)*</expected-output>
</comparator> </comparator>
</comparators> </comparators>
</test><!-- </test><!--
@ -15951,8 +15951,8 @@
</cleanup-commands> </cleanup-commands>
<comparators> <comparators>
<comparator> <comparator>
<type>ExactComparator</type> <type>RegexpComparator</type>
<expected-output></expected-output> <expected-output>Save namespace successful(\n)*</expected-output>
</comparator> </comparator>
</comparators> </comparators>
</test> </test>
@ -16367,8 +16367,8 @@
</cleanup-commands> </cleanup-commands>
<comparators> <comparators>
<comparator> <comparator>
<type>ExactComparator</type> <type>RegexpComparator</type>
<expected-output></expected-output> <expected-output>Refresh user to groups mapping successful(\n)*</expected-output>
</comparator> </comparator>
</comparators> </comparators>
</test> </test>
@ -16383,8 +16383,8 @@
</cleanup-commands> </cleanup-commands>
<comparators> <comparators>
<comparator> <comparator>
<type>ExactComparator</type> <type>RegexpComparator</type>
<expected-output></expected-output> <expected-output>Refresh super user groups configuration successful(\n)*</expected-output>
</comparator> </comparator>
</comparators> </comparators>
</test> </test>
@ -16453,8 +16453,8 @@
</cleanup-commands> </cleanup-commands>
<comparators> <comparators>
<comparator> <comparator>
<type>ExactComparator</type> <type>RegexpComparator</type>
<expected-output></expected-output> <expected-output>Balancer bandwidth is set to 104857600(\n)*</expected-output>
</comparator> </comparator>
</comparators> </comparators>
</test> </test>
@ -16469,8 +16469,8 @@
</cleanup-commands> </cleanup-commands>
<comparators> <comparators>
<comparator> <comparator>
<type>ExactComparator</type> <type>SubstringComparator</type>
<expected-output></expected-output> <expected-output>Finalize upgrade successful</expected-output>
</comparator> </comparator>
</comparators> </comparators>
</test> </test>