HDFS-5786. Support QUERY and FINALIZE actions of rolling upgrade.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-5535@1559304 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2014-01-18 02:07:09 +00:00
parent 4a1abe5a3d
commit 09b8ce5b44
10 changed files with 163 additions and 15 deletions

View File

@ -7,3 +7,5 @@ HDFS-5535 subtasks:
HDFS-5752. Add a new DFSAdmin command to query, start and finalize rolling
upgrade. (szetszwo)
HDFS-5786. Support QUERY and FINALIZE actions of rolling upgrade. (szetszwo)

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Exception related to rolling upgrade.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class RollingUpgradeException extends IOException {
private static final long serialVersionUID = 1L;
public RollingUpgradeException(String msg) {
super(msg);
}
}

View File

@ -28,9 +28,15 @@ import org.apache.hadoop.classification.InterfaceStability;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class RollingUpgradeInfo {
public static final RollingUpgradeInfo EMPTY_INFO = new RollingUpgradeInfo(0);
private long startTime;
private long finalizeTime;
public RollingUpgradeInfo(long startTime) {
this(startTime, 0L);
}
public RollingUpgradeInfo(long startTime, long finalizeTime) {
this.startTime = startTime;
this.finalizeTime = finalizeTime;

View File

@ -1001,8 +1001,9 @@ public class FSEditLog implements LogsPurgeable {
logEdit(op);
}
void logUpgradeMarker() {
void logUpgradeMarker(long startTime) {
UpgradeMarkerOp op = UpgradeMarkerOp.getInstance(cache.get());
op.setStartTime(startTime);
logEdit(op);
}

View File

@ -3284,6 +3284,8 @@ public abstract class FSEditLogOp {
* Operation corresponding to upgrade
*/
static class UpgradeMarkerOp extends FSEditLogOp { // @Idempotent
private long startTime;
public UpgradeMarkerOp() {
super(OP_UPGRADE_MARKER);
}
@ -3292,20 +3294,29 @@ public abstract class FSEditLogOp {
return (UpgradeMarkerOp) cache.get(OP_UPGRADE_MARKER);
}
void setStartTime(long startTime) {
this.startTime = startTime;
}
@Override
void readFields(DataInputStream in, int logVersion) throws IOException {
startTime = in.readLong();
}
@Override
public void writeFields(DataOutputStream out) throws IOException {
FSImageSerialization.writeLong(startTime, out);
}
@Override
protected void toXml(ContentHandler contentHandler) throws SAXException {
XMLUtils.addSaxString(contentHandler, "STARTTIME",
Long.valueOf(startTime).toString());
}
@Override
void fromXml(Stanza st) throws InvalidXmlException {
this.startTime = Long.valueOf(st.getValue("STARTTIME"));
}
@Override

View File

@ -168,6 +168,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeException;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
@ -394,6 +395,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
private final CacheManager cacheManager;
private final DatanodeStatistics datanodeStatistics;
private RollingUpgradeInfo rollingUpgradeInfo;
// Block pool ID used by this namenode
private String blockPoolId;
@ -7054,25 +7057,75 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
}
RollingUpgradeInfo addUpgradeMarker() throws IOException {
final long startTime;
RollingUpgradeInfo queryRollingUpgrade() throws IOException {
checkSuperuserPrivilege();
checkOperation(OperationCategory.READ);
readLock();
try {
return rollingUpgradeInfo != null? rollingUpgradeInfo
: RollingUpgradeInfo.EMPTY_INFO;
} finally {
readUnlock();
}
}
RollingUpgradeInfo startRollingUpgrade() throws IOException {
checkSuperuserPrivilege();
checkOperation(OperationCategory.WRITE);
writeLock();
try {
checkOperation(OperationCategory.WRITE);
final String err = "Failed to start rolling upgrade";
checkNameNodeSafeMode(err);
startTime = now();
getEditLog().logUpgradeMarker();
if (rollingUpgradeInfo != null) {
throw new RollingUpgradeException(err
+ " since a rolling upgrade is already in progress."
+ "\nExisting rolling upgrade info: " + rollingUpgradeInfo);
}
final CheckpointSignature cs = getFSImage().rollEditLog();
LOG.info("Successfully rolled edit log for preparing rolling upgrade."
+ " Checkpoint signature: " + cs);
rollingUpgradeInfo = new RollingUpgradeInfo(now());
getEditLog().logUpgradeMarker(rollingUpgradeInfo.getStartTime());
} finally {
writeUnlock();
}
getEditLog().logSync();
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(true, "upgrade", null, null, null);
logAuditEvent(true, "startRollingUpgrade", null, null, null);
}
return new RollingUpgradeInfo(startTime, 0L);
return rollingUpgradeInfo;
}
RollingUpgradeInfo finalizeRollingUpgrade() throws IOException {
checkSuperuserPrivilege();
checkOperation(OperationCategory.WRITE);
writeLock();
final RollingUpgradeInfo returnInfo;
try {
checkOperation(OperationCategory.WRITE);
final String err = "Failed to finalize rolling upgrade";
checkNameNodeSafeMode(err);
if (rollingUpgradeInfo == null) {
throw new RollingUpgradeException(err
+ " since there is no rolling upgrade in progress.");
}
returnInfo = new RollingUpgradeInfo(rollingUpgradeInfo.getStartTime(), now());
getFSImage().saveNamespace(this);
rollingUpgradeInfo = null;
} finally {
writeUnlock();
}
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(true, "finalizeRollingUpgrade", null, null, null);
}
return returnInfo;
}
long addCacheDirective(CacheDirectiveInfo directive, EnumSet<CacheFlag> flags)

View File

@ -864,10 +864,13 @@ class NameNodeRpcServer implements NamenodeProtocols {
public RollingUpgradeInfo rollingUpgrade(RollingUpgradeAction action) throws IOException {
LOG.info("rollingUpgrade " + action);
switch(action) {
case QUERY:
return namesystem.queryRollingUpgrade();
case START:
return namesystem.addUpgradeMarker();
return namesystem.startRollingUpgrade();
case FINALIZE:
return namesystem.finalizeRollingUpgrade();
default:
// TODO: support other actions.
throw new UnsupportedActionException(action + " is not yet supported.");
}
}

View File

@ -32,7 +32,7 @@ public class TestRollingUpgrade {
* Test DFSAdmin Upgrade Command.
*/
@Test
public void testDFSAdminRollingUpgradeCommand() throws Exception {
public void testDFSAdminRollingUpgradeCommands() throws Exception {
// start a cluster
final Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = null;
@ -42,29 +42,64 @@ public class TestRollingUpgrade {
final Path foo = new Path("/foo");
final Path bar = new Path("/bar");
final Path baz = new Path("/baz");
{
final DistributedFileSystem dfs = cluster.getFileSystem();
final DFSAdmin dfsadmin = new DFSAdmin(conf);
dfs.mkdirs(foo);
{
//illegal argument
final String[] args = {"-rollingUpgrade", "abc"};
Assert.assertTrue(dfsadmin.run(args) != 0);
}
{
//query rolling upgrade
final String[] args = {"-rollingUpgrade"};
Assert.assertEquals(0, dfsadmin.run(args));
}
{
//start rolling upgrade
final String[] args = {"-rollingUpgrade", "start"};
dfsadmin.run(args);
Assert.assertEquals(0, dfsadmin.run(args));
}
{
//query rolling upgrade
final String[] args = {"-rollingUpgrade", "query"};
Assert.assertEquals(0, dfsadmin.run(args));
}
dfs.mkdirs(bar);
{
//finalize rolling upgrade
final String[] args = {"-rollingUpgrade", "finalize"};
Assert.assertEquals(0, dfsadmin.run(args));
}
dfs.mkdirs(baz);
{
//query rolling upgrade
final String[] args = {"-rollingUpgrade"};
Assert.assertEquals(0, dfsadmin.run(args));
}
Assert.assertTrue(dfs.exists(foo));
Assert.assertTrue(dfs.exists(bar));
Assert.assertTrue(dfs.exists(baz));
}
cluster.restartNameNode();
{
final DistributedFileSystem dfs = cluster.getFileSystem();
Assert.assertTrue(dfs.exists(foo));
Assert.assertFalse(dfs.exists(bar));
Assert.assertTrue(dfs.exists(bar));
Assert.assertTrue(dfs.exists(baz));
}
} finally {
if(cluster != null) cluster.shutdown();

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.util.Time;
/**
* OfflineEditsViewerHelper is a helper class for TestOfflineEditsViewer,
@ -128,7 +129,7 @@ public class OfflineEditsViewerHelper {
DFSTestUtil.runOperations(cluster, dfs, cluster.getConfiguration(0),
dfs.getDefaultBlockSize(), 0);
cluster.getNamesystem().addUpgradeMarker();
cluster.getNamesystem().getEditLog().logUpgradeMarker(Time.now());
// Force a roll so we get an OP_END_LOG_SEGMENT txn
return cluster.getNameNodeRpc().rollEditLog();

View File

@ -53,7 +53,7 @@ public class TestEditLogUpgradeMarker {
dfs.mkdirs(foo);
//add marker
namesystem.addUpgradeMarker();
namesystem.startRollingUpgrade();
dfs.mkdirs(bar);