diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
index 62c5d8167a7..8f6ed14ca75 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
@@ -38,9 +38,11 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.ReconfigurationProtocol;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
 import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.ReconfigurationProtocolTranslatorPB;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.util.IOUtilsClient;
 import org.apache.hadoop.hdfs.web.WebHdfsConstants;
@@ -496,6 +498,12 @@ public class DFSUtilClient {
     return new ClientDatanodeProtocolTranslatorPB(addr, ticket, conf, factory);
   }
 
+  public static ReconfigurationProtocol createReconfigurationProtocolProxy(
+      InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
+      SocketFactory factory) throws IOException {
+    return new ReconfigurationProtocolTranslatorPB(addr, ticket, conf, factory);
+  }
+
   /**
    * Creates a new KeyProvider from the given Configuration.
    *
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ReconfigurationProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ReconfigurationProtocol.java
index 75dc8777f78..837043882f3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ReconfigurationProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ReconfigurationProtocol.java
@@ -25,6 +25,7 @@ import java.util.List;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.ReconfigurationTaskStatus;
+import org.apache.hadoop.io.retry.Idempotent;
 
 /**********************************************************************
  * ReconfigurationProtocol is used by HDFS admin to reload configuration
@@ -39,16 +40,19 @@ public interface ReconfigurationProtocol {
   /**
    * Asynchronously reload configuration on disk and apply changes.
    */
+  @Idempotent
   void startReconfiguration() throws IOException;
 
   /**
    * Get the status of the previously issued reconfig task.
    * @see {@link org.apache.hadoop.conf.ReconfigurationTaskStatus}.
    */
+  @Idempotent
   ReconfigurationTaskStatus getReconfigurationStatus() throws IOException;
 
   /**
    * Get a list of allowed properties for reconfiguration.
    */
+  @Idempotent
   List<String> listReconfigurableProperties() throws IOException;
 }
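Note (illustration, not part of the patch): the two changes above add a client-side factory for the new ReconfigurationProtocol proxy and mark the protocol's methods @Idempotent so the RPC retry machinery may safely re-invoke them. Below is a minimal sketch of how the proxy could be driven; the class name and address are made up for the example, and with only this patch applied a NameNode target will reject all three calls with UnsupportedOperationException (see NameNodeRpcServer further below).

    import java.net.InetSocketAddress;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.ReconfigurationTaskStatus;
    import org.apache.hadoop.hdfs.DFSUtilClient;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.protocol.ReconfigurationProtocol;
    import org.apache.hadoop.net.NetUtils;
    import org.apache.hadoop.security.UserGroupInformation;

    public class ReconfigProxySketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new HdfsConfiguration();
        // Illustrative address; DFSAdmin takes this from the command line.
        InetSocketAddress addr = NetUtils.createSocketAddr("nn.example.com:8020");
        ReconfigurationProtocol proxy = DFSUtilClient
            .createReconfigurationProtocolProxy(addr,
                UserGroupInformation.getCurrentUser(), conf,
                NetUtils.getSocketFactory(conf, ReconfigurationProtocol.class));
        // Kick off an asynchronous reload of the on-disk configuration.
        proxy.startReconfiguration();
        // Poll for the outcome of the task started above.
        ReconfigurationTaskStatus status = proxy.getReconfigurationStatus();
        System.out.println("task found: " + status.hasTask());
        // List which properties the node allows to be changed at runtime.
        List<String> props = proxy.listReconfigurableProperties();
        for (String p : props) {
          System.out.println(p);
        }
      }
    }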
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 49b4d8a617e..e5285b68223 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -953,6 +953,9 @@ Release 2.9.0 - UNRELEASED
     HDFS-9674. The HTrace span for OpWriteBlock should record the
     maxWriteToDisk time. (cmccabe via zhz)
 
+    HDFS-9094. Add command line option to ask NameNode reload
+    configuration. (Xiaobing Zhou via Arpit Agarwal)
+
   OPTIMIZATIONS
 
   BUG FIXES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ReconfigurationProtocolServerSideUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ReconfigurationProtocolServerSideUtils.java
index b2be9cd58b7..9e24204563b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ReconfigurationProtocolServerSideUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ReconfigurationProtocolServerSideUtils.java
@@ -41,9 +41,7 @@ public final class ReconfigurationProtocolServerSideUtils {
       List<String> reconfigurableProperties) {
     ListReconfigurablePropertiesResponseProto.Builder builder =
         ListReconfigurablePropertiesResponseProto.newBuilder();
-    for (String name : reconfigurableProperties) {
-      builder.addName(name);
-    }
+    builder.addAllName(reconfigurableProperties);
     return builder.build();
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 77852606f70..c1646c5005f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -42,6 +42,8 @@ import com.google.common.collect.Lists;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.ReconfigurationException;
+import org.apache.hadoop.conf.ReconfigurationTaskStatus;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
 import org.apache.hadoop.fs.CacheFlag;
@@ -111,12 +113,15 @@ import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeProtocolService;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService;
+import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ReconfigurationProtocolService;
 import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.ReconfigurationProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.ReconfigurationProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -286,6 +291,11 @@ class NameNodeRpcServer implements NamenodeProtocols {
     BlockingService haPbService = HAServiceProtocolService
         .newReflectiveBlockingService(haServiceProtocolXlator);
 
+    ReconfigurationProtocolServerSideTranslatorPB reconfigurationProtocolXlator
+        = new ReconfigurationProtocolServerSideTranslatorPB(this);
+    BlockingService reconfigurationPbService = ReconfigurationProtocolService
+        .newReflectiveBlockingService(reconfigurationProtocolXlator);
+
     TraceAdminProtocolServerSideTranslatorPB traceAdminXlator =
         new TraceAdminProtocolServerSideTranslatorPB(this);
     BlockingService traceAdminService = TraceAdminService
@@ -319,6 +329,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
       // Add all the RPC protocols that the namenode implements
       DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
           serviceRpcServer);
+      DFSUtil.addPBProtocol(conf, ReconfigurationProtocolPB.class,
+          reconfigurationPbService, serviceRpcServer);
       DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
           serviceRpcServer);
       DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
@@ -403,6 +415,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
     // Add all the RPC protocols that the namenode implements
     DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
         clientRpcServer);
+    DFSUtil.addPBProtocol(conf, ReconfigurationProtocolPB.class,
+        reconfigurationPbService, clientRpcServer);
     DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
         clientRpcServer);
     DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
@@ -2173,4 +2187,25 @@ class NameNodeRpcServer implements NamenodeProtocols {
     checkNNStartup();
     return namesystem.getErasureCodingPolicy(src);
   }
+
+  @Override // ReconfigurationProtocol
+  public void startReconfiguration() {
+    throw new UnsupportedOperationException(
+        "Namenode startReconfiguration is not implemented.",
+        new ReconfigurationException());
+  }
+
+  @Override // ReconfigurationProtocol
+  public ReconfigurationTaskStatus getReconfigurationStatus() {
+    throw new UnsupportedOperationException(
+        "Namenode getReconfigurationStatus is not implemented.",
+        new ReconfigurationException());
+  }
+
+  @Override // ReconfigurationProtocol
+  public List<String> listReconfigurableProperties() {
+    throw new UnsupportedOperationException(
+        "Namenode listReconfigurableProperties is not implemented.",
+        new ReconfigurationException());
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java
index 23b6f2ecf0f..4a3d83dee7e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.protocol;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.ReconfigurationProtocol;
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
 import org.apache.hadoop.security.RefreshUserMappingsProtocol;
 import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
@@ -35,6 +36,7 @@ public interface NamenodeProtocols
   DatanodeProtocol,
   NamenodeProtocol,
   RefreshAuthorizationPolicyProtocol,
+  ReconfigurationProtocol,
   RefreshUserMappingsProtocol,
   RefreshCallQueueProtocol,
   GenericRefreshProtocol,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
index 5da3bc5fede..9c782e9d622 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
@@ -37,6 +37,7 @@ import java.util.TreeSet;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -69,6 +70,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.protocol.ReconfigurationProtocol;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.protocol.SnapshotException;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -414,7 +416,8 @@ public class DFSAdmin extends FsShell {
     "\t[-refreshSuperUserGroupsConfiguration]\n" +
     "\t[-refreshCallQueue]\n" +
     "\t[-refresh [arg1..argn]\n" +
-    "\t[-reconfig <datanode|...> <host:ipc_port> <start|status>]\n" +
+    "\t[-reconfig <namenode|datanode> <host:ipc_port> " +
+    "<start|status|properties>]\n" +
     "\t[-printTopology]\n" +
     "\t[-refreshNamenodes datanode_host:ipc_port]\n"+
     "\t[-deleteBlockPool datanode_host:ipc_port blockpoolId [force]]\n"+
@@ -1028,12 +1031,12 @@ public class DFSAdmin extends FsShell {
     String refreshCallQueue = "-refreshCallQueue: Reload the call queue from config\n";
 
-    String reconfig = "-reconfig <datanode|...> <host:ipc_port> <start|status|properties>:\n" +
+    String reconfig = "-reconfig <namenode|datanode> <host:ipc_port> " +
+        "<start|status|properties>:\n" +
         "\tStarts or gets the status of a reconfiguration operation, \n" +
         "\tor gets a list of reconfigurable properties.\n" +
-        "\tThe second parameter specifies the node type.\n" +
-        "\tCurrently, only reloading DataNode's configuration is supported.\n";
+        "\tThe second parameter specifies the node type.\n";
 
     String genericRefresh = "-refresh: Arguments are <hostname:ipc_port> <resource_identifier> [arg1..argn]\n" +
         "\tTriggers a runtime-refresh of the resource specified by <resource_identifier>\n" +
        "\ton <hostname:ipc_port>. All other args after are sent to the host.\n";
@@ -1494,104 +1497,186 @@ public class DFSAdmin extends FsShell {
     String nodeType = argv[i];
     String address = argv[i + 1];
     String op = argv[i + 2];
+
     if ("start".equals(op)) {
-      return startReconfiguration(nodeType, address);
+      return startReconfiguration(nodeType, address, System.out, System.err);
     } else if ("status".equals(op)) {
       return getReconfigurationStatus(nodeType, address, System.out, System.err);
     } else if ("properties".equals(op)) {
-      return getReconfigurableProperties(
-          nodeType, address, System.out, System.err);
+      return getReconfigurableProperties(nodeType, address, System.out,
+          System.err);
     }
     System.err.println("Unknown operation: " + op);
     return -1;
   }
 
-  int startReconfiguration(String nodeType, String address) throws IOException {
-    if ("datanode".equals(nodeType)) {
-      ClientDatanodeProtocol dnProxy = getDataNodeProxy(address);
-      dnProxy.startReconfiguration();
-      System.out.println("Started reconfiguration task on DataNode " + address);
+  int startReconfiguration(final String nodeType, final String address)
+      throws IOException {
+    return startReconfiguration(nodeType, address, System.out, System.err);
+  }
+
+  int startReconfiguration(final String nodeType, final String address,
+      final PrintStream out, final PrintStream err) throws IOException {
+    String outMsg = null;
+    String errMsg = null;
+    int ret = 0;
+
+    try {
+      ret = startReconfigurationDispatch(nodeType, address, out, err);
+      outMsg = String.format("Started reconfiguration task on node [%s].",
+          address);
+    } catch (IOException e) {
+      errMsg = String.format("Node [%s] reconfiguring: %s.", address,
+          e.toString());
+    }
+
+    if (errMsg != null) {
+      err.println(errMsg);
+      return 1;
+    } else {
+      out.println(outMsg);
+      return ret;
+    }
+  }
+
+  int startReconfigurationDispatch(final String nodeType,
+      final String address, final PrintStream out, final PrintStream err)
+      throws IOException {
+    if ("namenode".equals(nodeType)) {
+      ReconfigurationProtocol reconfProxy = getNameNodeProxy(address);
+      reconfProxy.startReconfiguration();
+      return 0;
+    } else if ("datanode".equals(nodeType)) {
+      ClientDatanodeProtocol reconfProxy = getDataNodeProxy(address);
+      reconfProxy.startReconfiguration();
       return 0;
     } else {
-      System.err.println("Node type " + nodeType +
-          " does not support reconfiguration.");
+      System.err.println("Node type " + nodeType
+          + " does not support reconfiguration.");
       return 1;
     }
   }
 
-  int getReconfigurationStatus(String nodeType, String address,
-      PrintStream out, PrintStream err) throws IOException {
-    if ("datanode".equals(nodeType)) {
-      ClientDatanodeProtocol dnProxy = getDataNodeProxy(address);
-      try {
-        ReconfigurationTaskStatus status = dnProxy.getReconfigurationStatus();
-        out.print("Reconfiguring status for DataNode[" + address + "]: ");
-        if (!status.hasTask()) {
-          out.println("no task was found.");
-          return 0;
-        }
-        out.print("started at " + new Date(status.getStartTime()));
-        if (!status.stopped()) {
-          out.println(" and is still running.");
-          return 0;
-        }
+  int getReconfigurationStatus(final String nodeType, final String address,
+      final PrintStream out, final PrintStream err) throws IOException {
+    String outMsg = null;
+    String errMsg = null;
+    ReconfigurationTaskStatus status = null;
 
-        out.println(" and finished at " +
-            new Date(status.getEndTime()).toString() + ".");
-        if (status.getStatus() == null) {
-          // Nothing to report.
-          return 0;
+    try {
+      status = getReconfigurationStatusDispatch(nodeType, address, out, err);
+      outMsg = String.format("Reconfiguring status for node [%s]: ", address);
+    } catch (IOException e) {
+      errMsg = String.format("Node [%s] reloading configuration: %s.", address,
+          e.toString());
+    }
+
+    if (errMsg != null) {
+      err.println(errMsg);
+      return 1;
+    } else {
+      out.print(outMsg);
+    }
+
+    if (status != null) {
+      if (!status.hasTask()) {
+        out.println("no task was found.");
+        return 0;
+      }
+      out.print("started at " + new Date(status.getStartTime()));
+      if (!status.stopped()) {
+        out.println(" and is still running.");
+        return 0;
+      }
+
+      out.println(" and finished at "
+          + new Date(status.getEndTime()).toString() + ".");
+      if (status.getStatus() == null) {
+        // Nothing to report.
+        return 0;
+      }
+      for (Map.Entry<PropertyChange, Optional<String>> result : status
+          .getStatus().entrySet()) {
+        if (!result.getValue().isPresent()) {
+          out.printf(
+              "SUCCESS: Changed property %s%n\tFrom: \"%s\"%n\tTo: \"%s\"%n",
+              result.getKey().prop, result.getKey().oldVal,
+              result.getKey().newVal);
+        } else {
+          final String errorMsg = result.getValue().get();
+          out.printf(
+              "FAILED: Change property %s%n\tFrom: \"%s\"%n\tTo: \"%s\"%n",
+              result.getKey().prop, result.getKey().oldVal,
+              result.getKey().newVal);
+          out.println("\tError: " + errorMsg + ".");
         }
-        for (Map.Entry<PropertyChange, Optional<String>> result :
-            status.getStatus().entrySet()) {
-          if (!result.getValue().isPresent()) {
-            out.printf(
-                "SUCCESS: Changed property %s%n\tFrom: \"%s\"%n\tTo: \"%s\"%n",
-                result.getKey().prop, result.getKey().oldVal,
-                result.getKey().newVal);
-          } else {
-            final String errorMsg = result.getValue().get();
-            out.printf(
-                "FAILED: Change property %s%n\tFrom: \"%s\"%n\tTo: \"%s\"%n",
-                result.getKey().prop, result.getKey().oldVal,
-                result.getKey().newVal);
-            out.println("\tError: " + errorMsg + ".");
-          }
-        }
-      } catch (IOException e) {
-        err.println("DataNode reloading configuration: " + e + ".");
-        return 1;
       }
     } else {
-      err.println("Node type " + nodeType +
-          " does not support reconfiguration.");
       return 1;
     }
+    return 0;
   }
 
-  int getReconfigurableProperties(String nodeType, String address,
-      PrintStream out, PrintStream err) throws IOException {
-    if ("datanode".equals(nodeType)) {
-      ClientDatanodeProtocol dnProxy = getDataNodeProxy(address);
-      try {
-        List<String> properties =
-            dnProxy.listReconfigurableProperties();
-        out.println(
-            "Configuration properties that are allowed to be reconfigured:");
-        for (String name : properties) {
-          out.println(name);
-        }
-      } catch (IOException e) {
-        err.println("DataNode reconfiguration: " + e + ".");
-        return 1;
-      }
+  ReconfigurationTaskStatus getReconfigurationStatusDispatch(
+      final String nodeType, final String address, final PrintStream out,
+      final PrintStream err) throws IOException {
+    if ("namenode".equals(nodeType)) {
+      ReconfigurationProtocol reconfProxy = getNameNodeProxy(address);
+      return reconfProxy.getReconfigurationStatus();
+    } else if ("datanode".equals(nodeType)) {
+      ClientDatanodeProtocol reconfProxy = getDataNodeProxy(address);
+      return reconfProxy.getReconfigurationStatus();
     } else {
-      err.println("Node type " + nodeType +
-          " does not support reconfiguration.");
+      err.println("Node type " + nodeType
+          + " does not support reconfiguration.");
+      return null;
+    }
+  }
+
+  int getReconfigurableProperties(final String nodeType, final String address,
+      final PrintStream out, final PrintStream err) throws IOException {
+    String outMsg = null;
+    String errMsg = null;
+    List<String> properties = null;
+
+    try {
+      properties = getReconfigurablePropertiesDispatch(nodeType, address, out,
+          err);
+      outMsg = String.format("Node [%s] Reconfigurable properties:", address);
+    } catch (IOException e) {
+      errMsg = String.format("Node [%s] reconfiguration: %s.", address,
+          e.toString());
+    }
+
+    if (errMsg != null) {
+      err.println(errMsg);
+      return 1;
+    } else if (properties == null) {
+      return 1;
+    } else {
+      out.println(outMsg);
+      for (String name : properties) {
+        out.println(name);
+      }
+      return 0;
+    }
+  }
+
+  List<String> getReconfigurablePropertiesDispatch(final String nodeType,
+      final String address, final PrintStream out, final PrintStream err)
+      throws IOException {
+    if ("namenode".equals(nodeType)) {
+      ReconfigurationProtocol reconfProxy = getNameNodeProxy(address);
+      return reconfProxy.listReconfigurableProperties();
+    } else if ("datanode".equals(nodeType)) {
+      ClientDatanodeProtocol reconfProxy = getDataNodeProxy(address);
+      return reconfProxy.listReconfigurableProperties();
+    } else {
+      err.println("Node type " + nodeType
+          + " does not support reconfiguration.");
+      return null;
     }
-    return 0;
   }
 
   public int genericRefresh(String[] argv, int i) throws IOException {
@@ -1712,7 +1797,7 @@ public class DFSAdmin extends FsShell {
           + " [-refreshCallQueue]");
     } else if ("-reconfig".equals(cmd)) {
       System.err.println("Usage: hdfs dfsadmin"
-          + " [-reconfig <datanode|...> <host:ipc_port> <start|status>]");
+          + " [-reconfig <namenode|datanode> <host:ipc_port> <start|status|properties>]");
     } else if ("-refresh".equals(cmd)) {
       System.err.println("Usage: hdfs dfsadmin"
           + " [-refresh [arg1..argn]");
@@ -2028,6 +2113,23 @@ public class DFSAdmin extends FsShell {
         NetUtils.getSocketFactory(conf, ClientDatanodeProtocol.class));
     return dnProtocol;
   }
+
+  private ReconfigurationProtocol getNameNodeProxy(String node)
+      throws IOException {
+    InetSocketAddress nodeAddr = NetUtils.createSocketAddr(node);
+    // Get the current configuration
+    Configuration conf = getConf();
+
+    // For the namenode proxy the server principal should be the NN's one.
+    conf.set(CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY,
+        conf.get(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, ""));
+
+    // Create the client
+    ReconfigurationProtocol reconfigProtocol = DFSUtilClient
+        .createReconfigurationProtocolProxy(nodeAddr, getUGI(), conf,
+            NetUtils.getSocketFactory(conf, ReconfigurationProtocol.class));
+    return reconfigProtocol;
+  }
 
   private int deleteBlockPool(String[] argv, int i) throws IOException {
     ClientDatanodeProtocol dnProxy = getDataNodeProxy(argv[i]);
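Note (illustration, not part of the patch): with the usage string, help text, and dispatch methods above, the -reconfig subcommand now accepts a namenode target in addition to a datanode one. Typical invocations look like the following; hosts and ports are illustrative (50020 is the default DataNode IPC port in this branch, 8020 a common NameNode RPC port):

    hdfs dfsadmin -reconfig datanode dn1.example.com:50020 start
    hdfs dfsadmin -reconfig datanode dn1.example.com:50020 status
    hdfs dfsadmin -reconfig namenode nn1.example.com:8020 properties

Until reconfiguration is actually implemented in NameNodeRpcServer, the namenode forms connect successfully but report the server's UnsupportedOperationException on stderr, which is exactly what the new tests below assert.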
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
index 3a30ccf1b92..a3ed4f6b29c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
@@ -20,6 +20,9 @@ package org.apache.hadoop.hdfs.tools;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
 
 import com.google.common.collect.Lists;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.ReconfigurationUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -27,6 +30,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -52,10 +56,12 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class TestDFSAdmin {
+  private static final Log LOG = LogFactory.getLog(DFSAdmin.class);
   private Configuration conf = null;
   private MiniDFSCluster cluster;
   private DFSAdmin admin;
   private DataNode datanode;
+  private NameNode namenode;
 
   @Before
   public void setUp() throws Exception {
@@ -80,21 +86,64 @@ public class TestDFSAdmin {
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
     cluster.waitActive();
     datanode = cluster.getDataNodes().get(0);
+    namenode = cluster.getNameNode();
   }
 
-  private List<String> getReconfigureStatus(String nodeType, String address)
-      throws IOException {
+  private void startReconfiguration(String nodeType, String address,
+      final List<String> outs, final List<String> errs) throws IOException {
+    reconfigurationOutErrFormatter("startReconfiguration", nodeType,
+        address, outs, errs);
+  }
+
+  private void getReconfigurableProperties(String nodeType, String address,
+      final List<String> outs, final List<String> errs) throws IOException {
+    reconfigurationOutErrFormatter("getReconfigurableProperties", nodeType,
+        address, outs, errs);
+  }
+
+  private void getReconfigurationStatus(String nodeType, String address,
+      final List<String> outs, final List<String> errs) throws IOException {
+    reconfigurationOutErrFormatter("getReconfigurationStatus", nodeType,
+        address, outs, errs);
+  }
+
+  private void reconfigurationOutErrFormatter(String methodName,
+      String nodeType, String address, final List<String> outs,
+      final List<String> errs) throws IOException {
     ByteArrayOutputStream bufOut = new ByteArrayOutputStream();
     PrintStream out = new PrintStream(bufOut);
     ByteArrayOutputStream bufErr = new ByteArrayOutputStream();
     PrintStream err = new PrintStream(bufErr);
-    admin.getReconfigurationStatus(nodeType, address, out, err);
-    Scanner scanner = new Scanner(bufOut.toString());
-    List<String> outputs = Lists.newArrayList();
-    while (scanner.hasNextLine()) {
-      outputs.add(scanner.nextLine());
+
+    if (methodName.equals("getReconfigurableProperties")) {
+      admin.getReconfigurableProperties(nodeType, address, out, err);
+    } else if (methodName.equals("getReconfigurationStatus")) {
+      admin.getReconfigurationStatus(nodeType, address, out, err);
+    } else if (methodName.equals("startReconfiguration")) {
+      admin.startReconfiguration(nodeType, address, out, err);
     }
-    return outputs;
+
+    Scanner scanner = new Scanner(bufOut.toString());
+    while (scanner.hasNextLine()) {
+      outs.add(scanner.nextLine());
+    }
+    scanner.close();
+    scanner = new Scanner(bufErr.toString());
+    while (scanner.hasNextLine()) {
+      errs.add(scanner.nextLine());
+    }
+    scanner.close();
+  }
+
+  @Test(timeout = 30000)
+  public void testDataNodeGetReconfigurableProperties() throws IOException {
+    final int port = datanode.getIpcPort();
+    final String address = "localhost:" + port;
+    final List<String> outs = Lists.newArrayList();
+    final List<String> errs = Lists.newArrayList();
+    getReconfigurableProperties("datanode", address, outs, errs);
+    assertEquals(3, outs.size());
+    assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1));
   }
 
   /**
@@ -103,7 +152,7 @@ public class TestDFSAdmin {
    * @throws IOException
    * @throws InterruptedException
    */
-  private void testGetReconfigurationStatus(boolean expectedSuccuss)
+  private void testDataNodeGetReconfigurationStatus(boolean expectedSuccess)
       throws IOException, InterruptedException {
     ReconfigurationUtil ru = mock(ReconfigurationUtil.class);
     datanode.setReconfigurationUtil(ru);
@@ -130,21 +179,25 @@ public class TestDFSAdmin {
     assertThat(admin.startReconfiguration("datanode", address), is(0));
 
-    List<String> outputs = null;
     int count = 100;
+    final List<String> outs = Lists.newArrayList();
+    final List<String> errs = Lists.newArrayList();
     while (count > 0) {
-      outputs = getReconfigureStatus("datanode", address);
-      if (!outputs.isEmpty() && outputs.get(0).contains("finished")) {
+      outs.clear();
+      errs.clear();
+      getReconfigurationStatus("datanode", address, outs, errs);
+      if (!outs.isEmpty() && outs.get(0).contains("finished")) {
         break;
       }
       count--;
       Thread.sleep(100);
     }
+    LOG.info(String.format("count=%d", count));
     assertTrue(count > 0);
     if (expectedSuccess) {
-      assertThat(outputs.size(), is(4));
+      assertThat(outs.size(), is(4));
     } else {
-      assertThat(outputs.size(), is(6));
+      assertThat(outs.size(), is(6));
     }
 
     List<StorageLocation> locations = DataNode.getStorageLocations(
@@ -160,55 +213,78 @@ public class TestDFSAdmin {
     int offset = 1;
     if (expectedSuccess) {
-      assertThat(outputs.get(offset),
+      assertThat(outs.get(offset),
           containsString("SUCCESS: Changed property " +
               DFS_DATANODE_DATA_DIR_KEY));
     } else {
-      assertThat(outputs.get(offset),
+      assertThat(outs.get(offset),
           containsString("FAILED: Change property " +
               DFS_DATANODE_DATA_DIR_KEY));
     }
-    assertThat(outputs.get(offset + 1),
+    assertThat(outs.get(offset + 1),
         is(allOf(containsString("From:"), containsString("data1"),
             containsString("data2"))));
-    assertThat(outputs.get(offset + 2),
+    assertThat(outs.get(offset + 2),
         is(not(anyOf(containsString("data1"), containsString("data2")))));
-    assertThat(outputs.get(offset + 2),
+    assertThat(outs.get(offset + 2),
        is(allOf(containsString("To"), containsString("data_new"))));
   }
 
   @Test(timeout = 30000)
-  public void testGetReconfigurationStatus()
-      throws IOException, InterruptedException {
-    testGetReconfigurationStatus(true);
+  public void testDataNodeGetReconfigurationStatus() throws IOException,
+      InterruptedException {
+    testDataNodeGetReconfigurationStatus(true);
     restartCluster();
-    testGetReconfigurationStatus(false);
-  }
-
-  private List<String> getReconfigurationAllowedProperties(
-      String nodeType, String address)
-      throws IOException {
-    ByteArrayOutputStream bufOut = new ByteArrayOutputStream();
-    PrintStream out = new PrintStream(bufOut);
-    ByteArrayOutputStream bufErr = new ByteArrayOutputStream();
-    PrintStream err = new PrintStream(bufErr);
-    admin.getReconfigurableProperties(nodeType, address, out, err);
-    Scanner scanner = new Scanner(bufOut.toString());
-    List<String> outputs = Lists.newArrayList();
-    while (scanner.hasNextLine()) {
-      outputs.add(scanner.nextLine());
-    }
-    return outputs;
+    testDataNodeGetReconfigurationStatus(false);
   }
 
   @Test(timeout = 30000)
-  public void testGetReconfigAllowedProperties() throws IOException {
-    final int port = datanode.getIpcPort();
-    final String address = "localhost:" + port;
-    List<String> outputs =
-        getReconfigurationAllowedProperties("datanode", address);
-    assertEquals(3, outputs.size());
-    assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
-        outputs.get(1));
+  public void testNameNodeStartReconfiguration() throws IOException {
+    final String address = namenode.getHostAndPort();
+    final List<String> outs = Lists.newArrayList();
+    final List<String> errs = Lists.newArrayList();
+    startReconfiguration("namenode", address, outs, errs);
+    assertEquals(0, outs.size());
+    assertTrue(errs.size() > 1);
+    assertThat(
+        errs.get(0),
+        is(allOf(containsString("Namenode"), containsString("reconfiguring:"),
+            containsString("startReconfiguration"),
+            containsString("is not implemented"),
+            containsString("UnsupportedOperationException"))));
+  }
+
+  @Test(timeout = 30000)
+  public void testNameNodeGetReconfigurableProperties() throws IOException {
+    final String address = namenode.getHostAndPort();
+    final List<String> outs = Lists.newArrayList();
+    final List<String> errs = Lists.newArrayList();
+    getReconfigurableProperties("namenode", address, outs, errs);
+    assertEquals(0, outs.size());
+    assertTrue(errs.size() > 1);
+    assertThat(
+        errs.get(0),
+        is(allOf(containsString("Namenode"),
+            containsString("reconfiguration:"),
+            containsString("listReconfigurableProperties"),
+            containsString("is not implemented"),
+            containsString("UnsupportedOperationException"))));
+  }
+
+  @Test(timeout = 30000)
+  public void testNameNodeGetReconfigurationStatus() throws IOException {
+    final String address = namenode.getHostAndPort();
+    final List<String> outs = Lists.newArrayList();
+    final List<String> errs = Lists.newArrayList();
+    getReconfigurationStatus("namenode", address, outs, errs);
+    assertEquals(0, outs.size());
+    assertTrue(errs.size() > 1);
+    assertThat(
+        errs.get(0),
+        is(allOf(containsString("Namenode"),
+            containsString("reloading configuration:"),
+            containsString("getReconfigurationStatus"),
+            containsString("is not implemented"),
+            containsString("UnsupportedOperationException"))));
   }
 }
\ No newline at end of file
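Note (illustration, not part of the patch): the three NameNode tests above assert on the stderr lines built by the String.format() calls in DFSAdmin. With this patch, a start request against a NameNode produces a line of roughly the shape below; the host:port and the exact exception rendering (which depends on how the RPC layer wraps the server-side UnsupportedOperationException) are illustrative:

    Node [localhost:8020] reconfiguring: java.lang.UnsupportedOperationException: Namenode startReconfiguration is not implemented..

The trailing double period is real: the "%s." format string appends a period to the exception's toString(), which already ends with one.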