HDFS-2686. Remove DistributedUpgrade related code. Contributed by Suresh Srinivas

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1375800 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Suresh Srinivas 2012-08-21 21:18:40 +00:00
parent e8477759ac
commit 6c0ccb5989
38 changed files with 75 additions and 1347 deletions

View File

@ -120,6 +120,8 @@ Trunk (unreleased changes)
HDFS-3803. Change BlockPoolSliceScanner chatty INFO log to DEBUG.
(Andrew Purtell via suresh)
HDFS-2686. Remove DistributedUpgrade related code. (suresh)
OPTIMIZATIONS
BUG FIXES

View File

@ -695,8 +695,9 @@ public boolean restoreFailedStorage(String arg)
public void finalizeUpgrade() throws IOException;
/**
* Report distributed upgrade progress or force current upgrade to proceed.
* <em>Method no longer used - retained only for backward compatibility</em>
*
* Report distributed upgrade progress or force current upgrade to proceed.
* @param action {@link HdfsConstants.UpgradeAction} to perform
* @return upgrade status information or null if no upgrades are in progress
* @throws IOException

View File

@ -389,8 +389,8 @@ public static BlockKey[] convertBlockKeys(List<BlockKeyProto> list) {
public static NamespaceInfo convert(NamespaceInfoProto info) {
StorageInfoProto storage = info.getStorageInfo();
return new NamespaceInfo(storage.getNamespceID(), storage.getClusterID(),
info.getBlockPoolID(), storage.getCTime(), info.getDistUpgradeVersion(),
info.getBuildVersion(), info.getSoftwareVersion());
info.getBlockPoolID(), storage.getCTime(), info.getBuildVersion(),
info.getSoftwareVersion());
}
public static NamenodeCommand convert(NamenodeCommandProto cmd) {
@ -898,7 +898,7 @@ public static NamespaceInfoProto convert(NamespaceInfo info) {
return NamespaceInfoProto.newBuilder()
.setBlockPoolID(info.getBlockPoolID())
.setBuildVersion(info.getBuildVersion())
.setDistUpgradeVersion(info.getDistributedUpgradeVersion())
.setUnused(0)
.setStorageInfo(PBHelper.convert((StorageInfo)info))
.setSoftwareVersion(info.getSoftwareVersion()).build();
}

View File

@ -1,91 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.common;
import java.io.IOException;
import java.util.SortedSet;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
/**
* Generic upgrade manager.
*
* {@link #broadcastCommand} is the command that should be
*
*/
@InterfaceAudience.Private
public abstract class UpgradeManager {
protected SortedSet<Upgradeable> currentUpgrades = null;
protected boolean upgradeState = false; // true if upgrade is in progress
protected int upgradeVersion = 0;
protected UpgradeCommand broadcastCommand = null;
public synchronized UpgradeCommand getBroadcastCommand() {
return this.broadcastCommand;
}
public synchronized boolean getUpgradeState() {
return this.upgradeState;
}
public synchronized int getUpgradeVersion(){
return this.upgradeVersion;
}
public synchronized void setUpgradeState(boolean uState, int uVersion) {
this.upgradeState = uState;
this.upgradeVersion = uVersion;
}
public SortedSet<Upgradeable> getDistributedUpgrades() throws IOException {
return UpgradeObjectCollection.getDistributedUpgrades(
getUpgradeVersion(), getType());
}
public synchronized short getUpgradeStatus() {
if(currentUpgrades == null)
return 100;
return currentUpgrades.first().getUpgradeStatus();
}
public synchronized boolean initializeUpgrade() throws IOException {
currentUpgrades = getDistributedUpgrades();
if(currentUpgrades == null) {
// set new upgrade state
setUpgradeState(false, HdfsConstants.LAYOUT_VERSION);
return false;
}
Upgradeable curUO = currentUpgrades.first();
// set and write new upgrade state into disk
setUpgradeState(true, curUO.getVersion());
return true;
}
public synchronized boolean isUpgradeCompleted() {
if (currentUpgrades == null) {
return true;
}
return false;
}
public abstract HdfsServerConstants.NodeType getType();
public abstract boolean startUpgrade() throws IOException;
public abstract void completeUpgrade() throws IOException;
}

View File

@ -1,74 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.common;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.server.common.UpgradeObjectCollection.UOSignature;
/**
* Abstract upgrade object.
*
* Contains default implementation of common methods of {@link Upgradeable}
* interface.
*/
@InterfaceAudience.Private
public abstract class UpgradeObject implements Upgradeable {
protected short status;
@Override
public short getUpgradeStatus() {
return status;
}
@Override
public String getDescription() {
return "Upgrade object for " + getType() + " layout version " + getVersion();
}
@Override
public UpgradeStatusReport getUpgradeStatusReport(boolean details)
throws IOException {
return new UpgradeStatusReport(getVersion(), getUpgradeStatus(), false);
}
@Override
public int compareTo(Upgradeable o) {
if(this.getVersion() != o.getVersion())
return (getVersion() > o.getVersion() ? -1 : 1);
int res = this.getType().toString().compareTo(o.getType().toString());
if(res != 0)
return res;
return getClass().getCanonicalName().compareTo(
o.getClass().getCanonicalName());
}
@Override
public boolean equals(Object o) {
if (!(o instanceof UpgradeObject)) {
return false;
}
return this.compareTo((UpgradeObject)o) == 0;
}
@Override
public int hashCode() {
return new UOSignature(this).hashCode();
}
}

View File

@ -1,135 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.common;
import java.io.IOException;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.util.StringUtils;
/**
* Collection of upgrade objects.
*
* Upgrade objects should be registered here before they can be used.
*/
@InterfaceAudience.Private
public class UpgradeObjectCollection {
static {
initialize();
// Registered distributed upgrade objects here
// registerUpgrade(new UpgradeObject());
}
static class UOSignature implements Comparable<UOSignature> {
int version;
HdfsServerConstants.NodeType type;
String className;
UOSignature(Upgradeable uo) {
this.version = uo.getVersion();
this.type = uo.getType();
this.className = uo.getClass().getCanonicalName();
}
int getVersion() {
return version;
}
HdfsServerConstants.NodeType getType() {
return type;
}
String getClassName() {
return className;
}
Upgradeable instantiate() throws IOException {
try {
return (Upgradeable)Class.forName(getClassName()).newInstance();
} catch(ClassNotFoundException e) {
throw new IOException(StringUtils.stringifyException(e));
} catch(InstantiationException e) {
throw new IOException(StringUtils.stringifyException(e));
} catch(IllegalAccessException e) {
throw new IOException(StringUtils.stringifyException(e));
}
}
@Override
public int compareTo(UOSignature o) {
if(this.version != o.version)
return (version < o.version ? -1 : 1);
int res = this.getType().toString().compareTo(o.getType().toString());
if(res != 0)
return res;
return className.compareTo(o.className);
}
@Override
public boolean equals(Object o) {
if (!(o instanceof UOSignature)) {
return false;
}
return this.compareTo((UOSignature)o) == 0;
}
@Override
public int hashCode() {
return version ^ ((type==null)?0:type.hashCode())
^ ((className==null)?0:className.hashCode());
}
}
/**
* Static collection of upgrade objects sorted by version.
* Layout versions are negative therefore newer versions will go first.
*/
static SortedSet<UOSignature> upgradeTable;
static final void initialize() {
upgradeTable = new TreeSet<UOSignature>();
}
static void registerUpgrade(Upgradeable uo) {
// Registered distributed upgrade objects here
upgradeTable.add(new UOSignature(uo));
}
public static SortedSet<Upgradeable> getDistributedUpgrades(int versionFrom,
HdfsServerConstants.NodeType type
) throws IOException {
assert HdfsConstants.LAYOUT_VERSION <= versionFrom : "Incorrect version "
+ versionFrom + ". Expected to be <= " + HdfsConstants.LAYOUT_VERSION;
SortedSet<Upgradeable> upgradeObjects = new TreeSet<Upgradeable>();
for(UOSignature sig : upgradeTable) {
if(sig.getVersion() < HdfsConstants.LAYOUT_VERSION)
continue;
if(sig.getVersion() > versionFrom)
break;
if(sig.getType() != type )
continue;
upgradeObjects.add(sig.instantiate());
}
if(upgradeObjects.size() == 0)
return null;
return upgradeObjects;
}
}

View File

