HDFS-9094. Add command line option to ask NameNode reload configuration. (Contributed by Xiaobing Zhou)

Arpit Agarwal 2016-01-25 12:17:05 -08:00
parent 6eacdea0e4
commit d62b4a4de7
8 changed files with 355 additions and 127 deletions
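In summary: hdfs dfsadmin -reconfig now accepts namenode as well as datanode as the node type (for example, hdfs dfsadmin -reconfig namenode <host:ipc_port> start). The patch adds a client-side ReconfigurationProtocol proxy, registers the protocol on the NameNode's service and client RPC servers, refactors DFSAdmin into per-operation dispatch methods, and leaves the three NameNode handlers as stubs that throw UnsupportedOperationException until NameNode reconfiguration is implemented.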

DFSUtilClient.java

@@ -38,9 +38,11 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.ReconfigurationProtocol;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
 import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.ReconfigurationProtocolTranslatorPB;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.util.IOUtilsClient;
 import org.apache.hadoop.hdfs.web.WebHdfsConstants;
@@ -496,6 +498,12 @@ public class DFSUtilClient {
     return new ClientDatanodeProtocolTranslatorPB(addr, ticket, conf, factory);
   }
 
+  public static ReconfigurationProtocol createReconfigurationProtocolProxy(
+      InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
+      SocketFactory factory) throws IOException {
+    return new ReconfigurationProtocolTranslatorPB(addr, ticket, conf, factory);
+  }
+
   /**
    * Creates a new KeyProvider from the given Configuration.
    *
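A minimal sketch (not part of this commit) of how a caller might use the new helper; the address is a placeholder, and real callers go through DFSAdmin rather than invoking this directly:

  import java.net.InetSocketAddress;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hdfs.DFSUtilClient;
  import org.apache.hadoop.hdfs.HdfsConfiguration;
  import org.apache.hadoop.hdfs.protocol.ReconfigurationProtocol;
  import org.apache.hadoop.net.NetUtils;
  import org.apache.hadoop.security.UserGroupInformation;

  public class ReconfProxyExample {
    public static void main(String[] args) throws Exception {
      Configuration conf = new HdfsConfiguration();
      // Placeholder NameNode service RPC address.
      InetSocketAddress addr = NetUtils.createSocketAddr("nn-host:8022");
      ReconfigurationProtocol proxy =
          DFSUtilClient.createReconfigurationProtocolProxy(addr,
              UserGroupInformation.getCurrentUser(), conf,
              NetUtils.getSocketFactory(conf, ReconfigurationProtocol.class));
      // With this commit the NameNode side is still a stub, so this call
      // surfaces an UnsupportedOperationException from the server.
      proxy.startReconfiguration();
    }
  }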

ReconfigurationProtocol.java

@@ -25,6 +25,7 @@ import java.util.List;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.ReconfigurationTaskStatus;
+import org.apache.hadoop.io.retry.Idempotent;
 
 /**********************************************************************
  * ReconfigurationProtocol is used by HDFS admin to reload configuration
@@ -39,16 +40,19 @@ public interface ReconfigurationProtocol {
   /**
    * Asynchronously reload configuration on disk and apply changes.
    */
+  @Idempotent
   void startReconfiguration() throws IOException;
 
   /**
    * Get the status of the previously issued reconfig task.
    * @see {@link org.apache.hadoop.conf.ReconfigurationTaskStatus}.
    */
+  @Idempotent
   ReconfigurationTaskStatus getReconfigurationStatus() throws IOException;
 
   /**
    * Get a list of allowed properties for reconfiguration.
    */
+  @Idempotent
   List<String> listReconfigurableProperties() throws IOException;
 }
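The @Idempotent annotations mark all three operations as safe for the RPC layer to retry (for example after a timeout or an HA failover), since reissuing a start, status, or list request does not change server state beyond what a single call would.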

CHANGES.txt

@@ -953,6 +953,9 @@ Release 2.9.0 - UNRELEASED
     HDFS-9674. The HTrace span for OpWriteBlock should record the maxWriteToDisk
     time. (cmccabe via zhz)
 
+    HDFS-9094. Add command line option to ask NameNode reload
+    configuration. (Xiaobing Zhou via Arpit Agarwal)
+
   OPTIMIZATIONS
 
   BUG FIXES

ReconfigurationProtocolServerSideUtils.java

@@ -41,9 +41,7 @@ public final class ReconfigurationProtocolServerSideUtils {
       List<String> reconfigurableProperties) {
     ListReconfigurablePropertiesResponseProto.Builder builder =
         ListReconfigurablePropertiesResponseProto.newBuilder();
-    for (String name : reconfigurableProperties) {
-      builder.addName(name);
-    }
+    builder.addAllName(reconfigurableProperties);
     return builder.build();
   }
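Replacing the hand-written loop with the generated builder's addAllName(...) is behavior-neutral; protobuf builders provide addAll* precisely for appending a whole collection to a repeated field.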

NameNodeRpcServer.java

@@ -42,6 +42,8 @@ import com.google.common.collect.Lists;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.ReconfigurationException;
+import org.apache.hadoop.conf.ReconfigurationTaskStatus;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
 import org.apache.hadoop.fs.CacheFlag;
@@ -111,12 +113,15 @@ import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeProtocolService;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService;
+import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ReconfigurationProtocolService;
 import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.ReconfigurationProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.ReconfigurationProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -286,6 +291,11 @@ class NameNodeRpcServer implements NamenodeProtocols {
     BlockingService haPbService = HAServiceProtocolService
         .newReflectiveBlockingService(haServiceProtocolXlator);
 
+    ReconfigurationProtocolServerSideTranslatorPB reconfigurationProtocolXlator
+        = new ReconfigurationProtocolServerSideTranslatorPB(this);
+    BlockingService reconfigurationPbService = ReconfigurationProtocolService
+        .newReflectiveBlockingService(reconfigurationProtocolXlator);
+
     TraceAdminProtocolServerSideTranslatorPB traceAdminXlator =
         new TraceAdminProtocolServerSideTranslatorPB(this);
     BlockingService traceAdminService = TraceAdminService
@@ -319,6 +329,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
       // Add all the RPC protocols that the namenode implements
       DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
           serviceRpcServer);
+      DFSUtil.addPBProtocol(conf, ReconfigurationProtocolPB.class,
+          reconfigurationPbService, serviceRpcServer);
       DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
           serviceRpcServer);
       DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
@@ -403,6 +415,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
     // Add all the RPC protocols that the namenode implements
     DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
         clientRpcServer);
+    DFSUtil.addPBProtocol(conf, ReconfigurationProtocolPB.class,
+        reconfigurationPbService, clientRpcServer);
     DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
         clientRpcServer);
     DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
@@ -2173,4 +2187,25 @@ class NameNodeRpcServer implements NamenodeProtocols {
     checkNNStartup();
     return namesystem.getErasureCodingPolicy(src);
   }
+
+  @Override // ReconfigurationProtocol
+  public void startReconfiguration() {
+    throw new UnsupportedOperationException(
+        "Namenode startReconfiguration is not implemented.",
+        new ReconfigurationException());
+  }
+
+  @Override // ReconfigurationProtocol
+  public ReconfigurationTaskStatus getReconfigurationStatus() {
+    throw new UnsupportedOperationException(
+        "Namenode getReconfigurationStatus is not implemented.",
+        new ReconfigurationException());
+  }
+
+  @Override // ReconfigurationProtocol
+  public List<String> listReconfigurableProperties() {
+    throw new UnsupportedOperationException(
+        "Namenode listReconfigurableProperties is not implemented.",
+        new ReconfigurationException());
+  }
 }
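These three handlers are deliberate stubs: the protocol is already wired into both RPC servers above, so the whole dfsadmin code path can be exercised end to end, and the TestDFSAdmin cases below assert exactly these "is not implemented" messages (each wraps a ReconfigurationException as the cause).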

NamenodeProtocols.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.protocol;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.ReconfigurationProtocol;
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
 import org.apache.hadoop.security.RefreshUserMappingsProtocol;
 import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
@@ -35,6 +36,7 @@ public interface NamenodeProtocols
     DatanodeProtocol,
     NamenodeProtocol,
     RefreshAuthorizationPolicyProtocol,
+    ReconfigurationProtocol,
     RefreshUserMappingsProtocol,
     RefreshCallQueueProtocol,
     GenericRefreshProtocol,

DFSAdmin.java

@@ -37,6 +37,7 @@ import java.util.TreeSet;
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -69,6 +70,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.protocol.ReconfigurationProtocol;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.protocol.SnapshotException;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -414,7 +416,8 @@ public class DFSAdmin extends FsShell {
     "\t[-refreshSuperUserGroupsConfiguration]\n" +
     "\t[-refreshCallQueue]\n" +
     "\t[-refresh <host:ipc_port> <key> [arg1..argn]\n" +
-    "\t[-reconfig <datanode|...> <host:ipc_port> <start|status|properties>]\n" +
+    "\t[-reconfig <namenode|datanode> <host:ipc_port> " +
+    "<start|status|properties>]\n" +
     "\t[-printTopology]\n" +
     "\t[-refreshNamenodes datanode_host:ipc_port]\n"+
     "\t[-deleteBlockPool datanode_host:ipc_port blockpoolId [force]]\n"+
@@ -1028,12 +1031,12 @@ public class DFSAdmin extends FsShell {
     String refreshCallQueue = "-refreshCallQueue: Reload the call queue from config\n";
 
-    String reconfig = "-reconfig <datanode|...> <host:ipc_port> <start|status|properties>:\n" +
+    String reconfig = "-reconfig <namenode|datanode> <host:ipc_port> " +
+        "<start|status|properties>:\n" +
         "\tStarts or gets the status of a reconfiguration operation, \n" +
         "\tor gets a list of reconfigurable properties.\n" +
-        "\tThe second parameter specifies the node type\n";
+        "\tThe second parameter specifies the node type.\n" +
+        "\tCurrently, only reloading DataNode's configuration is supported.\n";
 
     String genericRefresh = "-refresh: Arguments are <hostname:port> <resource_identifier> [arg1..argn]\n" +
         "\tTriggers a runtime-refresh of the resource specified by <resource_identifier>\n" +
         "\ton <hostname:port>. All other args after are sent to the host.\n";
@@ -1494,104 +1497,186 @@ public class DFSAdmin extends FsShell {
     String nodeType = argv[i];
     String address = argv[i + 1];
     String op = argv[i + 2];
     if ("start".equals(op)) {
-      return startReconfiguration(nodeType, address);
+      return startReconfiguration(nodeType, address, System.out, System.err);
     } else if ("status".equals(op)) {
       return getReconfigurationStatus(nodeType, address, System.out, System.err);
     } else if ("properties".equals(op)) {
-      return getReconfigurableProperties(
-          nodeType, address, System.out, System.err);
+      return getReconfigurableProperties(nodeType, address, System.out,
+          System.err);
     }
     System.err.println("Unknown operation: " + op);
     return -1;
   }
 
-  int startReconfiguration(String nodeType, String address) throws IOException {
-    if ("datanode".equals(nodeType)) {
-      ClientDatanodeProtocol dnProxy = getDataNodeProxy(address);
-      dnProxy.startReconfiguration();
-      System.out.println("Started reconfiguration task on DataNode " + address);
+  int startReconfiguration(final String nodeType, final String address)
+      throws IOException {
+    return startReconfiguration(nodeType, address, System.out, System.err);
+  }
+
+  int startReconfiguration(final String nodeType, final String address,
+      final PrintStream out, final PrintStream err) throws IOException {
+    String outMsg = null;
+    String errMsg = null;
+    int ret = 0;
+
+    try {
+      ret = startReconfigurationDispatch(nodeType, address, out, err);
+      outMsg = String.format("Started reconfiguration task on node [%s].",
+          address);
+    } catch (IOException e) {
+      errMsg = String.format("Node [%s] reconfiguring: %s.", address,
+          e.toString());
+    }
+
+    if (errMsg != null) {
+      err.println(errMsg);
+      return 1;
+    } else {
+      out.println(outMsg);
+      return ret;
+    }
+  }
+
+  int startReconfigurationDispatch(final String nodeType,
+      final String address, final PrintStream out, final PrintStream err)
+      throws IOException {
+    if ("namenode".equals(nodeType)) {
+      ReconfigurationProtocol reconfProxy = getNameNodeProxy(address);
+      reconfProxy.startReconfiguration();
+      return 0;
+    } else if ("datanode".equals(nodeType)) {
+      ClientDatanodeProtocol reconfProxy = getDataNodeProxy(address);
+      reconfProxy.startReconfiguration();
       return 0;
     } else {
-      System.err.println("Node type " + nodeType +
-          " does not support reconfiguration.");
+      System.err.println("Node type " + nodeType
+          + " does not support reconfiguration.");
       return 1;
     }
   }
 
-  int getReconfigurationStatus(String nodeType, String address,
-      PrintStream out, PrintStream err) throws IOException {
-    if ("datanode".equals(nodeType)) {
-      ClientDatanodeProtocol dnProxy = getDataNodeProxy(address);
-      try {
-        ReconfigurationTaskStatus status = dnProxy.getReconfigurationStatus();
-        out.print("Reconfiguring status for DataNode[" + address + "]: ");
-        if (!status.hasTask()) {
-          out.println("no task was found.");
-          return 0;
-        }
-        out.print("started at " + new Date(status.getStartTime()));
-        if (!status.stopped()) {
-          out.println(" and is still running.");
-          return 0;
-        }
-        out.println(" and finished at " +
-            new Date(status.getEndTime()).toString() + ".");
-        if (status.getStatus() == null) {
-          // Nothing to report.
-          return 0;
-        }
-        for (Map.Entry<PropertyChange, Optional<String>> result :
-            status.getStatus().entrySet()) {
-          if (!result.getValue().isPresent()) {
-            out.printf(
-                "SUCCESS: Changed property %s%n\tFrom: \"%s\"%n\tTo: \"%s\"%n",
-                result.getKey().prop, result.getKey().oldVal,
-                result.getKey().newVal);
-          } else {
-            final String errorMsg = result.getValue().get();
-            out.printf(
-                "FAILED: Change property %s%n\tFrom: \"%s\"%n\tTo: \"%s\"%n",
-                result.getKey().prop, result.getKey().oldVal,
-                result.getKey().newVal);
-            out.println("\tError: " + errorMsg + ".");
-          }
-        }
-      } catch (IOException e) {
-        err.println("DataNode reloading configuration: " + e + ".");
-        return 1;
-      }
+  int getReconfigurationStatus(final String nodeType, final String address,
+      final PrintStream out, final PrintStream err) throws IOException {
+    String outMsg = null;
+    String errMsg = null;
+    ReconfigurationTaskStatus status = null;
+
+    try {
+      status = getReconfigurationStatusDispatch(nodeType, address, out, err);
+      outMsg = String.format("Reconfiguring status for node [%s]: ", address);
+    } catch (IOException e) {
+      errMsg = String.format("Node [%s] reloading configuration: %s.", address,
+          e.toString());
+    }
+
+    if (errMsg != null) {
+      err.println(errMsg);
+      return 1;
+    } else {
+      out.print(outMsg);
+    }
+
+    if (status != null) {
+      if (!status.hasTask()) {
+        out.println("no task was found.");
+        return 0;
+      }
+      out.print("started at " + new Date(status.getStartTime()));
+      if (!status.stopped()) {
+        out.println(" and is still running.");
+        return 0;
+      }
+
+      out.println(" and finished at "
+          + new Date(status.getEndTime()).toString() + ".");
+      if (status.getStatus() == null) {
+        // Nothing to report.
+        return 0;
+      }
+
+      for (Map.Entry<PropertyChange, Optional<String>> result : status
+          .getStatus().entrySet()) {
+        if (!result.getValue().isPresent()) {
+          out.printf(
+              "SUCCESS: Changed property %s%n\tFrom: \"%s\"%n\tTo: \"%s\"%n",
+              result.getKey().prop, result.getKey().oldVal,
+              result.getKey().newVal);
+        } else {
+          final String errorMsg = result.getValue().get();
+          out.printf(
+              "FAILED: Change property %s%n\tFrom: \"%s\"%n\tTo: \"%s\"%n",
+              result.getKey().prop, result.getKey().oldVal,
+              result.getKey().newVal);
+          out.println("\tError: " + errorMsg + ".");
+        }
+      }
     } else {
-      err.println("Node type " + nodeType +
-          " does not support reconfiguration.");
       return 1;
     }
+
     return 0;
   }
 
-  int getReconfigurableProperties(String nodeType, String address,
-      PrintStream out, PrintStream err) throws IOException {
-    if ("datanode".equals(nodeType)) {
-      ClientDatanodeProtocol dnProxy = getDataNodeProxy(address);
-      try {
-        List<String> properties =
-            dnProxy.listReconfigurableProperties();
-        out.println(
-            "Configuration properties that are allowed to be reconfigured:");
-        for (String name : properties) {
-          out.println(name);
-        }
-      } catch (IOException e) {
-        err.println("DataNode reconfiguration: " + e + ".");
-        return 1;
-      }
+  ReconfigurationTaskStatus getReconfigurationStatusDispatch(
+      final String nodeType, final String address, final PrintStream out,
+      final PrintStream err) throws IOException {
+    if ("namenode".equals(nodeType)) {
+      ReconfigurationProtocol reconfProxy = getNameNodeProxy(address);
+      return reconfProxy.getReconfigurationStatus();
+    } else if ("datanode".equals(nodeType)) {
+      ClientDatanodeProtocol reconfProxy = getDataNodeProxy(address);
+      return reconfProxy.getReconfigurationStatus();
     } else {
-      err.println("Node type " + nodeType +
-          " does not support reconfiguration.");
-      return 1;
+      err.println("Node type " + nodeType
+          + " does not support reconfiguration.");
+      return null;
+    }
+  }
+
+  int getReconfigurableProperties(final String nodeType, final String address,
+      final PrintStream out, final PrintStream err) throws IOException {
+    String outMsg = null;
+    String errMsg = null;
+    List<String> properties = null;
+
+    try {
+      properties = getReconfigurablePropertiesDispatch(nodeType, address, out,
+          err);
+      outMsg = String.format("Node [%s] Reconfigurable properties:", address);
+    } catch (IOException e) {
+      errMsg = String.format("Node [%s] reconfiguration: %s.", address,
+          e.toString());
+    }
+
+    if (errMsg != null) {
+      err.println(errMsg);
+      return 1;
+    } else if (properties == null) {
+      return 1;
+    } else {
+      out.println(outMsg);
+      for (String name : properties) {
+        out.println(name);
+      }
+      return 0;
+    }
+  }
+
+  List<String> getReconfigurablePropertiesDispatch(final String nodeType,
+      final String address, final PrintStream out, final PrintStream err)
+      throws IOException {
+    if ("namenode".equals(nodeType)) {
+      ReconfigurationProtocol reconfProxy = getNameNodeProxy(address);
+      return reconfProxy.listReconfigurableProperties();
+    } else if ("datanode".equals(nodeType)) {
+      ClientDatanodeProtocol reconfProxy = getDataNodeProxy(address);
+      return reconfProxy.listReconfigurableProperties();
+    } else {
+      err.println("Node type " + nodeType
+          + " does not support reconfiguration.");
+      return null;
     }
-    return 0;
   }
 
   public int genericRefresh(String[] argv, int i) throws IOException {
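For illustration only (not part of the patch): the wrapper-plus-dispatch split keeps the node-type branching in small package-private methods the tests can call directly, while external callers go through the public Tool interface. A sketch with a placeholder address:

  import org.apache.hadoop.hdfs.HdfsConfiguration;
  import org.apache.hadoop.hdfs.tools.DFSAdmin;
  import org.apache.hadoop.util.ToolRunner;

  public class ReconfigCliExample {
    public static void main(String[] args) throws Exception {
      // Equivalent to: hdfs dfsadmin -reconfig namenode nn-host:8022 start
      int rc = ToolRunner.run(new HdfsConfiguration(), new DFSAdmin(),
          new String[] {"-reconfig", "namenode", "nn-host:8022", "start"});
      System.exit(rc);
    }
  }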
@@ -1712,7 +1797,7 @@ public class DFSAdmin extends FsShell {
           + " [-refreshCallQueue]");
     } else if ("-reconfig".equals(cmd)) {
       System.err.println("Usage: hdfs dfsadmin"
-          + " [-reconfig <datanode|...> <host:port> <start|status>]");
+          + " [-reconfig <namenode|datanode> <host:port> <start|status>]");
     } else if ("-refresh".equals(cmd)) {
       System.err.println("Usage: hdfs dfsadmin"
           + " [-refresh <hostname:port> <resource_identifier> [arg1..argn]");
@@ -2028,6 +2113,23 @@ public class DFSAdmin extends FsShell {
         NetUtils.getSocketFactory(conf, ClientDatanodeProtocol.class));
     return dnProtocol;
   }
 
+  private ReconfigurationProtocol getNameNodeProxy(String node)
+      throws IOException {
+    InetSocketAddress nodeAddr = NetUtils.createSocketAddr(node);
+    // Get the current configuration
+    Configuration conf = getConf();
+
+    // For namenode proxy the server principal should be NN's one.
+    conf.set(CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY,
+        conf.get(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, ""));
+
+    // Create the client
+    ReconfigurationProtocol reconfigProtocol = DFSUtilClient
+        .createReconfigurationProtocolProxy(nodeAddr, getUGI(), conf,
+            NetUtils.getSocketFactory(conf, ReconfigurationProtocol.class));
+    return reconfigProtocol;
+  }
+
   private int deleteBlockPool(String[] argv, int i) throws IOException {
     ClientDatanodeProtocol dnProxy = getDataNodeProxy(argv[i]);
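Unlike getDataNodeProxy above, this helper overrides HADOOP_SECURITY_SERVICE_USER_NAME_KEY with the value of dfs.namenode.kerberos.principal before building the proxy, so that on a secure cluster SASL negotiation expects the NameNode's principal rather than the default service identity.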

TestDFSAdmin.java

@@ -20,6 +20,9 @@ package org.apache.hadoop.hdfs.tools;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
 
 import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.ReconfigurationUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -27,6 +30,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -52,10 +56,12 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class TestDFSAdmin {
+  private static final Log LOG = LogFactory.getLog(DFSAdmin.class);
   private Configuration conf = null;
   private MiniDFSCluster cluster;
   private DFSAdmin admin;
   private DataNode datanode;
+  private NameNode namenode;
 
   @Before
   public void setUp() throws Exception {
@@ -80,21 +86,64 @@ public class TestDFSAdmin {
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
     cluster.waitActive();
     datanode = cluster.getDataNodes().get(0);
+    namenode = cluster.getNameNode();
   }
 
-  private List<String> getReconfigureStatus(String nodeType, String address)
-      throws IOException {
+  private void startReconfiguration(String nodeType, String address,
+      final List<String> outs, final List<String> errs) throws IOException {
+    reconfigurationOutErrFormatter("startReconfiguration", nodeType,
+        address, outs, errs);
+  }
+
+  private void getReconfigurableProperties(String nodeType, String address,
+      final List<String> outs, final List<String> errs) throws IOException {
+    reconfigurationOutErrFormatter("getReconfigurableProperties", nodeType,
+        address, outs, errs);
+  }
+
+  private void getReconfigurationStatus(String nodeType, String address,
+      final List<String> outs, final List<String> errs) throws IOException {
+    reconfigurationOutErrFormatter("getReconfigurationStatus", nodeType,
+        address, outs, errs);
+  }
+
+  private void reconfigurationOutErrFormatter(String methodName,
+      String nodeType, String address, final List<String> outs,
+      final List<String> errs) throws IOException {
     ByteArrayOutputStream bufOut = new ByteArrayOutputStream();
     PrintStream out = new PrintStream(bufOut);
     ByteArrayOutputStream bufErr = new ByteArrayOutputStream();
     PrintStream err = new PrintStream(bufErr);
-    admin.getReconfigurationStatus(nodeType, address, out, err);
-    Scanner scanner = new Scanner(bufOut.toString());
-    List<String> outputs = Lists.newArrayList();
-    while (scanner.hasNextLine()) {
-      outputs.add(scanner.nextLine());
+
+    if (methodName.equals("getReconfigurableProperties")) {
+      admin.getReconfigurableProperties(nodeType, address, out, err);
+    } else if (methodName.equals("getReconfigurationStatus")) {
+      admin.getReconfigurationStatus(nodeType, address, out, err);
+    } else if (methodName.equals("startReconfiguration")) {
+      admin.startReconfiguration(nodeType, address, out, err);
     }
-    return outputs;
+
+    Scanner scanner = new Scanner(bufOut.toString());
+    while (scanner.hasNextLine()) {
+      outs.add(scanner.nextLine());
+    }
+    scanner.close();
+    scanner = new Scanner(bufErr.toString());
+    while (scanner.hasNextLine()) {
+      errs.add(scanner.nextLine());
+    }
+    scanner.close();
+  }
+
+  @Test(timeout = 30000)
+  public void testDataNodeGetReconfigurableProperties() throws IOException {
+    final int port = datanode.getIpcPort();
+    final String address = "localhost:" + port;
+    final List<String> outs = Lists.newArrayList();
+    final List<String> errs = Lists.newArrayList();
+    getReconfigurableProperties("datanode", address, outs, errs);
+    assertEquals(3, outs.size());
+    assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1));
   }
 
   /**
@@ -103,7 +152,7 @@ public class TestDFSAdmin {
    * @throws IOException
    * @throws InterruptedException
    */
-  private void testGetReconfigurationStatus(boolean expectedSuccuss)
+  private void testDataNodeGetReconfigurationStatus(boolean expectedSuccuss)
       throws IOException, InterruptedException {
     ReconfigurationUtil ru = mock(ReconfigurationUtil.class);
     datanode.setReconfigurationUtil(ru);
@@ -130,21 +179,25 @@ public class TestDFSAdmin {
     assertThat(admin.startReconfiguration("datanode", address), is(0));
 
-    List<String> outputs = null;
     int count = 100;
+    final List<String> outs = Lists.newArrayList();
+    final List<String> errs = Lists.newArrayList();
     while (count > 0) {
-      outputs = getReconfigureStatus("datanode", address);
-      if (!outputs.isEmpty() && outputs.get(0).contains("finished")) {
+      outs.clear();
+      errs.clear();
+      getReconfigurationStatus("datanode", address, outs, errs);
+      if (!outs.isEmpty() && outs.get(0).contains("finished")) {
         break;
       }
       count--;
       Thread.sleep(100);
     }
+    LOG.info(String.format("count=%d", count));
     assertTrue(count > 0);
     if (expectedSuccuss) {
-      assertThat(outputs.size(), is(4));
+      assertThat(outs.size(), is(4));
     } else {
-      assertThat(outputs.size(), is(6));
+      assertThat(outs.size(), is(6));
     }
 
     List<StorageLocation> locations = DataNode.getStorageLocations(
@@ -160,55 +213,78 @@ public class TestDFSAdmin {
     int offset = 1;
     if (expectedSuccuss) {
-      assertThat(outputs.get(offset),
+      assertThat(outs.get(offset),
           containsString("SUCCESS: Changed property " +
               DFS_DATANODE_DATA_DIR_KEY));
     } else {
-      assertThat(outputs.get(offset),
+      assertThat(outs.get(offset),
           containsString("FAILED: Change property " +
               DFS_DATANODE_DATA_DIR_KEY));
     }
-    assertThat(outputs.get(offset + 1),
+    assertThat(outs.get(offset + 1),
         is(allOf(containsString("From:"), containsString("data1"),
             containsString("data2"))));
-    assertThat(outputs.get(offset + 2),
+    assertThat(outs.get(offset + 2),
         is(not(anyOf(containsString("data1"), containsString("data2")))));
-    assertThat(outputs.get(offset + 2),
+    assertThat(outs.get(offset + 2),
         is(allOf(containsString("To"), containsString("data_new"))));
   }
 
   @Test(timeout = 30000)
-  public void testGetReconfigurationStatus()
-      throws IOException, InterruptedException {
-    testGetReconfigurationStatus(true);
+  public void testDataNodeGetReconfigurationStatus() throws IOException,
+      InterruptedException {
+    testDataNodeGetReconfigurationStatus(true);
     restartCluster();
-    testGetReconfigurationStatus(false);
-  }
-
-  private List<String> getReconfigurationAllowedProperties(
-      String nodeType, String address)
-      throws IOException {
-    ByteArrayOutputStream bufOut = new ByteArrayOutputStream();
-    PrintStream out = new PrintStream(bufOut);
-    ByteArrayOutputStream bufErr = new ByteArrayOutputStream();
-    PrintStream err = new PrintStream(bufErr);
-    admin.getReconfigurableProperties(nodeType, address, out, err);
-    Scanner scanner = new Scanner(bufOut.toString());
-    List<String> outputs = Lists.newArrayList();
-    while (scanner.hasNextLine()) {
-      outputs.add(scanner.nextLine());
-    }
-    return outputs;
+    testDataNodeGetReconfigurationStatus(false);
   }
 
   @Test(timeout = 30000)
-  public void testGetReconfigAllowedProperties() throws IOException {
-    final int port = datanode.getIpcPort();
-    final String address = "localhost:" + port;
-    List<String> outputs =
-        getReconfigurationAllowedProperties("datanode", address);
-    assertEquals(3, outputs.size());
-    assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
-        outputs.get(1));
+  public void testNameNodeStartReconfiguration() throws IOException {
+    final String address = namenode.getHostAndPort();
+    final List<String> outs = Lists.newArrayList();
+    final List<String> errs = Lists.newArrayList();
+    startReconfiguration("namenode", address, outs, errs);
+    assertEquals(0, outs.size());
+    assertTrue(errs.size() > 1);
+    assertThat(
+        errs.get(0),
+        is(allOf(containsString("Namenode"), containsString("reconfiguring:"),
+            containsString("startReconfiguration"),
+            containsString("is not implemented"),
+            containsString("UnsupportedOperationException"))));
+  }
+
+  @Test(timeout = 30000)
+  public void testNameNodeGetReconfigurableProperties() throws IOException {
+    final String address = namenode.getHostAndPort();
+    final List<String> outs = Lists.newArrayList();
+    final List<String> errs = Lists.newArrayList();
+    getReconfigurableProperties("namenode", address, outs, errs);
+    assertEquals(0, outs.size());
+    assertTrue(errs.size() > 1);
+    assertThat(
+        errs.get(0),
+        is(allOf(containsString("Namenode"),
+            containsString("reconfiguration:"),
+            containsString("listReconfigurableProperties"),
+            containsString("is not implemented"),
+            containsString("UnsupportedOperationException"))));
+  }
+
+  @Test(timeout = 30000)
+  public void testNameNodeGetReconfigurationStatus() throws IOException {
+    final String address = namenode.getHostAndPort();
+    final List<String> outs = Lists.newArrayList();
+    final List<String> errs = Lists.newArrayList();
+    getReconfigurationStatus("namenode", address, outs, errs);
+    assertEquals(0, outs.size());
+    assertTrue(errs.size() > 1);
+    assertThat(
+        errs.get(0),
+        is(allOf(containsString("Namenode"),
+            containsString("reloading configuration:"),
+            containsString("getReconfigurationStatus"),
+            containsString("is not implemented"),
+            containsString("UnsupportedOperationException"))));
   }
 }