From 1a68f13521e6902943bea7b16369ac9a4e6943f5 Mon Sep 17 00:00:00 2001 From: Kihwal Lee Date: Fri, 14 Feb 2014 21:20:02 +0000 Subject: [PATCH] HDFS-5585. Provide admin commands for data node upgrade. Contributed by Kihwal Lee. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-5535@1568523 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-hdfs/CHANGES_HDFS-5535.txt | 1 + .../hdfs/protocol/ClientDatanodeProtocol.java | 18 ++++++ ...atanodeProtocolServerSideTranslatorPB.java | 31 +++++++++ .../ClientDatanodeProtocolTranslatorPB.java | 29 +++++++++ .../hadoop/hdfs/protocolPB/PBHelper.java | 15 +++++ .../hadoop/hdfs/server/datanode/DataNode.java | 35 ++++++++++ .../apache/hadoop/hdfs/tools/DFSAdmin.java | 64 +++++++++++++++++++ .../main/proto/ClientDatanodeProtocol.proto | 28 ++++++++ .../hadoop-hdfs/src/main/proto/hdfs.proto | 9 +++ .../hadoop/hdfs/TestRollingUpgrade.java | 32 ++++++++++ 10 files changed, 262 insertions(+) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5535.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5535.txt index e94277daa4f..c1b2d758398 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5535.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5535.txt @@ -39,3 +39,4 @@ HDFS-5535 subtasks: HDFS-5494. Merge Protobuf-based-FSImage code from trunk - fix build break after the merge. (Jing Zhao via Arpit Agarwal) + HDFS-5585. Provide admin commands for data node upgrade (kihwal) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java index f2b8cc308a6..3ced32cd8e7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java @@ -124,4 +124,22 @@ public interface ClientDatanodeProtocol { */ HdfsBlocksMetadata getHdfsBlocksMetadata(List blocks, List> tokens) throws IOException; + + /** + * Shuts down a datanode. + * + * @param forUpgrade If true, data node does extra prep work before shutting + * down. The work includes advising clients to wait and saving + * certain states for quick restart. This should only be used when + * the stored data will remain the same during upgrade/restart. + * @throws IOException + */ + void shutdownDatanode(boolean forUpgrade) throws IOException; + + /** + * Obtains datanode info + * + * @return software/config version and uptime of the datanode + */ + DatanodeLocalInfo getDatanodeInfo() throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java index 210f3345972..08617793b7f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java @@ -30,6 +30,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.Delete import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetDatanodeInfoRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetDatanodeInfoResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto.Builder; @@ -37,10 +39,13 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetRep import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ShutdownDatanodeRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ShutdownDatanodeResponseProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.VersionInfo; import com.google.protobuf.ByteString; import com.google.protobuf.RpcController; @@ -58,6 +63,8 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements RefreshNamenodesResponseProto.newBuilder().build(); private final static DeleteBlockPoolResponseProto DELETE_BLOCKPOOL_RESP = DeleteBlockPoolResponseProto.newBuilder().build(); + private final static ShutdownDatanodeResponseProto SHUTDOWN_DATANODE_RESP = + ShutdownDatanodeResponseProto.newBuilder().build(); private final ClientDatanodeProtocol impl; @@ -152,4 +159,28 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements builder.addAllVolumeIndexes(resp.getVolumeIndexes()); return builder.build(); } + + @Override + public ShutdownDatanodeResponseProto shutdownDatanode( + RpcController unused, ShutdownDatanodeRequestProto request) + throws ServiceException { + try { + impl.shutdownDatanode(request.getForUpgrade()); + } catch (IOException e) { + throw new ServiceException(e); + } + return SHUTDOWN_DATANODE_RESP; + } + + public GetDatanodeInfoResponseProto getDatanodeInfo(RpcController unused, + GetDatanodeInfoRequestProto request) throws ServiceException { + GetDatanodeInfoResponseProto res; + try { + res = GetDatanodeInfoResponseProto.newBuilder() + .setLocalInfo(PBHelper.convert(impl.getDatanodeInfo())).build(); + } catch (IOException e) { + throw new ServiceException(e); + } + return res; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java index dedba5af276..06f7cdc2e29 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java @@ -34,16 +34,20 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetDatanodeInfoRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetDatanodeInfoResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ShutdownDatanodeRequestProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.ipc.ProtobufHelper; @@ -79,6 +83,8 @@ public class ClientDatanodeProtocolTranslatorPB implements private final ClientDatanodeProtocolPB rpcProxy; private final static RefreshNamenodesRequestProto VOID_REFRESH_NAMENODES = RefreshNamenodesRequestProto.newBuilder().build(); + private final static GetDatanodeInfoRequestProto VOID_GET_DATANODE_INFO = + GetDatanodeInfoRequestProto.newBuilder().build(); public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid, Configuration conf, int socketTimeout, boolean connectToDnViaHostname, @@ -255,4 +261,27 @@ public class ClientDatanodeProtocolTranslatorPB implements return new HdfsBlocksMetadata(blocks.toArray(new ExtendedBlock[] {}), volumeIds, volumeIndexes); } + + @Override + public void shutdownDatanode(boolean forUpgrade) throws IOException { + ShutdownDatanodeRequestProto request = ShutdownDatanodeRequestProto + .newBuilder().setForUpgrade(forUpgrade).build(); + try { + rpcProxy.shutdownDatanode(NULL_CONTROLLER, request); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public DatanodeLocalInfo getDatanodeInfo() throws IOException { + GetDatanodeInfoResponseProto response; + try { + response = rpcProxy.getDatanodeInfo(NULL_CONTROLLER, VOID_GET_DATANODE_INFO); + return PBHelper.convert(response.getLocalInfo()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index 0d03f2d6916..554bc64ed5d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates; +import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo; import org.apache.hadoop.hdfs.protocol.DirectoryListing; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; @@ -107,6 +108,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto.AdminState; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfosProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeLocalInfoProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DirectoryListingProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExportedBlockKeysProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto; @@ -1935,6 +1937,19 @@ public class PBHelper { return HdfsProtos.ChecksumTypeProto.valueOf(type.id); } + public static DatanodeLocalInfoProto convert(DatanodeLocalInfo info) { + DatanodeLocalInfoProto.Builder builder = DatanodeLocalInfoProto.newBuilder(); + builder.setSoftwareVersion(info.getSoftwareVersion()); + builder.setConfigVersion(info.getConfigVersion()); + builder.setUptime(info.getUptime()); + return builder.build(); + } + + public static DatanodeLocalInfo convert(DatanodeLocalInfoProto proto) { + return new DatanodeLocalInfo(proto.getSoftwareVersion(), + proto.getConfigVersion(), proto.getUptime()); + } + public static InputStream vintPrefixed(final InputStream input) throws IOException { final int firstByte = input.read(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index d67ecac8260..62185298a83 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.PrintStream; +import java.lang.management.ManagementFactory; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketException; @@ -73,6 +74,7 @@ import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; import org.apache.hadoop.hdfs.protocol.HdfsConstants; @@ -272,6 +274,7 @@ public class DataNode extends Configured private SecureResources secureResources = null; private List dataDirs; private Configuration conf; + private String confVersion; private final long maxNumberOfBlocksToLog; private final List usersWithLocalPathAccess; @@ -300,6 +303,11 @@ public class DataNode extends Configured DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT); + confVersion = "core-" + + conf.get("hadoop.common.configuration.version", "UNSPECIFIED") + + ",hdfs-" + + conf.get("hadoop.hdfs.configuration.version", "UNSPECIFIED"); + // Determine whether we should try to pass file descriptors to clients. if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT)) { @@ -2472,6 +2480,33 @@ public class DataNode extends Configured data.deleteBlockPool(blockPoolId, force); } + @Override // ClientDatanodeProtocol + public void shutdownDatanode(boolean forUpgrade) throws IOException { + LOG.info("shutdownDatanode command received (upgrade=" + forUpgrade + + "). Shutting down Datanode..."); + + // Delay start the shutdown process so that the rpc response can be + // sent back. + Thread shutdownThread = new Thread() { + @Override public void run() { + try { + Thread.sleep(1000); + } catch (InterruptedException ie) { } + shutdown(); + } + }; + + shutdownThread.setDaemon(true); + shutdownThread.start(); + } + + @Override //ClientDatanodeProtocol + public DatanodeLocalInfo getDatanodeInfo() { + long uptime = ManagementFactory.getRuntimeMXBean().getUptime()/1000; + return new DatanodeLocalInfo(VersionInfo.getVersion(), + confVersion, uptime); + } + /** * @param addr rpc address of the namenode * @return true if the datanode is connected to a NameNode at the diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java index 269efb5c74e..ae96c0f5c97 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.NameNodeProxies; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction; @@ -645,6 +646,8 @@ public class DFSAdmin extends FsShell { "\t[-fetchImage ]\n" + "\t[-allowSnapshot ]\n" + "\t[-disallowSnapshot ]\n" + + "\t[-shutdownDatanode [upgrade]]\n" + + "\t[-getDatanodeInfo \n" + "\t[-help [cmd]]\n"; String report ="-report: \tReports basic filesystem information and statistics.\n"; @@ -741,6 +744,18 @@ public class DFSAdmin extends FsShell { String disallowSnapshot = "-disallowSnapshot :\n" + "\tDo not allow snapshots to be taken on a directory any more.\n"; + + String shutdownDatanode = "-shutdownDatanode [upgrade]\n" + + "\tShut down the datanode. If an optional argument \"upgrade\" is\n" + + "\tpassed, the clients will be advised to wait for the datanode to\n" + + "\trestart and the fast start-up mode will be enabled. Clients will\n" + + "\ttimeout and ignore the datanode, if the restart does not happen\n" + + "\tin time. The fast start-up mode will also be disabled, if restart\n" + + "\tis delayed too much.\n"; + + String getDatanodeInfo = "-getDatanodeInfo \n" + + "\tCheck the datanode for liveness. If the datanode responds,\n" + + "\timore information about the datanode is printed.\n"; String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" + "\t\tis specified.\n"; @@ -791,6 +806,10 @@ public class DFSAdmin extends FsShell { System.out.println(allowSnapshot); } else if ("disallowSnapshot".equalsIgnoreCase(cmd)) { System.out.println(disallowSnapshot); + } else if ("shutdownDatanode".equalsIgnoreCase(cmd)) { + System.out.println(shutdownDatanode); + } else if ("getDatanodeInfo".equalsIgnoreCase(cmd)) { + System.out.println(getDatanodeInfo); } else if ("help".equals(cmd)) { System.out.println(help); } else { @@ -818,6 +837,8 @@ public class DFSAdmin extends FsShell { System.out.println(fetchImage); System.out.println(allowSnapshot); System.out.println(disallowSnapshot); + System.out.println(shutdownDatanode); + System.out.println(getDatanodeInfo); System.out.println(help); System.out.println(); ToolRunner.printGenericCommandUsage(System.out); @@ -1100,6 +1121,8 @@ public class DFSAdmin extends FsShell { System.err.println(" ["+ClearSpaceQuotaCommand.USAGE+"]"); System.err.println(" [-setBalancerBandwidth ]"); System.err.println(" [-fetchImage ]"); + System.err.println(" [-shutdownDatanode [upgrade]]"); + System.err.println(" [-getDatanodeInfo ]"); System.err.println(" [-help [cmd]]"); System.err.println(); ToolRunner.printGenericCommandUsage(System.err); @@ -1216,6 +1239,16 @@ public class DFSAdmin extends FsShell { printUsage(cmd); return exitCode; } + } else if ("-shutdownDatanode".equals(cmd)) { + if ((argv.length != 2) && (argv.length != 3)) { + printUsage(cmd); + return exitCode; + } + } else if ("-getDatanodeInfo".equals(cmd)) { + if (argv.length != 2) { + printUsage(cmd); + return exitCode; + } } // initialize DFSAdmin @@ -1279,6 +1312,10 @@ public class DFSAdmin extends FsShell { exitCode = setBalancerBandwidth(argv, i); } else if ("-fetchImage".equals(cmd)) { exitCode = fetchImage(argv, i); + } else if ("-shutdownDatanode".equals(cmd)) { + exitCode = shutdownDatanode(argv, i); + } else if ("-getDatanodeInfo".equals(cmd)) { + exitCode = getDatanodeInfo(argv, i); } else if ("-help".equals(cmd)) { if (i < argv.length) { printHelp(argv[i]); @@ -1363,6 +1400,33 @@ public class DFSAdmin extends FsShell { return 0; } + private int shutdownDatanode(String[] argv, int i) throws IOException { + ClientDatanodeProtocol dnProxy = getDataNodeProxy(argv[i]); + boolean upgrade = false; + if (argv.length-1 == i+1) { + if ("upgrade".equals(argv[i+1])) { + upgrade = true; + } else { + printUsage("-shutdownDatanode"); + return -1; + } + } + dnProxy.shutdownDatanode(upgrade); + return 0; + } + + private int getDatanodeInfo(String[] argv, int i) throws IOException { + ClientDatanodeProtocol dnProxy = getDataNodeProxy(argv[i]); + try { + DatanodeLocalInfo dnInfo = dnProxy.getDatanodeInfo(); + System.out.println(dnInfo.getDatanodeLocalReport()); + } catch (IOException ioe) { + System.err.println("Datanode unreachable."); + return -1; + } + return 0; + } + /** * main() has some simple utility methods. * @param argv Command line parameters. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto index e62dbbc01d5..3d2b5a367ff 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto @@ -121,6 +121,28 @@ message GetHdfsBlockLocationsResponseProto { repeated uint32 volumeIndexes = 2; } +/** + * forUpgrade - if true, clients are advised to wait for restart and quick + * upgrade restart is instrumented. Otherwise, datanode does + * the regular shutdown. + */ +message ShutdownDatanodeRequestProto { + required bool forUpgrade = 1; +} + +message ShutdownDatanodeResponseProto { +} + +/** + * Ping datanode for liveness and quick info + */ +message GetDatanodeInfoRequestProto { +} + +message GetDatanodeInfoResponseProto { + required DatanodeLocalInfoProto localInfo = 1; +} + /** * Protocol used from client to the Datanode. * See the request and response for details of rpc call. @@ -158,4 +180,10 @@ service ClientDatanodeProtocolService { */ rpc getHdfsBlockLocations(GetHdfsBlockLocationsRequestProto) returns(GetHdfsBlockLocationsResponseProto); + + rpc shutdownDatanode(ShutdownDatanodeRequestProto) + returns(ShutdownDatanodeResponseProto); + + rpc getDatanodeInfo(GetDatanodeInfoRequestProto) + returns(GetDatanodeInfoResponseProto); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto index b94c6220954..384182748d7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto @@ -60,6 +60,15 @@ message DatanodeIDProto { optional uint32 infoSecurePort = 7 [default = 0]; // datanode https port } +/** + * Datanode local information + */ +message DatanodeLocalInfoProto { + required string softwareVersion = 1; + required string configVersion = 2; + required uint64 uptime = 3; +} + /** * DatanodeInfo array */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java index 77658f185da..39fd39042dd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java @@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction; import org.apache.hadoop.hdfs.protocol.RollingUpgradeException; import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; @@ -262,4 +263,35 @@ public class TestRollingUpgrade { RollingUpgradeStartupOption.STARTED.name()}; SecondaryNameNode.main(args); } + + @Test + public void testDFSAdminDatanodeUpgradeControlCommands() throws Exception { + // start a cluster + final Configuration conf = new HdfsConfiguration(); + MiniDFSCluster cluster = null; + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitActive(); + final DFSAdmin dfsadmin = new DFSAdmin(conf); + DataNode dn = cluster.getDataNodes().get(0); + + // check the datanode + final String dnAddr = dn.getDatanodeId().getIpcAddr(false); + final String[] args1 = {"-getDatanodeInfo", dnAddr}; + Assert.assertEquals(0, dfsadmin.run(args1)); + + // issue shutdown to the datanode. + final String[] args2 = {"-shutdownDatanode", dnAddr, "upgrade" }; + Assert.assertEquals(0, dfsadmin.run(args2)); + + // the datanode should be down. + Thread.sleep(2000); + Assert.assertFalse("DataNode should exit", dn.isDatanodeUp()); + + // ping should fail. + Assert.assertEquals(-1, dfsadmin.run(args1)); + } finally { + if (cluster != null) cluster.shutdown(); + } + } }