HDFS-6004. Change DFSAdmin for rolling upgrade commands. (Contributed by szetszwo)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-5535@1571463 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
dbf14320c0
commit
c066cef587
|
@ -79,3 +79,6 @@ HDFS-5535 subtasks:
|
|||
HDFS-6005. Simplify Datanode rollback and downgrade. (Suresh Srinivas via
|
||||
Arpit Agarwal)
|
||||
|
||||
HDFS-6004. Change DFSAdmin for rolling upgrade commands. (szetszwo via
|
||||
Arpit Agarwal)
|
||||
|
||||
|
|
|
@ -85,7 +85,7 @@ public class HdfsConstants {
|
|||
}
|
||||
|
||||
public static enum RollingUpgradeAction {
|
||||
QUERY, START, FINALIZE;
|
||||
QUERY, PREPARE, FINALIZE;
|
||||
|
||||
private static final Map<String, RollingUpgradeAction> MAP
|
||||
= new HashMap<String, RollingUpgradeAction>();
|
||||
|
|
|
@ -28,19 +28,22 @@ import org.apache.hadoop.classification.InterfaceStability;
|
|||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public class RollingUpgradeInfo extends RollingUpgradeStatus {
|
||||
private long startTime;
|
||||
private long finalizeTime;
|
||||
private final long startTime;
|
||||
private final long finalizeTime;
|
||||
private final boolean createdRollbackImages;
|
||||
|
||||
public RollingUpgradeInfo(String blockPoolId, long startTime) {
|
||||
this(blockPoolId, startTime, 0L);
|
||||
}
|
||||
|
||||
public RollingUpgradeInfo(String blockPoolId, long startTime, long finalizeTime) {
|
||||
public RollingUpgradeInfo(String blockPoolId, boolean createdRollbackImages,
|
||||
long startTime, long finalizeTime) {
|
||||
super(blockPoolId);
|
||||
this.createdRollbackImages = createdRollbackImages;
|
||||
this.startTime = startTime;
|
||||
this.finalizeTime = finalizeTime;
|
||||
}
|
||||
|
||||
public boolean createdRollbackImages() {
|
||||
return createdRollbackImages;
|
||||
}
|
||||
|
||||
public boolean isStarted() {
|
||||
return startTime != 0;
|
||||
}
|
||||
|
|
|
@ -1458,7 +1458,7 @@ public class PBHelper {
|
|||
switch (a) {
|
||||
case QUERY:
|
||||
return RollingUpgradeActionProto.QUERY;
|
||||
case START:
|
||||
case PREPARE:
|
||||
return RollingUpgradeActionProto.START;
|
||||
case FINALIZE:
|
||||
return RollingUpgradeActionProto.FINALIZE;
|
||||
|
@ -1472,7 +1472,7 @@ public class PBHelper {
|
|||
case QUERY:
|
||||
return RollingUpgradeAction.QUERY;
|
||||
case START:
|
||||
return RollingUpgradeAction.START;
|
||||
return RollingUpgradeAction.PREPARE;
|
||||
case FINALIZE:
|
||||
return RollingUpgradeAction.FINALIZE;
|
||||
default:
|
||||
|
@ -1494,6 +1494,7 @@ public class PBHelper {
|
|||
public static RollingUpgradeInfoProto convert(RollingUpgradeInfo info) {
|
||||
return RollingUpgradeInfoProto.newBuilder()
|
||||
.setStatus(convertRollingUpgradeStatus(info))
|
||||
.setCreatedRollbackImages(info.createdRollbackImages())
|
||||
.setStartTime(info.getStartTime())
|
||||
.setFinalizeTime(info.getFinalizeTime())
|
||||
.build();
|
||||
|
@ -1502,6 +1503,7 @@ public class PBHelper {
|
|||
public static RollingUpgradeInfo convert(RollingUpgradeInfoProto proto) {
|
||||
RollingUpgradeStatusProto status = proto.getStatus();
|
||||
return new RollingUpgradeInfo(status.getBlockPoolId(),
|
||||
proto.getCreatedRollbackImages(),
|
||||
proto.getStartTime(), proto.getFinalizeTime());
|
||||
}
|
||||
|
||||
|
|
|
@ -281,7 +281,7 @@ public final class FSImageFormatProtobuf {
|
|||
fsn.setLastAllocatedBlockId(s.getLastAllocatedBlockId());
|
||||
imgTxId = s.getTransactionId();
|
||||
if (s.hasRollingUpgradeStartTime()) {
|
||||
fsn.setRollingUpgradeInfo(s.getRollingUpgradeStartTime());
|
||||
fsn.setRollingUpgradeInfo(true, s.getRollingUpgradeStartTime());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -7176,11 +7176,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
getFSImage().saveNamespace(this, NameNodeFile.IMAGE_ROLLBACK, null);
|
||||
LOG.info("Successfully saved namespace for preparing rolling upgrade.");
|
||||
}
|
||||
setRollingUpgradeInfo(startTime);
|
||||
setRollingUpgradeInfo(true, startTime);
|
||||
}
|
||||
|
||||
void setRollingUpgradeInfo(long startTime) {
|
||||
rollingUpgradeInfo = new RollingUpgradeInfo(blockPoolId, startTime);
|
||||
void setRollingUpgradeInfo(boolean createdRollbackImages, long startTime) {
|
||||
rollingUpgradeInfo = new RollingUpgradeInfo(blockPoolId,
|
||||
createdRollbackImages, startTime, 0L);
|
||||
}
|
||||
|
||||
RollingUpgradeInfo getRollingUpgradeInfo() {
|
||||
|
@ -7234,7 +7235,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
|
||||
final long startTime = rollingUpgradeInfo.getStartTime();
|
||||
rollingUpgradeInfo = null;
|
||||
return new RollingUpgradeInfo(blockPoolId, startTime, finalizeTime);
|
||||
return new RollingUpgradeInfo(blockPoolId, true, startTime, finalizeTime);
|
||||
}
|
||||
|
||||
long addCacheDirective(CacheDirectiveInfo directive, EnumSet<CacheFlag> flags)
|
||||
|
|
|
@ -868,7 +868,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
switch(action) {
|
||||
case QUERY:
|
||||
return namesystem.queryRollingUpgrade();
|
||||
case START:
|
||||
case PREPARE:
|
||||
return namesystem.startRollingUpgrade();
|
||||
case FINALIZE:
|
||||
return namesystem.finalizeRollingUpgrade();
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.tools;
|
|||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintStream;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.net.URL;
|
||||
|
@ -280,11 +281,11 @@ public class DFSAdmin extends FsShell {
|
|||
|
||||
private static class RollingUpgradeCommand {
|
||||
static final String NAME = "rollingUpgrade";
|
||||
static final String USAGE = "-"+NAME+" [<query|start|finalize>]";
|
||||
static final String USAGE = "-"+NAME+" [<query|prepare|finalize>]";
|
||||
static final String DESCRIPTION = USAGE + ":\n"
|
||||
+ " query: query current rolling upgrade status.\n"
|
||||
+ " start: start rolling upgrade."
|
||||
+ " finalize: finalize rolling upgrade.";
|
||||
+ " query: query the current rolling upgrade status.\n"
|
||||
+ " prepare: prepare a new rolling upgrade."
|
||||
+ " finalize: finalize the current rolling upgrade.";
|
||||
|
||||
/** Check if a command is the rollingUpgrade command
|
||||
*
|
||||
|
@ -295,6 +296,27 @@ public class DFSAdmin extends FsShell {
|
|||
return ("-"+NAME).equals(cmd);
|
||||
}
|
||||
|
||||
private static void printMessage(RollingUpgradeInfo info,
|
||||
PrintStream out) {
|
||||
if (info != null && info.isStarted()) {
|
||||
if (!info.createdRollbackImages()) {
|
||||
out.println(
|
||||
"Preparing for upgrade. Data is being saved for rollback."
|
||||
+ "\nRun \"dfsadmin -rollingUpgrade query\" to check the status"
|
||||
+ "\nfor proceeding with rolling upgrade");
|
||||
out.println(info);
|
||||
} else if (!info.isFinalized()) {
|
||||
out.println("Proceed with rolling upgrade:");
|
||||
out.println(info);
|
||||
} else {
|
||||
out.println("Rolling upgrade is finalized.");
|
||||
out.println(info);
|
||||
}
|
||||
} else {
|
||||
out.println("There is no rolling upgrade in progress.");
|
||||
}
|
||||
}
|
||||
|
||||
static int run(DistributedFileSystem dfs, String[] argv, int idx) throws IOException {
|
||||
final RollingUpgradeAction action = RollingUpgradeAction.fromString(
|
||||
argv.length >= 2? argv[1]: "");
|
||||
|
@ -308,24 +330,15 @@ public class DFSAdmin extends FsShell {
|
|||
final RollingUpgradeInfo info = dfs.rollingUpgrade(action);
|
||||
switch(action){
|
||||
case QUERY:
|
||||
if (info != null && info.isStarted()) {
|
||||
System.out.println("Rolling upgrade is in progress:");
|
||||
System.out.println(info);
|
||||
} else {
|
||||
System.out.println("There is no rolling upgrade in progress.");
|
||||
}
|
||||
break;
|
||||
case START:
|
||||
case PREPARE:
|
||||
Preconditions.checkState(info.isStarted());
|
||||
System.out.println("Rolling upgrade is started:");
|
||||
System.out.println(info);
|
||||
break;
|
||||
case FINALIZE:
|
||||
Preconditions.checkState(info.isFinalized());
|
||||
System.out.println("Rolling upgrade is finalized:");
|
||||
System.out.println(info);
|
||||
break;
|
||||
}
|
||||
printMessage(info, System.out);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
@ -745,17 +758,17 @@ public class DFSAdmin extends FsShell {
|
|||
String disallowSnapshot = "-disallowSnapshot <snapshotDir>:\n" +
|
||||
"\tDo not allow snapshots to be taken on a directory any more.\n";
|
||||
|
||||
String shutdownDatanode = "-shutdownDatanode <datanode_host:ipc_port> [upgrade]\n" +
|
||||
"\tShut down the datanode. If an optional argument \"upgrade\" is\n" +
|
||||
"\tpassed, the clients will be advised to wait for the datanode to\n" +
|
||||
"\trestart and the fast start-up mode will be enabled. Clients will\n" +
|
||||
"\ttimeout and ignore the datanode, if the restart does not happen\n" +
|
||||
"\tin time. The fast start-up mode will also be disabled, if restart\n" +
|
||||
"\tis delayed too much.\n";
|
||||
String shutdownDatanode = "-shutdownDatanode <datanode_host:ipc_port> [upgrade]\n"
|
||||
+ "\tSubmit a shutdown request for the given datanode. If an optional\n"
|
||||
+ "\t\"upgrade\" argument is specified, clients accessing the datanode\n"
|
||||
+ "\twill be advised to wait for it to restart and the fast start-up\n"
|
||||
+ "\tmode will be enabled. When the restart does not happen in time,\n"
|
||||
+ "\tclients will timeout and ignore the datanode. In such case, the\n"
|
||||
+ "\tfast start-up mode will also be disabled.\n";
|
||||
|
||||
String getDatanodeInfo = "-getDatanodeInfo <datanode_host:ipc_port>\n" +
|
||||
"\tCheck the datanode for liveness. If the datanode responds,\n" +
|
||||
"\timore information about the datanode is printed.\n";
|
||||
String getDatanodeInfo = "-getDatanodeInfo <datanode_host:ipc_port>\n"
|
||||
+ "\tGet the information about the given datanode. This command can\n"
|
||||
+ "\tbe used for checking if a datanode is alive.\n";
|
||||
|
||||
String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" +
|
||||
"\t\tis specified.\n";
|
||||
|
@ -1401,10 +1414,11 @@ public class DFSAdmin extends FsShell {
|
|||
}
|
||||
|
||||
private int shutdownDatanode(String[] argv, int i) throws IOException {
|
||||
ClientDatanodeProtocol dnProxy = getDataNodeProxy(argv[i]);
|
||||
final String dn = argv[i];
|
||||
ClientDatanodeProtocol dnProxy = getDataNodeProxy(dn);
|
||||
boolean upgrade = false;
|
||||
if (argv.length-1 == i+1) {
|
||||
if ("upgrade".equals(argv[i+1])) {
|
||||
if ("upgrade".equalsIgnoreCase(argv[i+1])) {
|
||||
upgrade = true;
|
||||
} else {
|
||||
printUsage("-shutdownDatanode");
|
||||
|
@ -1412,6 +1426,7 @@ public class DFSAdmin extends FsShell {
|
|||
}
|
||||
}
|
||||
dnProxy.shutdownDatanode(upgrade);
|
||||
System.out.println("Submitted a shutdown request to datanode " + dn);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -347,6 +347,7 @@ message RollingUpgradeInfoProto {
|
|||
required RollingUpgradeStatusProto status = 1;
|
||||
required uint64 startTime = 2;
|
||||
required uint64 finalizeTime = 3;
|
||||
required bool createdRollbackImages = 4;
|
||||
}
|
||||
|
||||
message RollingUpgradeResponseProto {
|
||||
|
|
|
@ -236,7 +236,7 @@ public class TestDFSUpgrade {
|
|||
try {
|
||||
final DistributedFileSystem dfs = cluster.getFileSystem();
|
||||
dfs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
|
||||
dfs.rollingUpgrade(RollingUpgradeAction.START);
|
||||
dfs.rollingUpgrade(RollingUpgradeAction.PREPARE);
|
||||
fail();
|
||||
} catch(RemoteException re) {
|
||||
assertEquals(InconsistentFSStateException.class.getName(),
|
||||
|
@ -379,7 +379,7 @@ public class TestDFSUpgrade {
|
|||
try {
|
||||
final DistributedFileSystem dfs = cluster.getFileSystem();
|
||||
dfs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
|
||||
dfs.rollingUpgrade(RollingUpgradeAction.START);
|
||||
dfs.rollingUpgrade(RollingUpgradeAction.PREPARE);
|
||||
fail();
|
||||
} catch(RemoteException re) {
|
||||
assertEquals(InconsistentFSStateException.class.getName(),
|
||||
|
|
|
@ -83,7 +83,7 @@ public class TestRollingUpgrade {
|
|||
runCmd(dfsadmin, true, "-rollingUpgrade");
|
||||
|
||||
//start rolling upgrade
|
||||
runCmd(dfsadmin, true, "-rollingUpgrade", "start");
|
||||
runCmd(dfsadmin, true, "-rollingUpgrade", "prepare");
|
||||
|
||||
//query rolling upgrade
|
||||
runCmd(dfsadmin, true, "-rollingUpgrade", "query");
|
||||
|
@ -182,7 +182,7 @@ public class TestRollingUpgrade {
|
|||
dfs.mkdirs(foo);
|
||||
|
||||
//start rolling upgrade
|
||||
info1 = dfs.rollingUpgrade(RollingUpgradeAction.START);
|
||||
info1 = dfs.rollingUpgrade(RollingUpgradeAction.PREPARE);
|
||||
LOG.info("START\n" + info1);
|
||||
|
||||
//query rolling upgrade
|
||||
|
@ -293,7 +293,7 @@ public class TestRollingUpgrade {
|
|||
final DistributedFileSystem dfs = cluster.getFileSystem();
|
||||
|
||||
//start rolling upgrade
|
||||
dfs.rollingUpgrade(RollingUpgradeAction.START);
|
||||
dfs.rollingUpgrade(RollingUpgradeAction.PREPARE);
|
||||
|
||||
dfs.mkdirs(bar);
|
||||
|
||||
|
@ -358,7 +358,7 @@ public class TestRollingUpgrade {
|
|||
dfs.mkdirs(foo);
|
||||
|
||||
// start rolling upgrade
|
||||
RollingUpgradeInfo info = dfs.rollingUpgrade(RollingUpgradeAction.START);
|
||||
RollingUpgradeInfo info = dfs.rollingUpgrade(RollingUpgradeAction.PREPARE);
|
||||
Assert.assertTrue(info.isStarted());
|
||||
dfs.mkdirs(bar);
|
||||
dfs.close();
|
||||
|
@ -417,7 +417,7 @@ public class TestRollingUpgrade {
|
|||
.getStorage();
|
||||
|
||||
// start rolling upgrade
|
||||
RollingUpgradeInfo info = dfs.rollingUpgrade(RollingUpgradeAction.START);
|
||||
RollingUpgradeInfo info = dfs.rollingUpgrade(RollingUpgradeAction.PREPARE);
|
||||
Assert.assertTrue(info.isStarted());
|
||||
dfs.mkdirs(bar);
|
||||
// The NN should have a copy of the fsimage in case of rollbacks.
|
||||
|
|
|
@ -102,7 +102,7 @@ public class TestRollingUpgradeRollback {
|
|||
|
||||
// start rolling upgrade
|
||||
Assert.assertEquals(0,
|
||||
dfsadmin.run(new String[] { "-rollingUpgrade", "start" }));
|
||||
dfsadmin.run(new String[] { "-rollingUpgrade", "prepare" }));
|
||||
// create new directory
|
||||
dfs.mkdirs(bar);
|
||||
|
||||
|
@ -161,7 +161,7 @@ public class TestRollingUpgradeRollback {
|
|||
|
||||
// start rolling upgrade
|
||||
Assert.assertEquals(0,
|
||||
dfsadmin.run(new String[] { "-rollingUpgrade", "start" }));
|
||||
dfsadmin.run(new String[] { "-rollingUpgrade", "prepare" }));
|
||||
// create new directory
|
||||
dfs.mkdirs(bar);
|
||||
dfs.close();
|
||||
|
@ -216,7 +216,7 @@ public class TestRollingUpgradeRollback {
|
|||
dfs.mkdirs(foo);
|
||||
|
||||
// start rolling upgrade
|
||||
RollingUpgradeInfo info = dfs.rollingUpgrade(RollingUpgradeAction.START);
|
||||
RollingUpgradeInfo info = dfs.rollingUpgrade(RollingUpgradeAction.PREPARE);
|
||||
Assert.assertTrue(info.isStarted());
|
||||
|
||||
// create new directory
|
||||
|
|
|
@ -18,11 +18,26 @@
|
|||
|
||||
package org.apache.hadoop.hdfs.server.datanode;
|
||||
|
||||
import static org.hamcrest.core.Is.is;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.*;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster.Builder;
|
||||
import org.apache.hadoop.hdfs.TestRollingUpgrade;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
|
@ -30,13 +45,6 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|||
import org.apache.hadoop.hdfs.tools.DFSAdmin;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.apache.hadoop.hdfs.MiniDFSCluster.*;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
/**
|
||||
* Ensure that the DataNode correctly handles rolling upgrade
|
||||
* finalize and rollback.
|
||||
|
@ -142,7 +150,7 @@ public class TestDataNodeRollingUpgrade {
|
|||
private void startRollingUpgrade() throws Exception {
|
||||
LOG.info("Starting rolling upgrade");
|
||||
final DFSAdmin dfsadmin = new DFSAdmin(conf);
|
||||
TestRollingUpgrade.runCmd(dfsadmin, true, "-rollingUpgrade", "start");
|
||||
TestRollingUpgrade.runCmd(dfsadmin, true, "-rollingUpgrade", "prepare");
|
||||
triggerHeartBeats();
|
||||
|
||||
// Ensure datanode rolling upgrade is started
|
||||
|
|
Loading…
Reference in New Issue