@ -1,100 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.common;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
/**
* Common interface for distributed upgrade objects.
*
* Each upgrade object corresponds to a layout version,
* which is the latest version that should be upgraded using this object.
* That is all components whose layout version is greater or equal to the
* one returned by {@link #getVersion()} must be upgraded with this object.
*/
@InterfaceAudience.Private
public interface Upgradeable extends Comparable<Upgradeable> {
/**
* Get the layout version of the upgrade object.
* @return layout version
*/
int getVersion();
/**
* Get the type of the software component, which this object is upgrading.
* @return type
*/
HdfsServerConstants.NodeType getType();
/**
* Description of the upgrade object for displaying.
* @return description
*/
String getDescription();
/**
* Upgrade status determines a percentage of the work done out of the total
* amount required by the upgrade.
*
* 100% means that the upgrade is completed.
* Any value < 100 means it is not complete.
*
* The return value should provide at least 2 values, e.g. 0 and 100.
* @return integer value in the range [0, 100].
*/
short getUpgradeStatus();
/**
* Prepare for the upgrade.
* E.g. initialize upgrade data structures and set status to 0.
*
* Returns an upgrade command that is used for broadcasting to other cluster
* components.
* E.g. name-node informs data-nodes that they must perform a distributed upgrade.
*
* @return an UpgradeCommand for broadcasting.
* @throws IOException
*/
UpgradeCommand startUpgrade() throws IOException;
/**
* Complete upgrade.
* E.g. cleanup upgrade data structures or write metadata to disk.
*
* Returns an upgrade command that is used for broadcasting to other cluster
* components.
* E.g. data-nodes inform the name-node that they completed the upgrade
* while other data-nodes are still upgrading.
*
* @throws IOException
*/
UpgradeCommand completeUpgrade() throws IOException;
/**
* Get status report for the upgrade.
*
* @param details true if upgradeStatus details need to be included,
* false otherwise
* @return {@link UpgradeStatusReport}
* @throws IOException
*/
UpgradeStatusReport getUpgradeStatusReport(boolean details) throws IOException;
}

View File

@ -74,7 +74,6 @@ class BPOfferService {
*/
DatanodeRegistration bpRegistration;
UpgradeManagerDatanode upgradeManager = null;
private final DataNode dn;
/**
@ -260,33 +259,6 @@ void join() {
}
}
synchronized UpgradeManagerDatanode getUpgradeManager() {
if(upgradeManager == null)
upgradeManager =
new UpgradeManagerDatanode(dn, getBlockPoolId());
return upgradeManager;
}
void processDistributedUpgradeCommand(UpgradeCommand comm)
throws IOException {
UpgradeManagerDatanode upgradeManager = getUpgradeManager();
upgradeManager.processUpgradeCommand(comm);
}
/**
* Start distributed upgrade if it should be initiated by the data-node.
*/
synchronized void startDistributedUpgradeIfNeeded() throws IOException {
UpgradeManagerDatanode um = getUpgradeManager();
if(!um.getUpgradeState())
return;
um.setUpgradeState(false, um.getUpgradeVersion());
um.startUpgrade();
return;
}
DataNode getDataNode() {
return dn;
}
@ -374,9 +346,6 @@ synchronized void shutdownActor(BPServiceActor actor) {
if (bpServices.isEmpty()) {
dn.shutdownBlockPool(this);
if(upgradeManager != null)
upgradeManager.shutdownUpgrade();
}
}
@ -593,7 +562,7 @@ assert getBlockPoolId().equals(bp) :
break;
case UpgradeCommand.UC_ACTION_START_UPGRADE:
// start distributed upgrade here
processDistributedUpgradeCommand((UpgradeCommand)cmd);
LOG.warn("Distibuted upgrade is no longer supported");
break;
case DatanodeProtocol.DNA_RECOVERBLOCK:
String who = "NameNode at " + actor.getNNSocketAddress();

View File

@ -324,7 +324,7 @@ void notifyNamenodeDeletedBlock(ReceivedDeletedBlockInfo bInfo) {
* Run an immediate block report on this thread. Used by tests.
*/
@VisibleForTesting
void triggerBlockReportForTests() throws IOException {
void triggerBlockReportForTests() {
synchronized (pendingIncrementalBR) {
lastBlockReport = 0;
lastHeartbeat = 0;
@ -340,7 +340,7 @@ void triggerBlockReportForTests() throws IOException {
}
@VisibleForTesting
void triggerHeartbeatForTests() throws IOException {
void triggerHeartbeatForTests() {
synchronized (pendingIncrementalBR) {
lastHeartbeat = 0;
pendingIncrementalBR.notifyAll();
@ -355,7 +355,7 @@ void triggerHeartbeatForTests() throws IOException {
}
@VisibleForTesting
void triggerDeletionReportForTests() throws IOException {
void triggerDeletionReportForTests() {
synchronized (pendingIncrementalBR) {
lastDeletedReport = 0;
pendingIncrementalBR.notifyAll();
@ -670,7 +670,6 @@ public void run() {
while (shouldRun()) {
try {
bpos.startDistributedUpgradeIfNeeded();
offerService();
} catch (Exception ex) {
LOG.error("Exception in BPOfferService for " + this, ex);

View File

@ -138,7 +138,7 @@ void recoverTransitionRead(DataNode datanode, NamespaceInfo nsInfo,
// During startup some of them can upgrade or roll back
// while others could be up-to-date for the regular startup.
for (int idx = 0; idx < getNumStorageDirs(); idx++) {
doTransition(datanode, getStorageDir(idx), nsInfo, startOpt);
doTransition(getStorageDir(idx), nsInfo, startOpt);
assert getLayoutVersion() == nsInfo.getLayoutVersion()
: "Data-node and name-node layout versions must be the same.";
assert getCTime() == nsInfo.getCTime()
@ -232,7 +232,7 @@ protected void setFieldsFromProperties(Properties props, StorageDirectory sd)
* @param startOpt startup option
* @throws IOException
*/
private void doTransition(DataNode datanode, StorageDirectory sd,
private void doTransition(StorageDirectory sd,
NamespaceInfo nsInfo, StartupOption startOpt) throws IOException {
if (startOpt == StartupOption.ROLLBACK)
doRollback(sd, nsInfo); // rollback if applicable
@ -254,13 +254,9 @@ private void doTransition(DataNode datanode, StorageDirectory sd,
+ blockpoolID);
}
if (this.layoutVersion == HdfsConstants.LAYOUT_VERSION
&& this.cTime == nsInfo.getCTime())
&& this.cTime == nsInfo.getCTime()) {
return; // regular startup
// verify necessity of a distributed upgrade
UpgradeManagerDatanode um =
datanode.getUpgradeManagerDatanode(nsInfo.getBlockPoolID());
verifyDistributedUpgradeProgress(um, nsInfo);
}
if (this.layoutVersion > HdfsConstants.LAYOUT_VERSION
|| this.cTime < nsInfo.getCTime()) {
doUpgrade(sd, nsInfo); // upgrade
@ -476,13 +472,6 @@ private void linkAllBlocks(File fromDir, File toDir) throws IOException {
LOG.info( hardLink.linkStats.report() );
}
private void verifyDistributedUpgradeProgress(UpgradeManagerDatanode um,
NamespaceInfo nsInfo) throws IOException {
assert um != null : "DataNode.upgradeManager is null.";
um.setUpgradeState(false, getLayoutVersion());
um.initializeUpgrade(nsInfo);
}
/**
* gets the data node storage directory based on block pool storage
*

View File

@ -99,13 +99,8 @@ public void run() {
}
// Wait for at least one block pool to be up
private void waitForInit(String bpid) {
UpgradeManagerDatanode um = null;
if(bpid != null && !bpid.equals(""))
um = datanode.getUpgradeManagerDatanode(bpid);
while ((um != null && ! um.isUpgradeCompleted())
|| (getBlockPoolSetSize() < datanode.getAllBpOs().length)
private void waitForInit() {
while ((getBlockPoolSetSize() < datanode.getAllBpOs().length)
|| (getBlockPoolSetSize() < 1)) {
try {
Thread.sleep(5000);
@ -129,7 +124,7 @@ private BlockPoolSliceScanner getNextBPScanner(String currentBpId) {
String nextBpId = null;
while ((nextBpId == null) && datanode.shouldRun
&& !blockScannerThread.isInterrupted()) {
waitForInit(currentBpId);
waitForInit();
synchronized (this) {
if (getBlockPoolSetSize() > 0) {
// Find nextBpId by the minimum of the last scan time

View File

@ -502,7 +502,7 @@ private synchronized void initDirectoryScanner(Configuration conf) {
reason = "verifcation is not supported by SimulatedFSDataset";
}
if (reason == null) {
directoryScanner = new DirectoryScanner(this, data, conf);
directoryScanner = new DirectoryScanner(data, conf);
directoryScanner.start();
} else {
LOG.info("Periodic Directory Tree Verification scan is disabled because " +
@ -1218,17 +1218,8 @@ int getXmitsInProgress() {
return xmitsInProgress.get();
}
UpgradeManagerDatanode getUpgradeManagerDatanode(String bpid) {
BPOfferService bpos = blockPoolManager.get(bpid);
if(bpos==null) {
return null;
}
return bpos.getUpgradeManager();
}
private void transferBlock( ExtendedBlock block,
DatanodeInfo xferTargets[]
) throws IOException {
private void transferBlock(ExtendedBlock block, DatanodeInfo xferTargets[])
throws IOException {
BPOfferService bpos = getBPOSForBlock(block);
DatanodeRegistration bpReg = getDNRegistrationForBP(block.getBlockPoolId());
@ -1866,8 +1857,7 @@ public String toString() {
private void recoverBlock(RecoveringBlock rBlock) throws IOException {
ExtendedBlock block = rBlock.getBlock();
String blookPoolId = block.getBlockPoolId();
DatanodeInfo[] targets = rBlock.getLocations();
DatanodeID[] datanodeids = (DatanodeID[])targets;
DatanodeID[] datanodeids = rBlock.getLocations();
List<BlockRecord> syncList = new ArrayList<BlockRecord>(datanodeids.length);
int errorCount = 0;

View File

@ -396,10 +396,6 @@ private void doTransition( DataNode datanode,
if (this.layoutVersion == HdfsConstants.LAYOUT_VERSION
&& this.cTime == nsInfo.getCTime())
return; // regular startup
// verify necessity of a distributed upgrade
UpgradeManagerDatanode um =
datanode.getUpgradeManagerDatanode(nsInfo.getBlockPoolID());
verifyDistributedUpgradeProgress(um, nsInfo);
// do upgrade
if (this.layoutVersion > HdfsConstants.LAYOUT_VERSION
@ -708,14 +704,6 @@ public boolean accept(File dir, String name) {
new File(to, otherNames[i]), oldLV, hl);
}
private void verifyDistributedUpgradeProgress(UpgradeManagerDatanode um,
NamespaceInfo nsInfo
) throws IOException {
assert um != null : "DataNode.upgradeManager is null.";
um.setUpgradeState(false, getLayoutVersion());
um.initializeUpgrade(nsInfo);
}
/**
* Add bpStorage into bpStorageMap
*/

View File

@ -56,7 +56,6 @@
public class DirectoryScanner implements Runnable {
private static final Log LOG = LogFactory.getLog(DirectoryScanner.class);
private final DataNode datanode;
private final FsDatasetSpi<?> dataset;
private final ExecutorService reportCompileThreadPool;
private final ScheduledExecutorService masterThread;
@ -222,8 +221,7 @@ public long getGenStamp() {
}
}
DirectoryScanner(DataNode dn, FsDatasetSpi<?> dataset, Configuration conf) {
this.datanode = dn;
DirectoryScanner(FsDatasetSpi<?> dataset, Configuration conf) {
this.dataset = dataset;
int interval = conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT);
@ -271,17 +269,6 @@ public void run() {
return;
}
String[] bpids = dataset.getBlockPoolList();
for(String bpid : bpids) {
UpgradeManagerDatanode um =
datanode.getUpgradeManagerDatanode(bpid);
if (um != null && !um.isUpgradeCompleted()) {
//If distributed upgrades underway, exit and wait for next cycle.
LOG.warn("this cycle terminating immediately because Distributed Upgrade is in process");
return;
}
}
//We're are okay to run - do it
reconcile();

View File

@ -1,158 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.UpgradeManager;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
import org.apache.hadoop.util.Daemon;
/**
* Upgrade manager for data-nodes.
*
* Distributed upgrades for a data-node are performed in a separate thread.
* The upgrade starts when the data-node receives the start upgrade command
* from the namenode. At that point the manager finds a respective upgrade
* object and starts a daemon in order to perform the upgrade defined by the
* object.
*/
class UpgradeManagerDatanode extends UpgradeManager {
DataNode dataNode = null;
Daemon upgradeDaemon = null;
String bpid = null;
UpgradeManagerDatanode(DataNode dataNode, String bpid) {
super();
this.dataNode = dataNode;
this.bpid = bpid;
}
@Override
public HdfsServerConstants.NodeType getType() {
return HdfsServerConstants.NodeType.DATA_NODE;
}
synchronized void initializeUpgrade(NamespaceInfo nsInfo) throws IOException {
if( ! super.initializeUpgrade())
return; // distr upgrade is not needed
DataNode.LOG.info("\n Distributed upgrade for DataNode "
+ dataNode.getDisplayName()
+ " version " + getUpgradeVersion() + " to current LV "
+ HdfsConstants.LAYOUT_VERSION + " is initialized.");
UpgradeObjectDatanode curUO = (UpgradeObjectDatanode)currentUpgrades.first();
curUO.setDatanode(dataNode, this.bpid);
upgradeState = curUO.preUpgradeAction(nsInfo);
// upgradeState is true if the data-node should start the upgrade itself
}
/**
* Start distributed upgrade.
* Instantiates distributed upgrade objects.
*
* @return true if distributed upgrade is required or false otherwise
* @throws IOException
*/
@Override
public synchronized boolean startUpgrade() throws IOException {
if(upgradeState) { // upgrade is already in progress
assert currentUpgrades != null :
"UpgradeManagerDatanode.currentUpgrades is null.";
UpgradeObjectDatanode curUO = (UpgradeObjectDatanode)currentUpgrades.first();
curUO.startUpgrade();
return true;
}
if(broadcastCommand != null) {
if(broadcastCommand.getVersion() > this.getUpgradeVersion()) {
// stop broadcasting, the cluster moved on
// start upgrade for the next version
broadcastCommand = null;
} else {
// the upgrade has been finished by this data-node,
// but the cluster is still running it,
// reply with the broadcast command
assert currentUpgrades == null :
"UpgradeManagerDatanode.currentUpgrades is not null.";
assert upgradeDaemon == null :
"UpgradeManagerDatanode.upgradeDaemon is not null.";
DatanodeProtocol nn = dataNode.getActiveNamenodeForBP(bpid);
nn.processUpgradeCommand(broadcastCommand);
return true;
}
}
if(currentUpgrades == null)
currentUpgrades = getDistributedUpgrades();
if(currentUpgrades == null) {
DataNode.LOG.info("\n Distributed upgrade for DataNode version "
+ getUpgradeVersion() + " to current LV "
+ HdfsConstants.LAYOUT_VERSION + " cannot be started. "
+ "The upgrade object is not defined.");
return false;
}
upgradeState = true;
UpgradeObjectDatanode curUO = (UpgradeObjectDatanode)currentUpgrades.first();
curUO.setDatanode(dataNode, this.bpid);
curUO.startUpgrade();
upgradeDaemon = new Daemon(curUO);
upgradeDaemon.start();
DataNode.LOG.info("\n Distributed upgrade for DataNode "
+ dataNode.getDisplayName()
+ " version " + getUpgradeVersion() + " to current LV "
+ HdfsConstants.LAYOUT_VERSION + " is started.");
return true;
}
synchronized void processUpgradeCommand(UpgradeCommand command
) throws IOException {
assert command.getAction() == UpgradeCommand.UC_ACTION_START_UPGRADE :
"Only start upgrade action can be processed at this time.";
this.upgradeVersion = command.getVersion();
// Start distributed upgrade
if(startUpgrade()) // upgrade started
return;
throw new IOException(
"Distributed upgrade for DataNode " + dataNode.getDisplayName()
+ " version " + getUpgradeVersion() + " to current LV "
+ HdfsConstants.LAYOUT_VERSION + " cannot be started. "
+ "The upgrade object is not defined.");
}
@Override
public synchronized void completeUpgrade() throws IOException {
assert currentUpgrades != null :
"UpgradeManagerDatanode.currentUpgrades is null.";
UpgradeObjectDatanode curUO = (UpgradeObjectDatanode)currentUpgrades.first();
broadcastCommand = curUO.completeUpgrade();
upgradeState = false;
currentUpgrades = null;
upgradeDaemon = null;
DataNode.LOG.info("\n Distributed upgrade for DataNode "
+ dataNode.getDisplayName()
+ " version " + getUpgradeVersion() + " to current LV "
+ HdfsConstants.LAYOUT_VERSION + " is complete.");
}
synchronized void shutdownUpgrade() {
if(upgradeDaemon != null)
upgradeDaemon.interrupt();
}
}

View File

@ -1,142 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.UpgradeObject;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
import java.io.IOException;
import java.net.SocketTimeoutException;
/**
* Base class for data-node upgrade objects.
* Data-node upgrades are run in separate threads.
*/
@InterfaceAudience.Private
public abstract class UpgradeObjectDatanode extends UpgradeObject implements Runnable {
private DataNode dataNode = null;
private String bpid = null;
@Override
public HdfsServerConstants.NodeType getType() {
return HdfsServerConstants.NodeType.DATA_NODE;
}
protected DataNode getDatanode() {
return dataNode;
}
protected DatanodeProtocol getNamenode() throws IOException {
return dataNode.getActiveNamenodeForBP(bpid);
}
void setDatanode(DataNode dataNode, String bpid) {
this.dataNode = dataNode;
this.bpid = bpid;
}
/**
* Specifies how the upgrade is performed.
* @throws IOException
*/
public abstract void doUpgrade() throws IOException;
/**
* Specifies what to do before the upgrade is started.
*
* The default implementation checks whether the data-node missed the upgrade
* and throws an exception if it did. This leads to the data-node shutdown.
*
* Data-nodes usually start distributed upgrade when the name-node replies
* to its heartbeat with a start upgrade command.
* Sometimes though, e.g. when a data-node missed the upgrade and wants to
* catchup with the rest of the cluster, it is necessary to initiate the
* upgrade directly on the data-node, since the name-node might not ever
* start it. An override of this method should then return true.
* And the upgrade will start after data-ndoe registration but before sending
* its first heartbeat.
*
* @param nsInfo name-node versions, verify that the upgrade
* object can talk to this name-node version if necessary.
*
* @throws IOException
* @return true if data-node itself should start the upgrade or
* false if it should wait until the name-node starts the upgrade.
*/
boolean preUpgradeAction(NamespaceInfo nsInfo) throws IOException {
int nsUpgradeVersion = nsInfo.getDistributedUpgradeVersion();
if(nsUpgradeVersion >= getVersion())
return false; // name-node will perform the upgrade
// Missed the upgrade. Report problem to the name-node and throw exception
String errorMsg =
"\n Data-node missed a distributed upgrade and will shutdown."
+ "\n " + getDescription() + "."
+ " Name-node version = " + nsInfo.getLayoutVersion() + ".";
DataNode.LOG.fatal( errorMsg );
String bpid = nsInfo.getBlockPoolID();
dataNode.trySendErrorReport(bpid, DatanodeProtocol.NOTIFY, errorMsg);
throw new IOException(errorMsg);
}
@Override
public void run() {
assert dataNode != null : "UpgradeObjectDatanode.dataNode is null";
while(dataNode.shouldRun) {
try {
doUpgrade();
} catch(Exception e) {
DataNode.LOG.error("Exception in doUpgrade", e);
}
break;
}
// report results
if(getUpgradeStatus() < 100) {
DataNode.LOG.info("\n Distributed upgrade for DataNode version "
+ getVersion() + " to current LV "
+ HdfsConstants.LAYOUT_VERSION + " cannot be completed.");
}
// Complete the upgrade by calling the manager method
try {
UpgradeManagerDatanode upgradeManager =
dataNode.getUpgradeManagerDatanode(bpid);
if(upgradeManager != null)
upgradeManager.completeUpgrade();
} catch(IOException e) {
DataNode.LOG.error("Exception in completeUpgrade", e);
}
}
/**
* Complete upgrade and return a status complete command for broadcasting.
*
* Data-nodes finish upgrade at different times.
* The data-node needs to re-confirm with the name-node that the upgrade
* is complete while other nodes are still upgrading.
*/
@Override
public UpgradeCommand completeUpgrade() throws IOException {
return new UpgradeCommand(UpgradeCommand.UC_ACTION_REPORT_STATUS,
getVersion(), (short)100);
}
}

View File

@ -96,8 +96,6 @@ public class FSImage implements Closeable {
/**
* Construct an FSImage
* @param conf Configuration
* @see #FSImage(Configuration conf,
* Collection imageDirs, Collection editsDirs)
* @throws IOException if default directories are invalid.
*/
public FSImage(Configuration conf) throws IOException {
@ -191,8 +189,6 @@ boolean recoverTransitionRead(StartupOption startOpt, FSNamesystem target,
throw new IOException(
"All specified directories are not accessible or do not exist.");
storage.setUpgradeManager(target.upgradeManager);
// 1. For each data directory calculate its state and
// check whether all is consistent before transitioning.
Map<StorageDirectory, StorageState> dataDirStates =
@ -227,9 +223,6 @@ boolean recoverTransitionRead(StartupOption startOpt, FSNamesystem target,
storage.processStartupOptionsForUpgrade(startOpt, layoutVersion);
// check whether distributed upgrade is required and/or should be continued
storage.verifyDistributedUpgradeProgress(startOpt);
// 2. Format unformatted dirs.
for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
StorageDirectory sd = it.next();
@ -320,13 +313,6 @@ private boolean recoverStorageDirs(StartupOption startOpt,
}
private void doUpgrade(FSNamesystem target) throws IOException {
if(storage.getDistributedUpgradeState()) {
// only distributed upgrade need to continue
// don't do version upgrade
this.loadFSImage(target, null);
storage.initializeDistributedUpgrade();
return;
}
// Upgrade is allowed only if there are
// no previous fs states in any of the directories
for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
@ -409,7 +395,6 @@ private void doUpgrade(FSNamesystem target) throws IOException {
+ storage.getRemovedStorageDirs().size()
+ " storage directory(ies), previously logged.");
}
storage.initializeDistributedUpgrade();
}
private void doRollback() throws IOException {
@ -472,8 +457,6 @@ private void doRollback() throws IOException {
LOG.info("Rollback of " + sd.getRoot()+ " is complete.");
}
isUpgradeFinalized = true;
// check whether name-node can start in regular mode
storage.verifyDistributedUpgradeProgress(StartupOption.REGULAR);
}
private void doFinalize(StorageDirectory sd) throws IOException {

View File

@ -108,7 +108,6 @@
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileAlreadyExistsException;
@ -136,7 +135,6 @@
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@ -160,7 +158,6 @@
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirType;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
@ -179,7 +176,6 @@
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.Server;
@ -942,8 +938,7 @@ NamespaceInfo getNamespaceInfo() {
NamespaceInfo unprotectedGetNamespaceInfo() {
return new NamespaceInfo(dir.fsImage.getStorage().getNamespaceID(),
getClusterId(), getBlockPoolId(),
dir.fsImage.getStorage().getCTime(),
upgradeManager.getUpgradeVersion());
dir.fsImage.getStorage().getCTime());
}
/**
@ -3387,13 +3382,6 @@ HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
nodeReg, blockPoolId, capacity, dfsUsed, remaining, blockPoolUsed,
xceiverCount, maxTransfer, failedVolumes);
if (cmds == null || cmds.length == 0) {
DatanodeCommand cmd = upgradeManager.getBroadcastCommand();
if (cmd != null) {
cmds = new DatanodeCommand[] {cmd};
}
}
return new HeartbeatResponse(cmds, createHaStatusHeartbeat());
} finally {
readUnlock();
@ -3834,24 +3822,9 @@ private void enter() {
/**
* Leave safe mode.
* <p>
* Switch to manual safe mode if distributed upgrade is required.<br>
* Check for invalid, under- & over-replicated blocks in the end of startup.
*/
private synchronized void leave(boolean checkForUpgrades) {
if(checkForUpgrades) {
// verify whether a distributed upgrade needs to be started
boolean needUpgrade = false;
try {
needUpgrade = upgradeManager.startUpgrade();
} catch(IOException e) {
FSNamesystem.LOG.error("IOException in startDistributedUpgradeIfNeeded", e);
}
if(needUpgrade) {
// switch to manual safe mode
safeMode = new SafeModeInfo(false);
return;
}
}
private synchronized void leave() {
// if not done yet, initialize replication queues.
// In the standby, do not populate repl queues
if (!isPopulatingReplQueues() && !isInStandbyState()) {
@ -3945,7 +3918,7 @@ private void checkMode() {
// the threshold is reached
if (!isOn() || // safe mode is off
extension <= 0 || threshold <= 0) { // don't need to wait
this.leave(true); // leave safe mode
this.leave(); // leave safe mode
return;
}
if (reached > 0) { // threshold has already been reached before
@ -4049,10 +4022,6 @@ String getTurnOffTip() {
leaveMsg = "Safe mode will be turned off automatically";
}
if(isManual()) {
if(upgradeManager.getUpgradeState())
return leaveMsg + " upon completion of " +
"the distributed upgrade: upgrade progress = " +
upgradeManager.getUpgradeStatus() + "%";
leaveMsg = "Use \"hdfs dfsadmin -safemode leave\" to turn safe mode off";
}
@ -4187,13 +4156,7 @@ public void run() {
LOG.info("NameNode is being shutdown, exit SafeModeMonitor thread. ");
} else {
// leave safe mode and stop the monitor
try {
leaveSafeMode(true);
} catch(SafeModeException es) { // should never happen
String msg = "SafeModeMonitor may not run during distributed upgrade.";
assert false : msg;
throw new RuntimeException(msg, es);
}
leaveSafeMode();
}
smmthread = null;
}
@ -4204,7 +4167,7 @@ boolean setSafeMode(SafeModeAction action) throws IOException {
checkSuperuserPrivilege();
switch(action) {
case SAFEMODE_LEAVE: // leave safe mode
leaveSafeMode(false);
leaveSafeMode();
break;
case SAFEMODE_ENTER: // enter safe mode
enterSafeMode(false);
@ -4389,17 +4352,14 @@ void enterSafeMode(boolean resourcesLow) throws IOException {
* Leave safe mode.
* @throws IOException
*/
void leaveSafeMode(boolean checkForUpgrades) throws SafeModeException {
void leaveSafeMode() {
writeLock();
try {
if (!isInSafeMode()) {
NameNode.stateChangeLog.info("STATE* Safe mode is already OFF.");
return;
}
if(upgradeManager.getUpgradeState())
throw new SafeModeException("Distributed upgrade is in progress",
safeMode);
safeMode.leave(checkForUpgrades);
safeMode.leave();
} finally {
writeUnlock();
}
@ -4474,18 +4434,6 @@ private boolean isValidBlock(Block b) {
return (blockManager.getBlockCollection(b) != null);
}
// Distributed upgrade manager
final UpgradeManagerNamenode upgradeManager = new UpgradeManagerNamenode(this);
UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action
) throws IOException {
return upgradeManager.distributedUpgradeProgress(action);
}
UpgradeCommand processDistributedUpgradeCommand(UpgradeCommand comm) throws IOException {
return upgradeManager.processUpgradeCommand(comm);
}
PermissionStatus createFsOwnerPermissions(FsPermission permission) {
return new PermissionStatus(fsOwner.getShortUserName(), supergroup, permission);
}

View File

@ -32,8 +32,6 @@
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;
@ -45,7 +43,6 @@
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
import org.apache.hadoop.hdfs.server.common.UpgradeManager;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.util.PersistentLongFile;
@ -65,8 +62,6 @@
@InterfaceAudience.Private
public class NNStorage extends Storage implements Closeable,
StorageErrorReporter {
private static final Log LOG = LogFactory.getLog(NNStorage.class.getName());
static final String DEPRECATED_MESSAGE_DIGEST_PROPERTY = "imageMD5Digest";
static final String LOCAL_URI_SCHEME = "file";
@ -112,7 +107,6 @@ public boolean isOfType(StorageDirType type) {
}
}
private UpgradeManager upgradeManager = null;
protected String blockpoolID = ""; // id of the block pool
/**
@ -551,11 +545,8 @@ public void format(NamespaceInfo nsInfo) throws IOException {
public static NamespaceInfo newNamespaceInfo()
throws UnknownHostException {
return new NamespaceInfo(
newNamespaceID(),
newClusterID(),
newBlockPoolID(),
0L, 0);
return new NamespaceInfo(newNamespaceID(), newClusterID(),
newBlockPoolID(), 0L);
}
public void format() throws IOException {
@ -600,13 +591,6 @@ protected void setFieldsFromProperties(
String sbpid = props.getProperty("blockpoolID");
setBlockPoolID(sd.getRoot(), sbpid);
}
String sDUS, sDUV;
sDUS = props.getProperty("distributedUpgradeState");
sDUV = props.getProperty("distributedUpgradeVersion");
setDistributedUpgradeState(
sDUS == null? false : Boolean.parseBoolean(sDUS),
sDUV == null? getLayoutVersion() : Integer.parseInt(sDUV));
setDeprecatedPropertiesForUpgrade(props);
}
@ -653,13 +637,6 @@ protected void setPropertiesFromFields(Properties props,
if (LayoutVersion.supports(Feature.FEDERATION, layoutVersion)) {
props.setProperty("blockpoolID", blockpoolID);
}
boolean uState = getDistributedUpgradeState();
int uVersion = getDistributedUpgradeVersion();
if(uState && uVersion != getLayoutVersion()) {
props.setProperty("distributedUpgradeState", Boolean.toString(uState));
props.setProperty("distributedUpgradeVersion",
Integer.toString(uVersion));
}
}
static File getStorageFile(StorageDirectory sd, NameNodeFile type, long imageTxId) {
@ -732,7 +709,7 @@ File findFinalizedEditsFile(long startTxId, long endTxId)
* Return the first readable image file for the given txid, or null
* if no such image can be found
*/
File findImageFile(long txid) throws IOException {
File findImageFile(long txid) {
return findFile(NameNodeDirType.IMAGE,
getImageFileName(txid));
}
@ -753,76 +730,6 @@ private File findFile(NameNodeDirType dirType, String name) {
return null;
}
/**
* Set the upgrade manager for use in a distributed upgrade.
* @param um The upgrade manager
*/
void setUpgradeManager(UpgradeManager um) {
upgradeManager = um;
}
/**
* @return The current distribued upgrade state.
*/
boolean getDistributedUpgradeState() {
return upgradeManager == null ? false : upgradeManager.getUpgradeState();
}
/**
* @return The current upgrade version.
*/
int getDistributedUpgradeVersion() {
return upgradeManager == null ? 0 : upgradeManager.getUpgradeVersion();
}
/**
* Set the upgrade state and version.
* @param uState the new state.
* @param uVersion the new version.
*/
private void setDistributedUpgradeState(boolean uState, int uVersion) {
if (upgradeManager != null) {
upgradeManager.setUpgradeState(uState, uVersion);
}
}
/**
* Verify that the distributed upgrade state is valid.
* @param startOpt the option the namenode was started with.
*/
void verifyDistributedUpgradeProgress(StartupOption startOpt
) throws IOException {
if(startOpt == StartupOption.ROLLBACK || startOpt == StartupOption.IMPORT)
return;
assert upgradeManager != null : "FSNameSystem.upgradeManager is null.";
if(startOpt != StartupOption.UPGRADE) {
if(upgradeManager.getUpgradeState())
throw new IOException(
"\n Previous distributed upgrade was not completed. "
+ "\n Please restart NameNode with -upgrade option.");
if(upgradeManager.getDistributedUpgrades() != null)
throw new IOException("\n Distributed upgrade for NameNode version "
+ upgradeManager.getUpgradeVersion()
+ " to current LV " + HdfsConstants.LAYOUT_VERSION
+ " is required.\n Please restart NameNode"
+ " with -upgrade option.");
}
}
/**
* Initialize a distributed upgrade.
*/
void initializeDistributedUpgrade() throws IOException {
if(! upgradeManager.initializeUpgrade())
return;
// write new upgrade state into disk
writeAll();
LOG.info("\n Distributed upgrade for NameNode version "
+ upgradeManager.getUpgradeVersion() + " to current LV "
+ HdfsConstants.LAYOUT_VERSION + " is initialized.");
}
/**
* Disable the check for pre-upgradable layouts. Needed for BackupImage.
* @param val Whether to disable the preupgradeable layout check.
@ -1099,7 +1006,6 @@ public NamespaceInfo getNamespaceInfo() {
getNamespaceID(),
getClusterID(),
getBlockPoolID(),
getCTime(),
getDistributedUpgradeVersion());
getCTime());
}
}

View File

@ -742,8 +742,8 @@ public void finalizeUpgrade() throws IOException {
@Override // ClientProtocol
public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action)
throws IOException {
namesystem.checkOperation(OperationCategory.READ);
return namesystem.distributedUpgradeProgress(action);
throw new UnsupportedActionException(
"Deprecated method. No longer supported");
}
@Override // ClientProtocol
@ -917,8 +917,10 @@ public NamespaceInfo versionRequest() throws IOException {
}
@Override // DatanodeProtocol
public UpgradeCommand processUpgradeCommand(UpgradeCommand comm) throws IOException {
return namesystem.processDistributedUpgradeCommand(comm);
public UpgradeCommand processUpgradeCommand(UpgradeCommand comm)
throws IOException {
throw new UnsupportedActionException(
"Deprecated method, no longer supported");
}
/**

View File

@ -120,19 +120,6 @@ static String getInodeLimitText(FSNamesystem fsn) {
return str;
}
static String getUpgradeStatusText(FSNamesystem fsn) {
String statusText = "";
try {
UpgradeStatusReport status = fsn
.distributedUpgradeProgress(UpgradeAction.GET_STATUS);
statusText = (status == null ? "There are no upgrades in progress."
: status.getStatusText(false));
} catch (IOException e) {
statusText = "Upgrade status unknown.";
}
return statusText;
}
/** Return a table containing version information. */
static String getVersionTable(FSNamesystem fsn) {
return "<div class='dfstable'><table>"
@ -141,8 +128,6 @@ static String getVersionTable(FSNamesystem fsn) {
+ VersionInfo.getVersion() + ", " + VersionInfo.getRevision()
+ "</td></tr>\n" + "\n <tr><td class='col1'>Compiled:</td><td>" + VersionInfo.getDate()
+ " by " + VersionInfo.getUser() + " from " + VersionInfo.getBranch()
+ "</td></tr>\n <tr><td class='col1'>Upgrades:</td><td>"
+ getUpgradeStatusText(fsn)
+ "</td></tr>\n <tr><td class='col1'>Cluster ID:</td><td>" + fsn.getClusterId()
+ "</td></tr>\n <tr><td class='col1'>Block Pool ID:</td><td>" + fsn.getBlockPoolId()
+ "</td></tr>\n</table></div>";

View File

@ -1,147 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.common.UpgradeManager;
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
/**
* Upgrade manager for name-nodes.
*
* Distributed upgrades for a name-node starts when the safe mode conditions
* are met and the name-node is about to exit it.
* At this point the name-node enters manual safe mode which will remain
* on until the upgrade is completed.
* After that the name-nodes processes upgrade commands from data-nodes
* and updates its status.
*/
class UpgradeManagerNamenode extends UpgradeManager {
@Override
public HdfsServerConstants.NodeType getType() {
return HdfsServerConstants.NodeType.NAME_NODE;
}
private final FSNamesystem namesystem;
UpgradeManagerNamenode(FSNamesystem namesystem) {
this.namesystem = namesystem;
}
/**
* Start distributed upgrade.
* Instantiates distributed upgrade objects.
*
* @return true if distributed upgrade is required or false otherwise
* @throws IOException
*/
@Override
public synchronized boolean startUpgrade() throws IOException {
if(!upgradeState) {
initializeUpgrade();
if(!upgradeState) return false;
// write new upgrade state into disk
namesystem.getFSImage().getStorage().writeAll();
}
assert currentUpgrades != null : "currentUpgrades is null";
this.broadcastCommand = currentUpgrades.first().startUpgrade();
NameNode.LOG.info("\n Distributed upgrade for NameNode version "
+ getUpgradeVersion() + " to current LV "
+ HdfsConstants.LAYOUT_VERSION + " is started.");
return true;
}
synchronized UpgradeCommand processUpgradeCommand(UpgradeCommand command
) throws IOException {
if(NameNode.LOG.isDebugEnabled()) {
NameNode.LOG.debug("\n Distributed upgrade for NameNode version "
+ getUpgradeVersion() + " to current LV "
+ HdfsConstants.LAYOUT_VERSION + " is processing upgrade command: "
+ command.getAction() + " status = " + getUpgradeStatus() + "%");
}
if(currentUpgrades == null) {
NameNode.LOG.info("Ignoring upgrade command: "
+ command.getAction() + " version " + command.getVersion()
+ ". No distributed upgrades are currently running on the NameNode");
return null;
}
UpgradeObjectNamenode curUO = (UpgradeObjectNamenode)currentUpgrades.first();
if(command.getVersion() != curUO.getVersion())
throw new IncorrectVersionException(command.getVersion(),
"UpgradeCommand", curUO.getVersion());
UpgradeCommand reply = curUO.processUpgradeCommand(command);
if(curUO.getUpgradeStatus() < 100) {
return reply;
}
// current upgrade is done
curUO.completeUpgrade();
NameNode.LOG.info("\n Distributed upgrade for NameNode version "
+ curUO.getVersion() + " to current LV "
+ HdfsConstants.LAYOUT_VERSION + " is complete.");
// proceede with the next one
currentUpgrades.remove(curUO);
if(currentUpgrades.isEmpty()) { // all upgrades are done
completeUpgrade();
} else { // start next upgrade
curUO = (UpgradeObjectNamenode)currentUpgrades.first();
this.broadcastCommand = curUO.startUpgrade();
}
return reply;
}
@Override
public synchronized void completeUpgrade() throws IOException {
// set and write new upgrade state into disk
setUpgradeState(false, HdfsConstants.LAYOUT_VERSION);
namesystem.getFSImage().getStorage().writeAll();
currentUpgrades = null;
broadcastCommand = null;
namesystem.leaveSafeMode(false);
}
synchronized UpgradeStatusReport distributedUpgradeProgress
(UpgradeAction action) throws IOException {
boolean isFinalized = false;
if(currentUpgrades == null) { // no upgrades are in progress
FSImage fsimage = namesystem.getFSImage();
isFinalized = fsimage.isUpgradeFinalized();
if(isFinalized) // upgrade is finalized
return null; // nothing to report
return new UpgradeStatusReport(fsimage.getStorage().getLayoutVersion(),
(short)101, isFinalized);
}
UpgradeObjectNamenode curUO = (UpgradeObjectNamenode)currentUpgrades.first();
boolean details = false;
switch(action) {
case GET_STATUS:
break;
case DETAILED_STATUS:
details = true;
break;
case FORCE_PROCEED:
curUO.forceProceed();
}
return curUO.getUpgradeStatusReport(details);
}
}

View File

@ -1,66 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.UpgradeObject;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
/**
* Base class for name-node upgrade objects.
* Data-node upgrades are run in separate threads.
*/
@InterfaceAudience.Private
public abstract class UpgradeObjectNamenode extends UpgradeObject {
/**
* Process an upgrade command.
* RPC has only one very generic command for all upgrade related inter
* component communications.
* The actual command recognition and execution should be handled here.
* The reply is sent back also as an UpgradeCommand.
*
* @param command
* @return the reply command which is analyzed on the client side.
*/
public abstract UpgradeCommand processUpgradeCommand(UpgradeCommand command
) throws IOException;
@Override
public HdfsServerConstants.NodeType getType() {
return HdfsServerConstants.NodeType.NAME_NODE;
}
/**
*/
@Override
public UpgradeCommand startUpgrade() throws IOException {
// broadcast that data-nodes must start the upgrade
return new UpgradeCommand(UpgradeCommand.UC_ACTION_START_UPGRADE,
getVersion(), (short)0);
}
public void forceProceed() throws IOException {
// do nothing by default
NameNode.LOG.info("forceProceed() is not defined for the upgrade. "
+ getDescription());
}
}

View File

@ -37,7 +37,6 @@
@InterfaceStability.Evolving
public class NamespaceInfo extends StorageInfo {
String buildVersion;
int distributedUpgradeVersion;
String blockPoolID = ""; // id of the block pool
String softwareVersion;
@ -47,17 +46,16 @@ public NamespaceInfo() {
}
public NamespaceInfo(int nsID, String clusterID, String bpID,
long cT, int duVersion, String buildVersion, String softwareVersion) {
long cT, String buildVersion, String softwareVersion) {
super(HdfsConstants.LAYOUT_VERSION, nsID, clusterID, cT);
blockPoolID = bpID;
this.buildVersion = buildVersion;
this.distributedUpgradeVersion = duVersion;
this.softwareVersion = softwareVersion;
}
public NamespaceInfo(int nsID, String clusterID, String bpID,
long cT, int duVersion) {
this(nsID, clusterID, bpID, cT, duVersion, Storage.getBuildVersion(),
long cT) {
this(nsID, clusterID, bpID, cT, Storage.getBuildVersion(),
VersionInfo.getVersion());
}
@ -65,10 +63,6 @@ public String getBuildVersion() {
return buildVersion;
}
public int getDistributedUpgradeVersion() {
return distributedUpgradeVersion;
}
public String getBlockPoolID() {
return blockPoolID;
}

View File

@ -47,8 +47,6 @@
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction;
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
import org.apache.hadoop.ipc.RPC;
@ -303,15 +301,9 @@ public void report() throws IOException {
long remaining = ds.getRemaining();
long presentCapacity = used + remaining;
boolean mode = dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_GET);
UpgradeStatusReport status =
dfs.distributedUpgradeProgress(UpgradeAction.GET_STATUS);
if (mode) {
System.out.println("Safe mode is ON");
}
if (status != null) {
System.out.println(status.getStatusText(false));
}
System.out.println("Configured Capacity: " + capacity
+ " (" + StringUtils.byteDesc(capacity) + ")");
System.out.println("Present Capacity: " + presentCapacity
@ -578,10 +570,6 @@ private void printHelp(String cmd) {
"\t\tfollowed by Namenode doing the same.\n" +
"\t\tThis completes the upgrade process.\n";
String upgradeProgress = "-upgradeProgress <status|details|force>: \n" +
"\t\trequest current distributed upgrade status, \n" +
"\t\ta detailed status or force the upgrade to proceed.\n";
String metaSave = "-metasave <filename>: \tSave Namenode's primary data structures\n" +
"\t\tto <filename> in the directory specified by hadoop.log.dir property.\n" +
"\t\t<filename> will contain one line for each of the following\n" +
@ -643,8 +631,6 @@ private void printHelp(String cmd) {
System.out.println(refreshNodes);
} else if ("finalizeUpgrade".equals(cmd)) {
System.out.println(finalizeUpgrade);
} else if ("upgradeProgress".equals(cmd)) {
System.out.println(upgradeProgress);
} else if ("metasave".equals(cmd)) {
System.out.println(metaSave);
} else if (SetQuotaCommand.matches("-"+cmd)) {
@ -681,7 +667,6 @@ private void printHelp(String cmd) {
System.out.println(restoreFailedStorage);
System.out.println(refreshNodes);
System.out.println(finalizeUpgrade);
System.out.println(upgradeProgress);
System.out.println(metaSave);
System.out.println(SetQuotaCommand.DESCRIPTION);
System.out.println(ClearQuotaCommand.DESCRIPTION);
@ -714,41 +699,6 @@ public int finalizeUpgrade() throws IOException {
return 0;
}
/**
* Command to request current distributed upgrade status,
* a detailed status, or to force the upgrade to proceed.
*
* Usage: java DFSAdmin -upgradeProgress [status | details | force]
* @exception IOException
*/
public int upgradeProgress(String[] argv, int idx) throws IOException {
if (idx != argv.length - 1) {
printUsage("-upgradeProgress");
return -1;
}
UpgradeAction action;
if ("status".equalsIgnoreCase(argv[idx])) {
action = UpgradeAction.GET_STATUS;
} else if ("details".equalsIgnoreCase(argv[idx])) {
action = UpgradeAction.DETAILED_STATUS;
} else if ("force".equalsIgnoreCase(argv[idx])) {
action = UpgradeAction.FORCE_PROCEED;
} else {
printUsage("-upgradeProgress");
return -1;
}
DistributedFileSystem dfs = getDFS();
UpgradeStatusReport status = dfs.distributedUpgradeProgress(action);
String statusText = (status == null ?
"There are no upgrades in progress." :
status.getStatusText(action == UpgradeAction.DETAILED_STATUS));
System.out.println(statusText);
return 0;
}
/**
* Dumps DFS data structures into specified file.
* Usage: java DFSAdmin -metasave filename
@ -918,9 +868,6 @@ private static void printUsage(String cmd) {
} else if ("-finalizeUpgrade".equals(cmd)) {
System.err.println("Usage: java DFSAdmin"
+ " [-finalizeUpgrade]");
} else if ("-upgradeProgress".equals(cmd)) {
System.err.println("Usage: java DFSAdmin"
+ " [-upgradeProgress status | details | force]");
} else if ("-metasave".equals(cmd)) {
System.err.println("Usage: java DFSAdmin"
+ " [-metasave filename]");
@ -969,7 +916,6 @@ private static void printUsage(String cmd) {
System.err.println(" [-restoreFailedStorage true|false|check]");
System.err.println(" [-refreshNodes]");
System.err.println(" [-finalizeUpgrade]");
System.err.println(" [-upgradeProgress status | details | force]");
System.err.println(" [-metasave filename]");
System.err.println(" [-refreshServiceAcl]");
System.err.println(" [-refreshUserToGroupsMappings]");
@ -1039,11 +985,6 @@ public int run(String[] argv) throws Exception {
printUsage(cmd);
return exitCode;
}
} else if ("-upgradeProgress".equals(cmd)) {
if (argv.length != 2) {
printUsage(cmd);
return exitCode;
}
} else if ("-metasave".equals(cmd)) {
if (argv.length != 2) {
printUsage(cmd);
@ -1113,8 +1054,6 @@ public int run(String[] argv) throws Exception {
exitCode = refreshNodes();
} else if ("-finalizeUpgrade".equals(cmd)) {
exitCode = finalizeUpgrade();
} else if ("-upgradeProgress".equals(cmd)) {
exitCode = upgradeProgress(argv, i);
} else if ("-metasave".equals(cmd)) {
exitCode = metaSave(argv, i);
} else if (ClearQuotaCommand.matches(cmd)) {

View File

@ -325,7 +325,7 @@ message RemoteEditLogManifestProto {
*/
message NamespaceInfoProto {
required string buildVersion = 1; // Software revision version (e.g. an svn or git revision)
required uint32 distUpgradeVersion = 2; // Distributed upgrade version
required uint32 unused = 2; // Retained for backward compatibility
required string blockPoolID = 3; // block pool used by the namespace
required StorageInfoProto storageInfo = 4;// Node information
required string softwareVersion = 5; // Software version number (e.g. 2.0.0)

View File

@ -75,9 +75,12 @@ protected String getTestFile() {
@After
@Override
public void tearDown() throws Exception {
if (null != fs)
if (fs != null) {
fs.close();
dfsCluster.shutdown();
}
if (dfsCluster != null) {
dfsCluster.shutdown();
}
Thread.sleep(2000);
super.tearDown();
}

View File

@ -381,14 +381,12 @@ public void testConvertBlockToken() {
@Test
public void testConvertNamespaceInfo() {
NamespaceInfo info = new NamespaceInfo(37, "clusterID", "bpID", 2300, 53);
NamespaceInfo info = new NamespaceInfo(37, "clusterID", "bpID", 2300);
NamespaceInfoProto proto = PBHelper.convert(info);
NamespaceInfo info2 = PBHelper.convert(proto);
compare(info, info2); //Compare the StorageInfo
assertEquals(info.getBlockPoolID(), info2.getBlockPoolID());
assertEquals(info.getBuildVersion(), info2.getBuildVersion());
assertEquals(info.getDistributedUpgradeVersion(),
info2.getDistributedUpgradeVersion());
}
private void compare(StorageInfo expected, StorageInfo actual) {
@ -440,7 +438,7 @@ public void testConvertDatanodeRegistration() {
DatanodeRegistration reg2 = PBHelper.convert(proto);
compare(reg.getStorageInfo(), reg2.getStorageInfo());
compare(reg.getExportedKeys(), reg2.getExportedKeys());
compare((DatanodeID)reg, (DatanodeID)reg2);
compare(reg, reg2);
assertEquals(reg.getSoftwareVersion(), reg2.getSoftwareVersion());
}

View File

@ -155,7 +155,7 @@ public void testCancelDelegationToken() throws Exception {
@Test
public void testAddDelegationTokensDFSApi() throws Exception {
UserGroupInformation ugi = UserGroupInformation.createRemoteUser("JobTracker");
DistributedFileSystem dfs = (DistributedFileSystem) cluster.getFileSystem();
DistributedFileSystem dfs = cluster.getFileSystem();
Credentials creds = new Credentials();
final Token<?> tokens[] = dfs.addDelegationTokens("JobTracker", creds);
Assert.assertEquals(1, tokens.length);
@ -198,7 +198,7 @@ public WebHdfsFileSystem run() throws Exception {
@SuppressWarnings("deprecation")
@Test
public void testDelegationTokenWithDoAs() throws Exception {
final DistributedFileSystem dfs = (DistributedFileSystem) cluster.getFileSystem();
final DistributedFileSystem dfs = cluster.getFileSystem();
final Credentials creds = new Credentials();
final Token<?> tokens[] = dfs.addDelegationTokens("JobTracker", creds);
Assert.assertEquals(1, tokens.length);
@ -212,8 +212,7 @@ public void testDelegationTokenWithDoAs() throws Exception {
longUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws IOException {
final DistributedFileSystem dfs = (DistributedFileSystem) cluster
.getFileSystem();
final DistributedFileSystem dfs = cluster.getFileSystem();
try {
//try renew with long name
dfs.renewDelegationToken(token);
@ -226,8 +225,7 @@ public Object run() throws IOException {
shortUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws IOException {
final DistributedFileSystem dfs = (DistributedFileSystem) cluster
.getFileSystem();
final DistributedFileSystem dfs = cluster.getFileSystem();
dfs.renewDelegationToken(token);
return null;
}
@ -235,8 +233,7 @@ public Object run() throws IOException {
longUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws IOException {
final DistributedFileSystem dfs = (DistributedFileSystem) cluster
.getFileSystem();
final DistributedFileSystem dfs = cluster.getFileSystem();
try {
//try cancel with long name
dfs.cancelDelegationToken(token);
@ -273,7 +270,7 @@ public void testDTManagerInSafeMode() throws Exception {
NameNodeAdapter.getDtSecretManager(nn.getNamesystem());
assertFalse("Secret manager should not run in safe mode", sm.isRunning());
NameNodeAdapter.leaveSafeMode(nn, false);
NameNodeAdapter.leaveSafeMode(nn);
assertTrue("Secret manager should start when safe mode is exited",
sm.isRunning());

View File

@ -32,7 +32,6 @@
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
@ -111,10 +110,8 @@ private DatanodeProtocolClientSideTranslatorPB setupNNMock(int nnIdx)
throws Exception {
DatanodeProtocolClientSideTranslatorPB mock =
Mockito.mock(DatanodeProtocolClientSideTranslatorPB.class);
Mockito.doReturn(
new NamespaceInfo(1, FAKE_CLUSTERID, FAKE_BPID,
0, HdfsConstants.LAYOUT_VERSION))
.when(mock).versionRequest();
Mockito.doReturn(new NamespaceInfo(1, FAKE_CLUSTERID, FAKE_BPID, 0))
.when(mock).versionRequest();
Mockito.doReturn(DFSTestUtil.getLocalDatanodeRegistration())
.when(mock).registerDatanode(Mockito.any(DatanodeRegistration.class));
@ -229,10 +226,9 @@ public void testIgnoreDeletionsFromNonActive() throws Exception {
*/
@Test
public void testNNsFromDifferentClusters() throws Exception {
Mockito.doReturn(
new NamespaceInfo(1, "fake foreign cluster", FAKE_BPID,
0, HdfsConstants.LAYOUT_VERSION))
.when(mockNN1).versionRequest();
Mockito
.doReturn(new NamespaceInfo(1, "fake foreign cluster", FAKE_BPID, 0))
.when(mockNN1).versionRequest();
BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
bpos.start();

View File

@ -147,7 +147,7 @@ public DatanodeRegistration answer(InvocationOnMock invocation)
Mockito.any(DatanodeRegistration.class));
when(namenode.versionRequest()).thenReturn(new NamespaceInfo
(1, CLUSTER_ID, POOL_ID, 1L, 1));
(1, CLUSTER_ID, POOL_ID, 1L));
when(namenode.sendHeartbeat(
Mockito.any(DatanodeRegistration.class),

View File

@ -239,8 +239,7 @@ public void runTest(int parallelism) throws Exception {
fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
CONF.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
parallelism);
DataNode dn = cluster.getDataNodes().get(0);
scanner = new DirectoryScanner(dn, fds, CONF);
scanner = new DirectoryScanner(fds, CONF);
scanner.setRetainDiffs(true);
// Add files with 100 blocks

View File

@ -81,9 +81,8 @@ public static void enterSafeMode(NameNode namenode, boolean resourcesLow)
namenode.getNamesystem().enterSafeMode(resourcesLow);
}
public static void leaveSafeMode(NameNode namenode, boolean checkForUpgrades)
throws SafeModeException {
namenode.getNamesystem().leaveSafeMode(checkForUpgrades);
public static void leaveSafeMode(NameNode namenode) {
namenode.getNamesystem().leaveSafeMode();
}
public static void abortEditLogs(NameNode nn) {

View File

@ -184,10 +184,7 @@ public void testEditLogRolling() throws Exception {
cluster.waitActive();
fileSys = cluster.getFileSystem();
final FSNamesystem namesystem = cluster.getNamesystem();
FSImage fsimage = namesystem.getFSImage();
FSEditLog editLog = fsimage.getEditLog();
StorageDirectory sd = fsimage.getStorage().getStorageDir(0);
startTransactionWorkers(namesystem, caughtErr);
@ -306,7 +303,7 @@ public void testSaveNamespace() throws Exception {
assertEquals(fsimage.getStorage().getMostRecentCheckpointTxId(),
editLog.getLastWrittenTxId() - 1);
namesystem.leaveSafeMode(false);
namesystem.leaveSafeMode();
LOG.info("Save " + i + ": complete");
}
} finally {

View File

@ -75,7 +75,7 @@ public void setupCluster() throws IOException {
}
@After
public void shutdownCluster() throws IOException {
public void shutdownCluster() {
if (cluster != null) {
cluster.shutdown();
}
@ -125,7 +125,7 @@ public void testDownloadingLaterCheckpoint() throws Exception {
// Make checkpoint
NameNodeAdapter.enterSafeMode(nn0, false);
NameNodeAdapter.saveNamespace(nn0);
NameNodeAdapter.leaveSafeMode(nn0, false);
NameNodeAdapter.leaveSafeMode(nn0);
long expectedCheckpointTxId = NameNodeAdapter.getNamesystem(nn0)
.getFSImage().getMostRecentCheckpointTxId();
assertEquals(6, expectedCheckpointTxId);

View File

@ -91,7 +91,7 @@ public void setupCluster() throws Exception {
}
@After
public void shutdownCluster() throws IOException {
public void shutdownCluster() {
if (cluster != null) {
cluster.shutdown();
}
@ -408,7 +408,7 @@ public void testBlocksDeletedInEditLog() throws Exception {
4*BLOCK_SIZE, (short) 3, 1L);
NameNodeAdapter.enterSafeMode(nn0, false);
NameNodeAdapter.saveNamespace(nn0);
NameNodeAdapter.leaveSafeMode(nn0, false);
NameNodeAdapter.leaveSafeMode(nn0);
// OP_ADD for 2 blocks
DFSTestUtil.createFile(fs, new Path("/test2"),

View File

@ -27,7 +27,6 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
@ -317,8 +316,7 @@ public void testLeasesRenewedOnTransition() throws Exception {
* Test that delegation tokens continue to work after the failover.
*/
@Test
public void testDelegationTokensAfterFailover() throws IOException,
URISyntaxException {
public void testDelegationTokensAfterFailover() throws IOException {
Configuration conf = new Configuration();
conf.setBoolean(
DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
@ -472,7 +470,7 @@ public void testSecretManagerState() throws Exception {
assertFalse(isDTRunning(nn));
banner("Transition 1->2. Should not start secret manager");
NameNodeAdapter.leaveSafeMode(nn, false);
NameNodeAdapter.leaveSafeMode(nn);
assertTrue(nn.isStandbyState());
assertFalse(nn.isInSafeMode());
assertFalse(isDTRunning(nn));
@ -497,7 +495,7 @@ public void testSecretManagerState() throws Exception {
banner("Transition 1->3->4. Should start secret manager.");
nn.getRpcServer().transitionToActive(REQ_INFO);
NameNodeAdapter.leaveSafeMode(nn, false);
NameNodeAdapter.leaveSafeMode(nn);
assertFalse(nn.isStandbyState());
assertFalse(nn.isInSafeMode());
assertTrue(isDTRunning(nn));
@ -509,7 +507,7 @@ public void testSecretManagerState() throws Exception {
assertFalse(isDTRunning(nn));
banner("Transition 3->4. Should start secret manager");
NameNodeAdapter.leaveSafeMode(nn, false);
NameNodeAdapter.leaveSafeMode(nn);
assertFalse(nn.isStandbyState());
assertFalse(nn.isInSafeMode());
assertTrue(isDTRunning(nn));

View File

@ -15269,29 +15269,6 @@
</comparators>
</test>
<test> <!--Tested -->
<description>help: help for dfsadmin upgradeProgress</description>
<test-commands>
<dfs-admin-command>-fs NAMENODE -help upgradeProgress</dfs-admin-command>
</test-commands>
<cleanup-commands>
</cleanup-commands>
<comparators>
<comparator>
<type>RegexpComparator</type>
<expected-output>^-upgradeProgress &lt;status\|details\|force&gt;:( )*</expected-output>
</comparator>
<comparator>
<type>RegexpComparator</type>
<expected-output>^( |\t)*request current distributed upgrade status,( )*</expected-output>
</comparator>
<comparator>
<type>RegexpComparator</type>
<expected-output>^( |\t)*a detailed status or force the upgrade to proceed.( )*</expected-output>
</comparator>
</comparators>
</test>
<test> <!--Tested -->
<description>help: help for dfsadmin metasave</description>
<test-commands>